Scala Spark Mongo — фильтр с предложением «in»

#mongodb #scala #apache-spark

#mongodb #scala #apache-spark

Вопрос:

У меня есть коллекция MongoDB, которая потенциально довольно большая.

Я использую следующий соединитель для чтения данных из этой коллекции с помощью spark:

         <dependency>
            <groupId>org.mongodb.spark</groupId>
            <artifactId>mongo-spark-connector_2.11</artifactId>
            <version>2.4.2</version>
        </dependency>
 

Я хотел бы отфильтровать коллекцию по закрытому списку (который также может быть довольно длинным) идентификаторов, который извлекается из другого источника данных.

Из документации я понимаю, что некоторая фильтрация может быть снижена, чтобы произойти на стороне mongo.

например

 rdd.filter(doc => doc.getInteger("test") > 5)
 

Я должен выяснить, есть ли способ выполнить что-то похожее на:

 val ids = spark.sql("select ids from some_non_mongo_table")
val mongoDocs = MongoSpark.load(spark.sparkContext, mongoConf)
                          .filter(doc => doc.id in ids)
 

Если это невозможно, есть ли какое-либо другое разумное решение, кроме извлечения всей коллекции из mongo и объединения результатов с фреймом данных ids?

Комментарии:

1. выполнение широковещательного внутреннего соединения с вашим фреймом данных ID — это отличное решение, чем использование подхода filter (…).

2. но в этом случае я извлекаю всю коллекцию из mongo нет?

3. Набор данных Spark будет загружать только необходимые данные из источника, и это называется выталкиванием предиката. Насколько я понимаю, MangoDB поддерживает нажатие на предикат, как описано в ссылке ниже. raphael-brugier.com/blog/introduction-mongodb-spark-connector /. … Фильтр предикатов произойдет независимо от того, выполняете ли вы filter или выполняете join. Широковещательное соединение было бы лучшим подходом, если ожидается, что ваши идентификаторы вырастут с нескольких до десяти тысяч.

4. Я проверяю этот подход, но, как я понимаю из сообщения в блоге, только предложение where и предложение select переносятся в mongo. «Выдвижение предикатов — это оптимизация с помощью оптимизатора Catalyst в Spark SQL для переноса фильтров where и проекций select в источник данных»

Ответ №1:

Можете ли вы использовать .filter(doc => ids.contain(doc.id)) ?

 val ids = spark.sql("select ids from some_non_mongo_table").collect.map(r => r(0))

val mongoDocs = MongoSpark.load(spark.sparkContext, mongoConf)
                          .filter(doc => ids.contains(doc.id))
 

Комментарии:

1. Спасибо за предложение, я изучал этот подход, но у меня возникли некоторые проблемы с преобразованием результатов в DataFrame. результат имеет тип RDD[Document], и хотя IDE (intelij) завершает метод toDF(), он не компилируется.

2. @OdedRosenberg не могли бы вы показать какие-либо ошибки при компиляции?

3. Ошибка:(41, 8) значение toDF не является членом org.apache.spark.rdd.RDD[org.bson. Document] возможная причина: возможно, перед «значением toDF» отсутствует точка с запятой? .toDF()

4. @OdedRosenberg не могли бы вы попробовать импортировать import spark.implicits._ и повторить toDF попытку ..?

5. Он импортируется в область кода, я думаю, это работает только для классов case

Ответ №2:

В конце я пошел с отправкой значений фильтрации в виде конвейера. Выглядело как предпочтительный способ, поскольку не удалось сбросить фильтры другими способами, а выборка всей коллекции слишком тяжелая.

     // ids is a comma delimited string 
    val pipeline = "{$match: {id : {$in:["   ids   "]}}}"
    val mongoConf = mongoReadConfig


    val existingSnapshots = MongoSpark
      .load(spark.sparkContext, mongoConf)
      .withPipeline(Seq(Document.parse(pipeline)))
      .toDF()