Как рассчитать скользящую среднюю в структурированной потоковой передаче spark?

#apache-spark #apache-spark-sql #spark-structured-streaming

Вопрос:

Я пытаюсь рассчитать скользящую среднюю в структурированном потоковом потоке spark с точки зрения строк, предшествующих, а не основанных на событиях времени.

У Кафки есть строковые сообщения, подобные этому: device1@227.92@2021-08-19T12:15:13.540Z

и есть такой код

 Dataset<Row> lines = sparkSession.readStream()
                .format("kafka")
                .option("kafka.bootstrap.servers", "localhost:9092")
                .option("subscribe", "users")
                .load()
                .selectExpr("CAST(value AS STRING)")
                .map((MapFunction<Row, Row>) row -> {
                    String message = row.getAs("value");
                    String[] newRow = message.split("@");
                    return RowFactory.create(newRow);
                    }, RowEncoder.apply(structType))
                .selectExpr("CAST(item AS STRING)", "CAST(value AS DOUBLE)", "CAST(timestamp AS TIMESTAMP)");
 

Приведенный выше код считывает поток из кафки и преобразует строковые сообщения в строки.

Когда я пытаюсь сделать что-то подобное:

 WindowSpec threeRowWindow = Window.partitionBy("item").orderBy("timestamp").rowsBetween(Window.currentRow(), -3);

Dataset<Row> testWindow = 
lines.withColumn("avg",  functions.avg("value").over(threeRowWindow));
 

Я получаю эту ошибку:
org.apache.spark.sql.AnalysisException Исключение: Окна, не основанные на времени, не поддерживаются в потоковых кадрах данных/наборах данных;

Есть ли какой — либо другой способ рассчитать скользящую среднюю по мере поступления каждого сообщения и обновлять ее по мере поступления новых данных из потока? Или любая операция, не основанная на времени, по умолчанию не поддерживается для запуска структурированной потоковой передачи?

Спасибо

Комментарии:

1. Ошибка гласит, что нет. Это ограничение по техническим причинам.

2. Я думаю (и не тестировал), что вам придется написать потоковый запрос с flatMapGroupsWithState отслеживанием состояния .