#python #mongodb #apache-spark #pyspark
#python #mongodb #apache-spark #pyspark
Вопрос:
Попытка установить определение конвейера перед использованием pyspark
filter_users="[{'$and': [{'user': {'$in': ['player','npc']}}]}]"
spark.read.format("com.mongodb.spark.sql.DefaultSource").
option("spark.mongodb.input.uri", 'data_users').
option('pipeline',filter_users).load()
Возвращает ошибку:
pyspark.sql.utils.IllegalArgumentException:requirement failed: Invalid Aggregation map Map(uri -> mongodb://localhost:27017/local.pii_val?readPreference=primaryPreferred, pipeline -> ["[{$and: [{'user': {'$in': ['player','npc']}}]}]"]):
Также пытался удалить then » между операторами и или в.
Спасибо.
Ответ №1:
Ваш запрос не является конвейером агрегации. смотрите документ здесь
Вы должны поместить свой запрос в $match
объект
filter_users="[{'$match: {'$and': [{'user': {'$in': ['player','npc']}}]}}]"