#java #mongodb #apache-beam #dataflow
Вопрос:
Я новичок в потоке данных/Apchebeam. Я работаю над получением данных из MongoDB. Соединение MongoDB работает нормально, но я не могу применить фильтры. Выдает следующую ошибку. Я не уверен, что это правильный способ фильтрации данных. Любые предложения были бы полезны.
Ошибка
2021-06-07 10:37:34.615 Сообщение CESTError от рабочего: java.lang.ClassCastException: com.test.поток данных.дофнс.MongodDbQueryFn не может быть передан в org.apache.beam.sdk.io.mongodb.AggregationQuery org.apache.beam.sdk.io.mongodb.MongoDbIO$BoundedMongoDbSource.split(MongoDbIO.java:522)
Код: Разъем MongoDbIO:
return pipeline.apply(MongoDbIO.read()
.withUri("mongodb://".concat(databaseDetails.getDatabaseHostName()).concat(":").concat(databaseDetails.getPort()))
.withDatabase(databaseDetails.getDatabaseName())
.withCollection(objectDetails.getObjectName())
.withQueryFn(new MongodDbQueryFn("name","Mahesh")));
Запрос PTransform
package com.test.dataflow.dofns;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.bson.Document;
import com.mongodb.client.MongoCollection;
import com.mongodb.client.MongoCursor;
public class MongodDbQueryFn implements SerializableFunction<MongoCollection<Document>, MongoCursor<Document>> {
/**
*
*/
private static final long serialVersionUID = 1L;
private String keyName;
private String KeyValue;
public MongodDbQueryFn(String keyName, String KeyValue) {
this.keyName = keyName;
this.KeyValue = KeyValue;
}
@Override
public MongoCursor<Document> apply(MongoCollection<Document> input) {
return input.find(com.mongodb.client.model.Filters.eq(keyName, KeyValue)).iterator();
}
}
Комментарии:
1. Есть ли что-то еще в трассировке стека? Объясняет ли это, почему он не может привести MongoDbQueryFn к запросу агрегации? Потому что мне кажется, что это должно сработать. Может быть, это так же просто, как реализовать запрос агрегации вместо сериализуемой функции?
2. Спасибо за ответ. В журнале почти нет информации. У вас уже есть какие-либо примеры для AggregationQuery?
Ответ №1:
Документация для withQueryFn
этого не очень хорошо объясняет это, но, прочитав код MongoDbIO, кажется, что MongoDbIO.Read
предполагается, что QueryFn
значение равно AggregationQuery
или FindQuery
. Ошибка , которую вы получаете, возникает из-за того, что код проверяет, является ли запрос a FindQuery
, получает значение false, а затем предполагает, что это an AggregationQuery
, и пытается его выполнить.
Лучшим решением для вас было бы использовать FindQuery
то же поведение, что и то, которое вы написали, следующим образом:
.withQueryFn(FindQuery.create().withFilters(Filters.eq("name", "Mahesh"))));
Комментарии:
1. В качестве примечания я отправил ошибку запуска в Apache Beam Jira, чтобы улучшить документацию.
2. Большое спасибо, это хорошо работает, я боролся с этой проблемой с прошлой недели. Я новичок в apache beam и действительно чувствую, что мне трудно понять и что использовать, когда и не получая больше примеров из Интернета, по крайней мере, для реализации Java. Любые предложения, обмен ссылками или примерами были бы полезны для быстрого получения знаний об этом.
Ответ №2:
Извините, что я отвечаю слишком поздно, но, если вы или кто-либо еще все еще ищете решение для обработки нескольких сценариев критериев фильтрации, в том числе с (И/ИЛИ/И т. Д.).
Способ фильтрации Filters.eq создаст дополнительные накладные расходы(во избежание рекурсивного построения нескольких случаев эквалайзера и эквалайзера и т. Д.).
Мое предложение было бы более в пользу обращения монго.Строка JSON для документирования Хорошая часть заключается в том, что документ mongo принимает json с разделителем через запятую (,), поэтому приведенная ниже пара строк кода может обрабатывать все возможные сценарии фильтрации.
String filterJson="{n"
" status: "A",n"
" $or: [ { qty: { $lt: 30 } }, { item: /^p/ } ]n"
"}";
Bson dc =BsonDocument.parse(filterJson);
PCollection<Document> mongoVal = p.apply(MongoDbIO.read()
.withUri("mongodb://localhost:27017")
.withDatabase("test")
.withCollection("inventory_collection")
.withQueryFn(FindQuery.create().withFilters(dc)));