Есть ли способ повторить поток, найти, существуют ли его элементы, или сохранить их в бд?

#spring #spring-webflux #project-reactor

Вопрос:

У меня возникли проблемы при отображении потока элементов. чего я пытаюсь добиться, так это проверить, сохранены ли элементы уже в базе данных, или же сохранить их. Это сработало бы, если бы элементы уже присутствовали в базе данных, но если есть новые записи и некоторые из них дублируются (а я не могу удалить дубликаты, потому что они понадобятся мне позже), операция поиска не сработает.

Например, учитывая поток, который выдает [«сервис», «сервис», «продукты»] и пустую базу данных, произойдет то, что мой код сохранит все, в то время как я ожидал бы сохранить только [«сервис»,»продукты»]. Это происходит, так как реактор не будет охотно экономить, пока плоская карта потока не будет завершена.

Есть ли способ убедиться, что вложенная операция сохранения завершится до того, как поток создаст следующий элемент?

         Flux<CsvBean> csvBeanFlux = byteArrayInputStreamMono.flatMapIterable(this::parseCsv);

    return csvBeanFlux.flatMap(csvBean -> {
        DictionaryFilter level1Filter = generateDictionaryFilter(dictionary, csvBean.getLevel1(), locale);
        return dictionaryService
                .findOne(level1Filter).log("FIND LEVEL1")
                .switchIfEmpty(mongoTemplate.save(generateCodeValue(locale, csvBean.getLevel1(), UUID.randomUUID().toString(), null), dictionary).log("SAVE LEVEL1"));
    });
 

Комментарии:

1. Вы можете попробовать concatMap вместо flatMap. Он будет обрабатывать элементы последовательно.

2. Это сработало! вы спасли меня после того, как я провел неделю, просматривая справочные документы, и мне удалось найти только flatMapSequence, который не работал. Спасибо!