Как развернуть несколько потоков и сохранить оригиналы?

#java #project-reactor #flatmap

#java #проект-реактор #flatmap

Вопрос:

Я хочу сгенерировать чередующийся поток из двух типов объектов.

У меня есть два вспомогательных метода

  • Aaa genAaa() (или Mono<Aaa> ?) для создания одного Aaa и
  • Flux<Bbb> genBbbs(Aaa a) чтобы создать набор Bbb s от до a .

Общий результат должен быть таким Flux<JsonNode> , чтобы обеспечить смешивание двух типов объектов.

Итак, результат будет примерно таким

 [ {'name':'Aaa-X'}, 
    {'name':'Bbb-x1'},
    {'name':'Bbb-x2'},
  {'name':'Aaa-Y'},
    {'name':'Bbb-y1'},
    {'name':'Bbb-y2'},
    {'name':'Bbb-y3'}
]
 

В качестве грубого наброска я попробовал это:

 final ObjectMapper om = new ObjectMapper();

public Flux<JsonNode> create() {
  return Flux.range(0, 2)       // create 2
    .map( idx -> genAaa() )     // bare Aaa's
    .flatMap( a -> genBbbs(a) ) // bare Aaa to Flux<Bbb> ???
    .map( om::valueToTree );    // anything to JsonNode
}
 

Но у меня здесь есть несколько больших проблем:

Поскольку я преобразую Aaa объекты (и, следовательно, использую их), их больше нет в результате. Я понятия не имею, как я могу «использовать» и сохранить их в этом сценарии.

Я подумал, не мог бы я передать «поток в процессе» в качестве параметра функциям генерации, чтобы каждый из них добавлял JsonNodes по мере их создания, но это кажется неправильным (совершенно не асинхронным), и я бы все равно не стал. Я полагаю, что в потоках есть концепция, которая все еще ускользает от меня.

Ответ №1:

Вы можете использовать Flux#concat вместе с genBbbs методом внутри функции, переданной flatMap :

 private static Flux<JsonNode> combine() {
    ObjectMapper objectMapper = new ObjectMapper();
    return Flux.concat(getAaa("a0"), getAaa("a1"))// Flux<Aaa>
            .flatMap(aaa -> Flux.concat(Mono.just(aaa), getBbbs(aaa))) // Flux<Object>
            .map(objectMapper::valueToTree); // Flux<JsonNode>
}
 

concat метод просто объединяет два источника:

  1. Искусственно созданный с использованием Mono.just
  2. Flux<B> Из getBbbs(aaa) вызова

Пример вывода:

 {"name":"a0"}
{"name":"B1-a0"}
{"name":"B2-a0"}
{"name":"a1"}
{"name":"B1-a1"}
{"name":"B2-a1"}
 

Полный список:

 public class Main {
    @AllArgsConstructor
    @Data
    private static class Aaa {
        private String name;
    }

    @AllArgsConstructor
    @Data
    private static class Bbb {
        private String name;
    }

    private static Mono<Aaa> getAaa(String name) {
        return Mono.just(new Aaa(name));
    }

    private static Flux<Bbb> getBbbs(Aaa aaa) {
        return Flux.just(new Bbb("B1-"   aaa.getName()), new Bbb("B2-"   aaa.getName()));
    }


    public static void main(String[] args) {
        combine().subscribe(System.out::println);
    }

    private static Flux<JsonNode> combine() {
        ObjectMapper objectMapper = new ObjectMapper();
        return Flux.concat(getAaa("a0"), getAaa("a1"))// Flux<Aaa>
                .flatMap(aaa -> Flux.concat(Mono.just(aaa), getBbbs(aaa))) // Flux<Object>
                .map(objectMapper::valueToTree); // Flux<JsonNode>
    }
}
 

Комментарии:

1. Я определенно многому научился, глядя на ваше решение. Я применю это к своему мозгу и своему конкретному коду и вернусь к этому.