ReactiveMongoDB saveAll () с одновременным сохранением 100 тыс. документов эффективные альтернативы?

#spring-webflux

#spring-webflux

Вопрос:

Я настроил несколько @Scheduled() заданий в своем проекте WebFlux, и в каждом задании я извлекаю некоторые данные из внешних API и сохраняю некоторые документы с помощью ReactiveMongoDB.

Каждое задание пытается сохранить около 100000 документов за раз, используя ReactiveMongoRepository.saveAll() .

Проблема в этом случае для выполнения этой задачи требуется слишком много времени: около 2 часов.

Каждый документ выглядит следующим образом:

 {
    "_id" : ObjectId("5fb16de94761926f75940599"),
    "subdivision1ISOCode" : "DH",
    "subdivision1Name" : "Dhamār",
    "subdivision2ISOCode" : "",
    "subdivision2Name" : "",
    "cityName" : "Dhamar",
    "timeZone" : "Asia/Aden",
    "geoNameID" : 76184,
    "localeCode" : "en",
    "continentCode" : "AS",
    "continentName" : "Asia",
    "countryISOCode" : "YE",
    "countryName" : "Yemen",
    "europeanUnion" : false,
    "_class" : "project.domain.maxmind.location.GeoLite2CityLocation"
}
  

Есть ли лучшая / более быстрая альтернатива для одновременного сохранения всех этих документов?

Обновить

 @Scheduled(fixedDelay = Milliseconds.DAY)
private void synchronizeGeoLite2CityLocations() {
    this.updateFlux().subscribe();
}

private Flux<Tuple3<GeoLite2CityLocation, GeoLite2CityNetworkBlockIPv4, GeoLite2CityNetworkBlockIPv6>> updateFlux() {
        Flux<GeoLite2CityLocation> cityLocationFlux = this.geoLite2CityLocationService
                .saveAll(this.maxMindAPIService.getCityLocations())
                .onErrorContinue(DuplicateKeyException.class, (e, o) -> {
                });

        Flux<GeoLite2CityNetworkBlockIPv4> cityBlockIPv4Flux = this.geoLite2CityNetworkBlockIPv4Service
                .saveAll(this.maxMindAPIService.getCityBlocksIPv4())
                .onErrorContinue(DuplicateKeyException.class, (e, o) -> {
                });

        Flux<GeoLite2CityNetworkBlockIPv6> cityBlockIPv6Flux = this.geoLite2CityNetworkBlockIPv6Service
                .saveAll(this.maxMindAPIService.getCityBlocksIPv6())
                .onErrorContinue(DuplicateKeyException.class, (e, o) -> {
                });

        return Flux.zip(cityLocationFlux, cityBlockIPv4Flux, cityBlockIPv6Flux);
}
  

где saveAll метод вызывает ReactiveCrudRepository<T, ID>

 <S extends T> Flux<S> saveAll(Publisher<S> var1);
  

Комментарии:

1. не могли бы вы опубликовать свою реализацию.

2. @Toerktumlare, только что обновил вопрос с помощью кода

3. и как выглядит API, который вы извлекаете из maxMindAPIService.getCityLocations() какого типа API, rest? блокировка?