Потоковая запись с опцией добавления и оконной функцией

#pyspark #spark-streaming

#pyspark #искровая потоковая передача

Вопрос:

Я пытаюсь записать поток, используя опцию добавления, но я получаю сообщение об ошибке.

Код:

 from pyspark.sql import SparkSession
from pyspark.sql.functions import window
from pyspark.sql.functions import col, column, count, when

spark = SparkSession
        .builder
        .appName("get_sensor_data")
        .getOrCreate()
spark.sparkContext.setLogLevel("ERROR")

Sensor = lines.select(lines.value.alias('Sensor'),
        lines.timestamp)

windowedCounts = Sensor.withWatermark('timestamp', '10 seconds').groupBy(
        window(Sensor.timestamp, windowDuration, slideDuration)).
        agg(count(when(col('Sensor')=="LR1 On",True)).alias('LR1'),
        count(when(col('Sensor')=="LR2 On",True)).alias('LR2'),
        count(when(col('Sensor')=="LD On",True)).alias('LD')).
        orderBy('window')

query = windowedCounts
        .writeStream
        .outputMode('append')
        .format("console")
        .start()
  

Ошибка:

 Append output mode not supported when there are streaming aggregations on streaming DataFrames/DataSets without watermark
  

Причина использования опции добавления заключается в последующем сохранении в виде CSV-файла.
Я думаю, что эта проблема вызвана функцией окна, но я не знаю, как ее решить.

Комментарии:

1. Привет, Шин, добро пожаловать в SO. Не могли бы вы, пожалуйста, задать актуальный вопрос, чтобы мы могли помочь вам прийти к ответу?

2. Спасибо за вашу команду. Прошу прощения за мой неясный вопрос. Я пока не очень хорошо пишу по-английски. Но чтение идет хорошо. Я был бы очень признателен, если бы вы могли помочь мне с моей проблемой. Спасибо