#mongodb #flink-batch
Вопрос:
Эксперты,
Я пытаюсь выполнить некоторую операцию ETL с большим набором данных в виде пакетной обработки. Мое требование состоит в том , чтобы извлечь данные, преобразовать их и затем сохранить в MongoDB. Я использую Apache FLINK, но производительность очень низкая, так как я выполняю обновление MongoDB в каждой строке.
Есть ли какой-либо способ, которым мы можем потопить как массовую запись, чтобы производительность могла увеличиться.Как и после всех преобразований, мы делаем массовое обновление MongoDB. Мы можем объединить их все и, наконец, поместить их в БД точно так же, как stream [.aggregate() .sink({массовое обновление})]
private DataEnrichmentDO submitToFlinkJobManager(ExecutionEnvironment env,
List<Tuple2<String, Integer>> inputCollection,long collectionSize) throws Exception {
try {
DataSet<Tuple2<String, Integer>> inputCollectionData = env.fromCollection(inputCollection);
DataSet<String> enrichmentContext = env.fromElements(this.clientContext.toString(),
this.collectionContext.toString(), this.enrichColumnDefinitions.toString(),
this.lookupDefinitions.toString(), this.quantityUnitConversions.toString(),
this.technicalDataTypes.toString(), this.errorContext.toString(), this.errorCodeCache.toString(),
this.subCurrencyConversions.toString());
List<DataEnrichmentDO> result = inputCollectionData
.rebalance()
.map(new DataEnrichmentExpressionEvaluator())
.withBroadcastSet(enrichmentContext, "enrichmentContext")
.collect();
можем ли мы собрать всю коллекцию после преобразования, а затем выполнить массовое обновление MongoDB. в настоящее время в функции карты я выполняю операцию обновления. Я установил параллелизм на 8 { setParallelism(8);}
Комментарии:
1. Как вы записываете данные в MongoDB? Вы всегда можете реализовать свой собственный приемник, который работает в пакетном режиме.