Как добавить в список во вложенных плоских картах, используя разных издателей с реактором

#project-reactor

#проект-реактор

Вопрос:

Я новичок в Reactor, и я хотел бы знать, как правильно реализовать эту простую логику. Я хочу сравнить две коллекции элементов, и если элементы совпадают, я хочу добавить их в Mono<Список>. Элементы сопоставляются, если они равны или если элемент является родительским для другого элемента.

Мой метод тестирования иногда проходит, а в других случаях терпит неудачу с неправильным количеством элементов в Mono<Список>. Я не могу понять, в чем проблема. Кажется, что когда я запускаю тесты junit для класса с помощью этого единственного метода тестирования, он проходит. Однако, когда я использую другие методы тестирования для тестирования разных входов / выходов в том же классе, иногда происходит сбой с неправильными элементами в списке. Как будто тестовый метод завершился раньше, до того, как список был обновлен всеми добавленными элементами.

 package test;

import static org.assertj.core.api.Assertions.assertThat;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import org.junit.Test;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.test.StepVerifier;

public class ReactorBasicsTests {
    
    @Test
    public void testMatching() {        
        Mono<List<String>> matched = matching(
                Arrays.asList("fruit", "vegetable", "meat"),
                Arrays.asList("apple", "orange", "carrot", "meat"));
        StepVerifier.create(matched)
            .consumeNextWith(list -> {
                assertThat(list).containsExactlyInAnyOrder(
                        "fruit apple", "fruit orange", "vegetable carrot", "meat meat");
            })
            .expectComplete()
            .verify();
    }
    
    /**
     * Items are matched when:
     * 1) they are equal (meat equals to meat) 
     * 2) item is parent of child item (fruit is parent of apple)   
     */
    public Mono<List<String>> matching(List<String> list1, List<String> list2) {
        Flux<String> items1 = Flux.fromIterable(list1);
        Flux<String> items2 = Flux.fromIterable(list2);
        List<String> matched = new ArrayList<>();
        
        return items1
            .flatMap(item1 -> items2
                    .flatMap(item2 -> {
                            // Check if item1 is equal to item2
                            Mono<Void> isEqual = Mono.fromSupplier(() -> {
                                if (item1.equals(item2)) {
                                    matched.add(item1   " "   item2);
                                }
                                return null; // I have to return something to compile the code
                            }).then();
                            
                            // Check if item1 is parent of item2
                            Mono<Void> isParent = this.getParents(item2).hasElement(item1)
                                    .flatMap(booleanValue -> {
                                        if (booleanValue) {
                                            matched.add(item1   " "   item2);
                                        }
                                        return Mono.empty();
                                    });
                            
                            return isEqual.then(isParent).then();
                    })
            ).then(Flux.fromIterable(matched).collectList());
    }
    
    /** This method simulates reactive database repository request that returns Flux<String> **/
    public Flux<String> getParents(String item) {       
        if (item.equals("apple") || item.equals("orange")) return Flux.just("fruit", "food");
        else if (item.equals("carrot")) return Flux.just("vegetable", "food");
        return Flux.empty();
    }       
    
}

  

Комментарии:

1. Зачем вам нужен Mono<Список<Строка>>… Я всегда думаю, что Flux<Строка> — это правильный тип для описания списка<>

2. Мне не нужен Mono<Список<Строка>>. Я переработал его в Mono<List> из Flux<String>, потому что надеялся, что это поможет мне решить мою проблему. Но это не сработало.

3. Я заметил ConcurrentModificationException в трассировке стека. Кажется, что больше потоков пытаются добавить в мой список, и это корень всего зла. Итак, я думаю, мне нужно переработать свой код, чтобы список обновлялся по-другому. Или, может быть, я могу каким-то образом вернуть Flux<String> и забыть о Списке.