#apache-spark #apache-kafka #spark-structured-streaming #spark-kafka-integration
#apache-spark #apache-kafka #spark-structured-streaming #spark-kafka-интеграция
Вопрос:
Нам нужно использовать maxOffsetsPerTrigger
в исходном коде Kafka со Trigger.Once()
структурированной потоковой передачей, но, исходя из этой проблемы, кажется, что она читается allAvailable
в spark 3. Есть ли способ достичь предела скорости в этой ситуации?
Вот пример кода в spark 3:
def options: Map[String, String] = Map(
"kafka.bootstrap.servers" -> conf.getStringSeq("bootstrapServers").mkString(","),
"subscribe" -> conf.getString("topic")
)
Option(conf.getLong("maxOffsetsPerTrigger")).map("maxOffsetsPerTrigger" -> _.toString)
val streamingQuery = sparkSession.readStream.format("kafka").options(options)
.load
.writeStream
.trigger(Trigger.Once)
.start()
Ответ №1:
Другого способа правильно установить ограничение скорости нет. Если maxOffsetsPerTrigger
это не применимо для потоковых заданий с Once
триггером, вы можете сделать следующее, чтобы добиться идентичного результата:
- Выберите другой триггер и используйте
maxOffsetsPerTrigger
для ограничения скорости и завершения этого задания вручную после завершения обработки всех данных. - Используйте опции
startingOffsets
иendingOffsets
, делая задание пакетным заданием. Повторяйте, пока не обработаете все данные в теме. Однако есть причина, по которой «потоковая передача в режиме RunOnce лучше, чем пакетная», как подробно описано здесь .
Последним вариантом было бы изучить связанный запрос на извлечение и скомпилировать Spark самостоятельно.
Ответ №2:
Вот как мы «решили» это. По сути, это подход mike
, описанный в принятом ответе.
В нашем случае размер сообщения менялся очень мало, и поэтому мы знали, сколько времени занимает обработка пакета. Итак, в двух словах мы:
- изменен
Trigger.Once()
сTrigger.ProcessingTime(<ms>)
, такmaxOffsetsPerTrigger
как работает с этим режимом - уничтожил этот запущенный запрос, вызвав
awaitTermination(<ms>)
mimicTrigger.Once()
- установите интервал обработки большим, чем интервал завершения, чтобы для обработки подходил ровно один «пакет».
val kafkaOptions = Map[String, String](
"kafka.bootstrap.servers" -> "localhost:9092",
"failOnDataLoss" -> "false",
"subscribePattern" -> "testTopic",
"startingOffsets" -> "earliest",
"maxOffsetsPerTrigger" -> "10", // "batch" size
)
val streamWriterOptions = Map[String, String](
"checkpointLocation" -> "path/to/checkpoints",
)
val processingInterval = 30000L
val terminationInterval = 15000L
sparkSession
.readStream
.format("kafka")
.options(kafkaOptions)
.load()
.writeStream
.options(streamWriterOptions)
.format("Console")
.trigger(Trigger.ProcessingTime(processingInterval))
.start()
.awaitTermination(terminationInterval)
Это работает, потому что первый пакет будет считан и обработан с соблюдением maxOffsetsPerTrigger
ограничения. Скажем, через 10 секунд. Затем начинается обработка второго пакета, но он завершается в середине операции через ~ 5 секунд и никогда не достигает установленной отметки в 30 секунд. Но он правильно сохраняет смещения. подбирает и обрабатывает этот «убитый» пакет в следующем запуске.
Недостатком этого подхода является то, что вы должны примерно знать, сколько времени требуется для обработки одного «пакета» — если вы установите terminationInterval
слишком низкое значение, результат задания всегда будет нулевым.
Конечно, если вас не волнует точное количество пакетов, которые вы обрабатываете за один прогон, вы можете легко настроить processingInterval
их так, чтобы они были в разы меньше terminationInterval
. В этом случае вы можете обрабатывать разное количество пакетов за один раз, но при этом учитывать значение maxOffsetsPerTrigger
.