#pyspark #spark-streaming
#pyspark #искровая потоковая передача
Вопрос:
Я пытаюсь записать поток, используя опцию добавления, но я получаю сообщение об ошибке.
Код:
from pyspark.sql import SparkSession
from pyspark.sql.functions import window
from pyspark.sql.functions import col, column, count, when
spark = SparkSession
.builder
.appName("get_sensor_data")
.getOrCreate()
spark.sparkContext.setLogLevel("ERROR")
Sensor = lines.select(lines.value.alias('Sensor'),
lines.timestamp)
windowedCounts = Sensor.withWatermark('timestamp', '10 seconds').groupBy(
window(Sensor.timestamp, windowDuration, slideDuration)).
agg(count(when(col('Sensor')=="LR1 On",True)).alias('LR1'),
count(when(col('Sensor')=="LR2 On",True)).alias('LR2'),
count(when(col('Sensor')=="LD On",True)).alias('LD')).
orderBy('window')
query = windowedCounts
.writeStream
.outputMode('append')
.format("console")
.start()
Ошибка:
Append output mode not supported when there are streaming aggregations on streaming DataFrames/DataSets without watermark
Причина использования опции добавления заключается в последующем сохранении в виде CSV-файла.
Я думаю, что эта проблема вызвана функцией окна, но я не знаю, как ее решить.
Комментарии:
1. Привет, Шин, добро пожаловать в SO. Не могли бы вы, пожалуйста, задать актуальный вопрос, чтобы мы могли помочь вам прийти к ответу?
2. Спасибо за вашу команду. Прошу прощения за мой неясный вопрос. Я пока не очень хорошо пишу по-английски. Но чтение идет хорошо. Я был бы очень признателен, если бы вы могли помочь мне с моей проблемой. Спасибо