#java #project-reactor #flatmap
#java #проект-реактор #flatmap
Вопрос:
Я хочу сгенерировать чередующийся поток из двух типов объектов.
У меня есть два вспомогательных метода
Aaa genAaa()
(илиMono<Aaa>
?) для создания одногоAaa
иFlux<Bbb> genBbbs(Aaa a)
чтобы создать наборBbb
s от доa
.
Общий результат должен быть таким Flux<JsonNode>
, чтобы обеспечить смешивание двух типов объектов.
Итак, результат будет примерно таким
[ {'name':'Aaa-X'},
{'name':'Bbb-x1'},
{'name':'Bbb-x2'},
{'name':'Aaa-Y'},
{'name':'Bbb-y1'},
{'name':'Bbb-y2'},
{'name':'Bbb-y3'}
]
В качестве грубого наброска я попробовал это:
final ObjectMapper om = new ObjectMapper();
public Flux<JsonNode> create() {
return Flux.range(0, 2) // create 2
.map( idx -> genAaa() ) // bare Aaa's
.flatMap( a -> genBbbs(a) ) // bare Aaa to Flux<Bbb> ???
.map( om::valueToTree ); // anything to JsonNode
}
Но у меня здесь есть несколько больших проблем:
Поскольку я преобразую Aaa
объекты (и, следовательно, использую их), их больше нет в результате. Я понятия не имею, как я могу «использовать» и сохранить их в этом сценарии.
Я подумал, не мог бы я передать «поток в процессе» в качестве параметра функциям генерации, чтобы каждый из них добавлял JsonNodes
по мере их создания, но это кажется неправильным (совершенно не асинхронным), и я бы все равно не стал. Я полагаю, что в потоках есть концепция, которая все еще ускользает от меня.
Ответ №1:
Вы можете использовать Flux#concat
вместе с genBbbs
методом внутри функции, переданной flatMap
:
private static Flux<JsonNode> combine() {
ObjectMapper objectMapper = new ObjectMapper();
return Flux.concat(getAaa("a0"), getAaa("a1"))// Flux<Aaa>
.flatMap(aaa -> Flux.concat(Mono.just(aaa), getBbbs(aaa))) // Flux<Object>
.map(objectMapper::valueToTree); // Flux<JsonNode>
}
concat
метод просто объединяет два источника:
- Искусственно созданный с использованием
Mono.just
Flux<B>
ИзgetBbbs(aaa)
вызова
Пример вывода:
{"name":"a0"}
{"name":"B1-a0"}
{"name":"B2-a0"}
{"name":"a1"}
{"name":"B1-a1"}
{"name":"B2-a1"}
Полный список:
public class Main {
@AllArgsConstructor
@Data
private static class Aaa {
private String name;
}
@AllArgsConstructor
@Data
private static class Bbb {
private String name;
}
private static Mono<Aaa> getAaa(String name) {
return Mono.just(new Aaa(name));
}
private static Flux<Bbb> getBbbs(Aaa aaa) {
return Flux.just(new Bbb("B1-" aaa.getName()), new Bbb("B2-" aaa.getName()));
}
public static void main(String[] args) {
combine().subscribe(System.out::println);
}
private static Flux<JsonNode> combine() {
ObjectMapper objectMapper = new ObjectMapper();
return Flux.concat(getAaa("a0"), getAaa("a1"))// Flux<Aaa>
.flatMap(aaa -> Flux.concat(Mono.just(aaa), getBbbs(aaa))) // Flux<Object>
.map(objectMapper::valueToTree); // Flux<JsonNode>
}
}
Комментарии:
1. Я определенно многому научился, глядя на ваше решение. Я применю это к своему мозгу и своему конкретному коду и вернусь к этому.