#mongodb #scala #apache-spark
#mongodb #scala #apache-spark
Вопрос:
У меня есть коллекция MongoDB, которая потенциально довольно большая.
Я использую следующий соединитель для чтения данных из этой коллекции с помощью spark:
<dependency>
<groupId>org.mongodb.spark</groupId>
<artifactId>mongo-spark-connector_2.11</artifactId>
<version>2.4.2</version>
</dependency>
Я хотел бы отфильтровать коллекцию по закрытому списку (который также может быть довольно длинным) идентификаторов, который извлекается из другого источника данных.
Из документации я понимаю, что некоторая фильтрация может быть снижена, чтобы произойти на стороне mongo.
например
rdd.filter(doc => doc.getInteger("test") > 5)
Я должен выяснить, есть ли способ выполнить что-то похожее на:
val ids = spark.sql("select ids from some_non_mongo_table")
val mongoDocs = MongoSpark.load(spark.sparkContext, mongoConf)
.filter(doc => doc.id in ids)
Если это невозможно, есть ли какое-либо другое разумное решение, кроме извлечения всей коллекции из mongo и объединения результатов с фреймом данных ids?
Комментарии:
1. выполнение широковещательного внутреннего соединения с вашим фреймом данных ID — это отличное решение, чем использование подхода filter (…).
2. но в этом случае я извлекаю всю коллекцию из mongo нет?
3. Набор данных Spark будет загружать только необходимые данные из источника, и это называется выталкиванием предиката. Насколько я понимаю, MangoDB поддерживает нажатие на предикат, как описано в ссылке ниже. raphael-brugier.com/blog/introduction-mongodb-spark-connector /. … Фильтр предикатов произойдет независимо от того, выполняете ли вы filter или выполняете join. Широковещательное соединение было бы лучшим подходом, если ожидается, что ваши идентификаторы вырастут с нескольких до десяти тысяч.
4. Я проверяю этот подход, но, как я понимаю из сообщения в блоге, только предложение where и предложение select переносятся в mongo. «Выдвижение предикатов — это оптимизация с помощью оптимизатора Catalyst в Spark SQL для переноса фильтров where и проекций select в источник данных»
Ответ №1:
Можете ли вы использовать .filter(doc => ids.contain(doc.id))
?
val ids = spark.sql("select ids from some_non_mongo_table").collect.map(r => r(0))
val mongoDocs = MongoSpark.load(spark.sparkContext, mongoConf)
.filter(doc => ids.contains(doc.id))
Комментарии:
1. Спасибо за предложение, я изучал этот подход, но у меня возникли некоторые проблемы с преобразованием результатов в DataFrame. результат имеет тип RDD[Document], и хотя IDE (intelij) завершает метод toDF(), он не компилируется.
2. @OdedRosenberg не могли бы вы показать какие-либо ошибки при компиляции?
3. Ошибка:(41, 8) значение toDF не является членом org.apache.spark.rdd.RDD[org.bson. Document] возможная причина: возможно, перед «значением toDF» отсутствует точка с запятой? .toDF()
4. @OdedRosenberg не могли бы вы попробовать импортировать
import spark.implicits._
и повторитьtoDF
попытку ..?5. Он импортируется в область кода, я думаю, это работает только для классов case
Ответ №2:
В конце я пошел с отправкой значений фильтрации в виде конвейера. Выглядело как предпочтительный способ, поскольку не удалось сбросить фильтры другими способами, а выборка всей коллекции слишком тяжелая.
// ids is a comma delimited string
val pipeline = "{$match: {id : {$in:[" ids "]}}}"
val mongoConf = mongoReadConfig
val existingSnapshots = MongoSpark
.load(spark.sparkContext, mongoConf)
.withPipeline(Seq(Document.parse(pipeline)))
.toDF()