#apache-spark #pyspark #apache-spark-sql #spark-structured-streaming
#apache-spark #pyspark #apache-spark-sql #spark-structured-streaming
Вопрос:
Я выполняю агрегацию для потокового фрейма данных и пытаюсь записать результат в выходной каталог. Но я получаю исключение, говорящее
pyspark.sql.utils.AnalysisException: 'Data source json does not support Update output mode;
Я получаю аналогичную ошибку в режиме вывода «complete».
Это мой код:
grouped_df = logs_df.groupBy('host', 'timestamp').agg(count('host').alias('total_count'))
result_host = grouped_df.filter(col('total_count') > threshold)
writer_query = result_host.writeStream
.format("json")
.queryName("JSON Writer")
.outputMode("update")
.option("path", "output")
.option("checkpointLocation", "chk-point-dir")
.trigger(processingTime="1 minute")
.start()
writer_query.awaitTermination()
Ответ №1:
Ссылки на файлы поддерживают только режим «добавления» в соответствии с документацией по ссылкам вывода, см. «Поддерживаемые режимы вывода» в таблице ниже.