#java #apache-spark #lazy-loading #spark-streaming #amazon-kinesis
#java #apache-spark #отложенная загрузка #искровая потоковая передача #amazon-kinesis
Вопрос:
Возможно ли обработать или запустить метод / действие только один раз для каждого пакета записей в Spark Streaming?
Мой способ использования — вызывать loadConfigurations() один раз для каждого пакета DStream, даже если имеется от 1 до n записей. Загруженная конфигурация должна быть доступна в драйвере для дальнейшей обработки.
Пример:
пакет-1: 0 записей в потоке kinesis — нет триггера loadConfiguration()
пакетная запись 2: 1 в потоке kinesis — loadConfiguration() вызывается один раз, и переменные обновляются на уровне драйвера
пакет-3: 100 записей в потоке kinesis — loadConfiguration() вызывается один раз, и переменные обновляются на уровне драйвера
Заранее спасибо.
Комментарии:
1. есть ли конкретная причина не использовать структурированную потоковую передачу? Также почему бы не загрузить конфигурацию перед загрузкой в драйвер, а затем транслировать с использованием широковещательной переменной? продолжает ли конфигурация меняться для каждого пакета?
2. вариант использования имеет сложные вычисления, а не обычный ETL. В настоящее время конфигурация загружается на уровне драйвера и транслируется для onetiime. Но необходимо динамически изменять его при каждом обновлении конфигурации. Примечание: Обработка записей имеет другой входной поток, и событие запуска конфигурации будет прослушиваться в другом потоке. итак, для потока ввода конфигурации, даже если в данном пакете 100 событий, я хочу обновить конфигурацию только один раз. (надеюсь, причина этого обновления один раз за пакет понятна).
Ответ №1:
Не совсем уверен, понял ли я точное требование. Однако, основываясь на описании вопроса и вашем объяснении в комментариях, это то, что может сработать:
dstream.foreachRDD { rdd =>
val config = loadConfiguration() // executed at the driver
rdd.foreach { record =>
// do stuff here. e.g. config.get(). This code is executed at the worker.
}
}
Здесь важно отметить, что Config
класс должен быть сериализуемым, поскольку он будет отправлен workers из драйвера.
Также обратите внимание, что это может быть антишаблоном в зависимости от вашего варианта использования. например, для каждого пакета объект конфигурации будет сериализован и отправлен рабочим, что увеличит нагрузку на сеть в зависимости от размера объекта конфигурации.
Я бы настоятельно рекомендовал проверить рекомендуемые шаблоны проектирования для forEachRDD
construct и разумно выбрать свой подход. Вот ссылка на то же самое: https://spark.apache.org/docs/latest/streaming-programming-guide.html#design-patterns-for-using-foreachrdd
Комментарии:
1. Мне не нужно, чтобы loadConfiguration() выполнялась при каждой обработке dstream. это должно происходить по требованию при прослушивании событий из другого потока. Требование:
input stream => process records
;amp;;config stream => load configuration once per batch if any records (events come to this stream represents a configurations refresh needed).
Однако, если в каждый пакет в этот поток конфигурации поступило более 1 записи, выполнение dstream.foreachRDD или foreach records приводит к многократной перезагрузке конфигурации, что не требуется. Поэтому мне нужно обновлять конфигурацию только один раз для каждого пакета записей потока конфигурации.