Не работает Spark Structured Streaming groupByKey в окне времени

#scala #apache-spark #apache-spark-sql #spark-structured-streaming

#scala #apache-spark #apache-spark-sql #spark-structured-streaming

Вопрос:

Мне нужно разбить мой поток Kafka на временные окна по 10 минут каждое, а затем запустить для него некоторую пакетную обработку.

Примечание: записи ниже имеют поле метки времени

    val records = spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", brokerPool)
      .option("subscribe", topic)
      .option("startingOffsets", kafkaOffset)
      .load()
  

Я добавляю временное окно к каждой записи, используя,

 .withColumn("window", window($"timing", windowDuration))
  

Я создал несколько вспомогательных классов, таких как

   case class TimingWindow(
      start: java.sql.Timestamp,
      end: java.sql.Timestamp
  )
  case class RecordWithWindow(
      record: MyRecord,
      groupingWindow: TimingWindow
  )
  

Теперь у меня есть DF типа [RecordWithWindow]

Все это работает очень хорошо.

Далее,

 metricsWithWindow
  .groupByKey(_.groupingWindow)
  //By grouping, I get several records per time window
  //resulting an object of the below type which I write out to HDFS

  case class WindowWithRecords(
      records: Seq[MyRecord],
      window: TimingWindow
  )
  

Где я исследую HDFS,

Пример:

Ожидаемо : каждый WindowWithRecords объект, имеющий уникальный TimingWindow

 WindowWithRecordsA(TimingWindowA, Seq(MyRecordA, MyRecordB, MyRecordC))
  

Актуально :
Более одного объекта WindowWithRecords с одинаковым временным окном

 WindowWithRecordsA(TimingWindowA, Seq(MyRecordA, MyRecordB))
WindowWithRecordsB(TimingWindowA, Seq(MyRecordC))
  

Похоже, логика groupByKey работает неправильно.

Я надеюсь, что мой вопрос ясен. Любые указания были бы полезны.

Комментарии:

1. Вы уверены, что 3 ожидаемых элемента прибудут в одной микропакете? Записи HDFS не обновляются, поэтому для каждого микропакета он записывает запись TimingWindowA со всеми значениями, которые поступили в этом выполнении, или ни одного, если у него их нет.

Ответ №1:

Обнаружена проблема:

Я не использовал явный триггер при обработке окна. В результате Spark создавал микропакеты так быстро, как только мог, в отличие от выполнения этого в конце окна.

 streamingQuery
.writeStream
.trigger(Trigger.ProcessingTime(windowDuration))
...
.start
  

Это было результатом того, что я неправильно понял документацию Spark.

Примечание: groupByKey использует хэш-код объекта. Важно убедиться, что хэш-код объекта является согласованным.

Комментарии:

1. Не уверен, что я понял, явный триггер?

2. В принципе, я добавил это .trigger (триггер. Время обработки (windowDuration)) Это заставляет обрабатывать все мое окно одновременно.