Как выбрать максимальное количество элементов в группе из потока

#java #reactive-programming #spring-webflux #project-reactor

Вопрос:

Учитывая следующее MyObject , и Flux<MyObject> каков наилучший способ удалить MyObjects с помощью того же самого свойства из этого потока?

     import lombok.Data;
    import reactor.core.publisher.Flux;
    
    public class Example {
    
        @Data
        public class MyObject {
            final String name;
            final int priority;
        }
    
        public Example() {
            Flux<MyObject> myFlux = Flux.just(
                    new MyObject("abc", 2),
                    new MyObject("abc", 4),
                    new MyObject("cde", 1));
        }
    }
 

Например, я хочу удалить объекты с одинаковыми name , выбирая объекты с более высокими priority .
Выход: [Example.MyObject(name=abc, priority=4), Example.MyObject(name=cde, priority=1)]

Если я использую myFlux.distinct(MyObject::getName) , я не смогу выбрать, какой из них оставить.

Ответ №1:

Чтобы решить эту проблему, вам сначала нужно преобразовать ее Flux<MyObject> в а Mono<List<MyObject>> , потому что вам нужно знать все объекты и их приоритеты, чтобы отсортировать их.

Как только у вас появится список всех экземпляров MyObject , вы сможете использовать API Java 8 Stream для решения этой проблемы:

 @Slf4j
public class Example {

    public static void main(String[] args) {
        Flux<MyObject> myFlux = Flux.just(
                        new MyObject("abc", 2),
                        new MyObject("abc", 4),
                        new MyObject("cde", 1))
                .collectList()
                .map(myObjectsList -> myObjectsList.stream()
                        .collect(Collectors
                                .groupingBy(MyObject::getName)))
                // now we have a Map<String, List<MyObject>>
                .map(Map::entrySet)
                // now we have a Set<Entry<String, List<MyObject>>>
                .flatMapIterable(entrySet -> entrySet)
                .map(Map.Entry::getValue)
                // now we have a Flux<List<MyObject>>
                // and all MyObject in that list have 
                // the same name
                .filter(allObjectsWithSameName -> !allObjectsWithSameName.isEmpty())
                // now we sort all the lists in descending order
                // and return the first element
                // which is the one with the highest prio
                .map(allObjectsWithSameName -> {
                            allObjectsWithSameName.sort(new Comparator<MyObject>() {
                                @Override
                                public int compare(MyObject o1, MyObject o2) {
                                    return Integer.compare(o2.priority, o1.priority);
                                }
                            });
                            return allObjectsWithSameName.get(0);
                        }
                );

        myFlux.subscribe(result -> System.out.println("MyObject: "   result.toString()));
    }

    @Data
    @RequiredArgsConstructor
    public static class MyObject {
        final String name;
        final int priority;
    }
}

 

Выход:

 MyObject: Example.MyObject(name=abc, priority=4)
MyObject: Example.MyObject(name=cde, priority=1)
 

Ответ №2:

Вы можете достичь этого с помощью groupBy reduce операторов и на Flux :

 Flux.just(
        new MyObject("abc", 2),
        new MyObject("abc", 4),
        new MyObject("cde", 1))
    .groupBy(MyObject::getName)
    .flatMap(group -> group.reduce((o1, o2) -> o1.getPriority() > o2.getPriority() ? o1 : o2))
    .subscribe(System.out::println);
 

Одним из важных соображений является то, что это хорошо работает только в том случае, если количество групп невелико, иначе это может привести к тупику. В качестве средства правовой защиты вы можете установить maxConcurrency для параметра of flatMap более высокое значение.

См. Документацию groupBy оператора:

Группы необходимо слить и использовать ниже по течению, чтобы groupBy работал правильно. В частности, когда критерии создают большое количество групп, это может привести к зависанию, если группы не будут надлежащим образом потребляться ниже по течению (например. из-за плоского изображения с параметром maxConcurrency, который установлен слишком низко).