#apache-spark #apache-kafka #apache-spark-sql #spark-streaming
Вопрос:
Я пишу потоковое приложение spark, читающее Кафку. Мой кафка получает по меньшей мере миллион событий в день. Мне приходится выполнять много вычислений для каждого получаемого события, и мне нужно проверять каждое обрабатываемое событие. чтобы в случае каких-либо сбоев он не обрабатывал какое-либо событие снова, и я начинаю с моего последнего неудачного события
Кроме того, поскольку моя кафка уже имеет миллиарды значений, поэтому она не будет фиксироваться в контрольной точке до тех пор, пока не завершится первая партия, и начнется сначала, так как в каталоге контрольных точек нет фиксаций.
Нужен какой-то способ, чтобы я мог проверять каждое событие, которое я обрабатываю.
df
.writeStream
.foreachBatch {
(batchDF: Dataset[CoreDBVersion], batchId: Long) => {
batchDF.collect.foreach {
implicit value=> {
//do all the processing and get required metadata
}
// //as I have processed a event here need to checkpoint this event with offset on checkpoint location
}
}
}
}
.trigger(Trigger.ProcessingTime("1 second"))
.option("checkpointLocation", "path to checkpoint dir")
.start().awaitTermination()
Комментарии:
1. Вы думали о переходе от структурированной потоковой передачи к прямой потоковой передаче? Преимущество, которое я вижу, заключается в том, что в прямой потоковой передаче вы можете легко вручную фиксировать обработанные записи, когда захотите это сделать.