Производительность пакетной обработки java Apache Flink для массового обновления mongodb

#mongodb #flink-batch

Вопрос:

Эксперты,

Я пытаюсь выполнить некоторую операцию ETL с большим набором данных в виде пакетной обработки. Мое требование состоит в том , чтобы извлечь данные, преобразовать их и затем сохранить в MongoDB. Я использую Apache FLINK, но производительность очень низкая, так как я выполняю обновление MongoDB в каждой строке.

Есть ли какой-либо способ, которым мы можем потопить как массовую запись, чтобы производительность могла увеличиться.Как и после всех преобразований, мы делаем массовое обновление MongoDB. Мы можем объединить их все и, наконец, поместить их в БД точно так же, как stream [.aggregate() .sink({массовое обновление})]

 
private DataEnrichmentDO submitToFlinkJobManager(ExecutionEnvironment env,
            List<Tuple2<String, Integer>> inputCollection,long collectionSize) throws Exception  {
        
        try {
            DataSet<Tuple2<String, Integer>> inputCollectionData = env.fromCollection(inputCollection);
            DataSet<String> enrichmentContext = env.fromElements(this.clientContext.toString(),
                    this.collectionContext.toString(), this.enrichColumnDefinitions.toString(),
                    this.lookupDefinitions.toString(), this.quantityUnitConversions.toString(),
                    this.technicalDataTypes.toString(), this.errorContext.toString(), this.errorCodeCache.toString(),
                    this.subCurrencyConversions.toString());
            List<DataEnrichmentDO> result = inputCollectionData
                                            .rebalance()
                                            .map(new DataEnrichmentExpressionEvaluator())
                                            .withBroadcastSet(enrichmentContext, "enrichmentContext")
                                            .collect();

 

можем ли мы собрать всю коллекцию после преобразования, а затем выполнить массовое обновление MongoDB. в настоящее время в функции карты я выполняю операцию обновления. Я установил параллелизм на 8 { setParallelism(8);}

Комментарии:

1. Как вы записываете данные в MongoDB? Вы всегда можете реализовать свой собственный приемник, который работает в пакетном режиме.