Вручную зафиксируйте смещение в каталоге контрольных точек в структурированной потоковой передаче spark

#apache-spark #apache-kafka #apache-spark-sql #spark-streaming

Вопрос:

Я пишу потоковое приложение spark, читающее Кафку. Мой кафка получает по меньшей мере миллион событий в день. Мне приходится выполнять много вычислений для каждого получаемого события, и мне нужно проверять каждое обрабатываемое событие. чтобы в случае каких-либо сбоев он не обрабатывал какое-либо событие снова, и я начинаю с моего последнего неудачного события

Кроме того, поскольку моя кафка уже имеет миллиарды значений, поэтому она не будет фиксироваться в контрольной точке до тех пор, пока не завершится первая партия, и начнется сначала, так как в каталоге контрольных точек нет фиксаций.

Нужен какой-то способ, чтобы я мог проверять каждое событие, которое я обрабатываю.

 df
    .writeStream
    .foreachBatch {
      (batchDF: Dataset[CoreDBVersion], batchId: Long) => {
        batchDF.collect.foreach {
          implicit value=> {
            //do all the processing and get required metadata
            }
//           //as I have processed a event here need to checkpoint this event with offset on checkpoint location
   
          }
        }
      }
    }
    .trigger(Trigger.ProcessingTime("1 second"))
    .option("checkpointLocation", "path to checkpoint dir")
    .start().awaitTermination()
 

Комментарии:

1. Вы думали о переходе от структурированной потоковой передачи к прямой потоковой передаче? Преимущество, которое я вижу, заключается в том, что в прямой потоковой передаче вы можете легко вручную фиксировать обработанные записи, когда захотите это сделать.