ApacheBeam-Java: Фильтры MongoDB

#java #mongodb #apache-beam #dataflow

Вопрос:

Я новичок в потоке данных/Apchebeam. Я работаю над получением данных из MongoDB. Соединение MongoDB работает нормально, но я не могу применить фильтры. Выдает следующую ошибку. Я не уверен, что это правильный способ фильтрации данных. Любые предложения были бы полезны.

Ошибка

2021-06-07 10:37:34.615 Сообщение CESTError от рабочего: java.lang.ClassCastException: com.test.поток данных.дофнс.MongodDbQueryFn не может быть передан в org.apache.beam.sdk.io.mongodb.AggregationQuery org.apache.beam.sdk.io.mongodb.MongoDbIO$BoundedMongoDbSource.split(MongoDbIO.java:522)

Код: Разъем MongoDbIO:

 return  pipeline.apply(MongoDbIO.read()                        
.withUri("mongodb://".concat(databaseDetails.getDatabaseHostName()).concat(":").concat(databaseDetails.getPort()))
.withDatabase(databaseDetails.getDatabaseName())
.withCollection(objectDetails.getObjectName())
.withQueryFn(new MongodDbQueryFn("name","Mahesh")));
 

Запрос PTransform

 package com.test.dataflow.dofns;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.bson.Document;

import com.mongodb.client.MongoCollection;
import com.mongodb.client.MongoCursor;

public class MongodDbQueryFn implements SerializableFunction<MongoCollection<Document>, MongoCursor<Document>> {

    /**
     * 
     */
    private static final long serialVersionUID = 1L;

    private String keyName;
    private String KeyValue;

    public MongodDbQueryFn(String keyName, String KeyValue) {

        this.keyName = keyName;
        this.KeyValue = KeyValue;

    }

    @Override
    public MongoCursor<Document> apply(MongoCollection<Document> input) {
        return input.find(com.mongodb.client.model.Filters.eq(keyName, KeyValue)).iterator();
    }
    
}
 

Комментарии:

1. Есть ли что-то еще в трассировке стека? Объясняет ли это, почему он не может привести MongoDbQueryFn к запросу агрегации? Потому что мне кажется, что это должно сработать. Может быть, это так же просто, как реализовать запрос агрегации вместо сериализуемой функции?

2. Спасибо за ответ. В журнале почти нет информации. У вас уже есть какие-либо примеры для AggregationQuery?

Ответ №1:

Документация для withQueryFn этого не очень хорошо объясняет это, но, прочитав код MongoDbIO, кажется, что MongoDbIO.Read предполагается, что QueryFn значение равно AggregationQuery или FindQuery . Ошибка , которую вы получаете, возникает из-за того, что код проверяет, является ли запрос a FindQuery , получает значение false, а затем предполагает, что это an AggregationQuery , и пытается его выполнить.

Лучшим решением для вас было бы использовать FindQuery то же поведение, что и то, которое вы написали, следующим образом:

 .withQueryFn(FindQuery.create().withFilters(Filters.eq("name", "Mahesh"))));
 

Комментарии:

1. В качестве примечания я отправил ошибку запуска в Apache Beam Jira, чтобы улучшить документацию.

2. Большое спасибо, это хорошо работает, я боролся с этой проблемой с прошлой недели. Я новичок в apache beam и действительно чувствую, что мне трудно понять и что использовать, когда и не получая больше примеров из Интернета, по крайней мере, для реализации Java. Любые предложения, обмен ссылками или примерами были бы полезны для быстрого получения знаний об этом.

Ответ №2:

Извините, что я отвечаю слишком поздно, но, если вы или кто-либо еще все еще ищете решение для обработки нескольких сценариев критериев фильтрации, в том числе с (И/ИЛИ/И т. Д.).

Способ фильтрации Filters.eq создаст дополнительные накладные расходы(во избежание рекурсивного построения нескольких случаев эквалайзера и эквалайзера и т. Д.).

Мое предложение было бы более в пользу обращения монго.Строка JSON для документирования Хорошая часть заключается в том, что документ mongo принимает json с разделителем через запятую (,), поэтому приведенная ниже пара строк кода может обрабатывать все возможные сценарии фильтрации.

     String filterJson="{n"  
            "     status: "A",n"  
            "     $or: [ { qty: { $lt: 30 } }, { item: /^p/ } ]n"  
            "}";

    Bson dc =BsonDocument.parse(filterJson);

    PCollection<Document> mongoVal = p.apply(MongoDbIO.read()
                    .withUri("mongodb://localhost:27017")
                    .withDatabase("test")
                    .withCollection("inventory_collection")
                    .withQueryFn(FindQuery.create().withFilters(dc)));