#java #reactive-programming #spring-webflux #project-reactor
Вопрос:
Учитывая следующее MyObject
, и Flux<MyObject>
каков наилучший способ удалить MyObjects
с помощью того же самого свойства из этого потока?
import lombok.Data;
import reactor.core.publisher.Flux;
public class Example {
@Data
public class MyObject {
final String name;
final int priority;
}
public Example() {
Flux<MyObject> myFlux = Flux.just(
new MyObject("abc", 2),
new MyObject("abc", 4),
new MyObject("cde", 1));
}
}
Например, я хочу удалить объекты с одинаковыми name
, выбирая объекты с более высокими priority
.
Выход: [Example.MyObject(name=abc, priority=4), Example.MyObject(name=cde, priority=1)]
Если я использую myFlux.distinct(MyObject::getName)
, я не смогу выбрать, какой из них оставить.
Ответ №1:
Чтобы решить эту проблему, вам сначала нужно преобразовать ее Flux<MyObject>
в а Mono<List<MyObject>>
, потому что вам нужно знать все объекты и их приоритеты, чтобы отсортировать их.
Как только у вас появится список всех экземпляров MyObject
, вы сможете использовать API Java 8 Stream для решения этой проблемы:
@Slf4j
public class Example {
public static void main(String[] args) {
Flux<MyObject> myFlux = Flux.just(
new MyObject("abc", 2),
new MyObject("abc", 4),
new MyObject("cde", 1))
.collectList()
.map(myObjectsList -> myObjectsList.stream()
.collect(Collectors
.groupingBy(MyObject::getName)))
// now we have a Map<String, List<MyObject>>
.map(Map::entrySet)
// now we have a Set<Entry<String, List<MyObject>>>
.flatMapIterable(entrySet -> entrySet)
.map(Map.Entry::getValue)
// now we have a Flux<List<MyObject>>
// and all MyObject in that list have
// the same name
.filter(allObjectsWithSameName -> !allObjectsWithSameName.isEmpty())
// now we sort all the lists in descending order
// and return the first element
// which is the one with the highest prio
.map(allObjectsWithSameName -> {
allObjectsWithSameName.sort(new Comparator<MyObject>() {
@Override
public int compare(MyObject o1, MyObject o2) {
return Integer.compare(o2.priority, o1.priority);
}
});
return allObjectsWithSameName.get(0);
}
);
myFlux.subscribe(result -> System.out.println("MyObject: " result.toString()));
}
@Data
@RequiredArgsConstructor
public static class MyObject {
final String name;
final int priority;
}
}
Выход:
MyObject: Example.MyObject(name=abc, priority=4)
MyObject: Example.MyObject(name=cde, priority=1)
Ответ №2:
Вы можете достичь этого с помощью groupBy
reduce
операторов и на Flux
:
Flux.just(
new MyObject("abc", 2),
new MyObject("abc", 4),
new MyObject("cde", 1))
.groupBy(MyObject::getName)
.flatMap(group -> group.reduce((o1, o2) -> o1.getPriority() > o2.getPriority() ? o1 : o2))
.subscribe(System.out::println);
Одним из важных соображений является то, что это хорошо работает только в том случае, если количество групп невелико, иначе это может привести к тупику. В качестве средства правовой защиты вы можете установить maxConcurrency
для параметра of flatMap
более высокое значение.
См. Документацию groupBy
оператора:
Группы необходимо слить и использовать ниже по течению, чтобы groupBy работал правильно. В частности, когда критерии создают большое количество групп, это может привести к зависанию, если группы не будут надлежащим образом потребляться ниже по течению (например. из-за плоского изображения с параметром maxConcurrency, который установлен слишком низко).