#apache-spark #apache-spark-sql #spark-structured-streaming
Вопрос:
Я пытаюсь рассчитать скользящую среднюю в структурированном потоковом потоке spark с точки зрения строк, предшествующих, а не основанных на событиях времени.
У Кафки есть строковые сообщения, подобные этому: device1@227.92@2021-08-19T12:15:13.540Z
и есть такой код
Dataset<Row> lines = sparkSession.readStream()
.format("kafka")
.option("kafka.bootstrap.servers", "localhost:9092")
.option("subscribe", "users")
.load()
.selectExpr("CAST(value AS STRING)")
.map((MapFunction<Row, Row>) row -> {
String message = row.getAs("value");
String[] newRow = message.split("@");
return RowFactory.create(newRow);
}, RowEncoder.apply(structType))
.selectExpr("CAST(item AS STRING)", "CAST(value AS DOUBLE)", "CAST(timestamp AS TIMESTAMP)");
Приведенный выше код считывает поток из кафки и преобразует строковые сообщения в строки.
Когда я пытаюсь сделать что-то подобное:
WindowSpec threeRowWindow = Window.partitionBy("item").orderBy("timestamp").rowsBetween(Window.currentRow(), -3);
Dataset<Row> testWindow =
lines.withColumn("avg", functions.avg("value").over(threeRowWindow));
Я получаю эту ошибку:
org.apache.spark.sql.AnalysisException Исключение: Окна, не основанные на времени, не поддерживаются в потоковых кадрах данных/наборах данных;
Есть ли какой — либо другой способ рассчитать скользящую среднюю по мере поступления каждого сообщения и обновлять ее по мере поступления новых данных из потока? Или любая операция, не основанная на времени, по умолчанию не поддерживается для запуска структурированной потоковой передачи?
Спасибо
Комментарии:
1. Ошибка гласит, что нет. Это ограничение по техническим причинам.
2. Я думаю (и не тестировал), что вам придется написать потоковый запрос с
flatMapGroupsWithState
отслеживанием состояния .