#scala #apache-spark #apache-spark-sql #spark-structured-streaming
#scala #apache-spark #apache-spark-sql #spark-structured-streaming
Вопрос:
Мне нужно разбить мой поток Kafka на временные окна по 10 минут каждое, а затем запустить для него некоторую пакетную обработку.
Примечание: записи ниже имеют поле метки времени
val records = spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", brokerPool)
.option("subscribe", topic)
.option("startingOffsets", kafkaOffset)
.load()
Я добавляю временное окно к каждой записи, используя,
.withColumn("window", window($"timing", windowDuration))
Я создал несколько вспомогательных классов, таких как
case class TimingWindow(
start: java.sql.Timestamp,
end: java.sql.Timestamp
)
case class RecordWithWindow(
record: MyRecord,
groupingWindow: TimingWindow
)
Теперь у меня есть DF типа [RecordWithWindow]
Все это работает очень хорошо.
Далее,
metricsWithWindow
.groupByKey(_.groupingWindow)
//By grouping, I get several records per time window
//resulting an object of the below type which I write out to HDFS
case class WindowWithRecords(
records: Seq[MyRecord],
window: TimingWindow
)
Где я исследую HDFS,
Пример:
Ожидаемо : каждый WindowWithRecords объект, имеющий уникальный TimingWindow
WindowWithRecordsA(TimingWindowA, Seq(MyRecordA, MyRecordB, MyRecordC))
Актуально :
Более одного объекта WindowWithRecords с одинаковым временным окном
WindowWithRecordsA(TimingWindowA, Seq(MyRecordA, MyRecordB))
WindowWithRecordsB(TimingWindowA, Seq(MyRecordC))
Похоже, логика groupByKey работает неправильно.
Я надеюсь, что мой вопрос ясен. Любые указания были бы полезны.
Комментарии:
1. Вы уверены, что 3 ожидаемых элемента прибудут в одной микропакете? Записи HDFS не обновляются, поэтому для каждого микропакета он записывает запись TimingWindowA со всеми значениями, которые поступили в этом выполнении, или ни одного, если у него их нет.
Ответ №1:
Обнаружена проблема:
Я не использовал явный триггер при обработке окна. В результате Spark создавал микропакеты так быстро, как только мог, в отличие от выполнения этого в конце окна.
streamingQuery
.writeStream
.trigger(Trigger.ProcessingTime(windowDuration))
...
.start
Это было результатом того, что я неправильно понял документацию Spark.
Примечание: groupByKey использует хэш-код объекта. Важно убедиться, что хэш-код объекта является согласованным.
Комментарии:
1. Не уверен, что я понял, явный триггер?
2. В принципе, я добавил это .trigger (триггер. Время обработки (windowDuration)) Это заставляет обрабатывать все мое окно одновременно.