Потоковая передача искр — Kinesis — Java

#java #apache-spark #lazy-loading #spark-streaming #amazon-kinesis

#java #apache-spark #отложенная загрузка #искровая потоковая передача #amazon-kinesis

Вопрос:

Возможно ли обработать или запустить метод / действие только один раз для каждого пакета записей в Spark Streaming?

Мой способ использования — вызывать loadConfigurations() один раз для каждого пакета DStream, даже если имеется от 1 до n записей. Загруженная конфигурация должна быть доступна в драйвере для дальнейшей обработки.

Пример:

пакет-1: 0 записей в потоке kinesis — нет триггера loadConfiguration()

пакетная запись 2: 1 в потоке kinesis — loadConfiguration() вызывается один раз, и переменные обновляются на уровне драйвера

пакет-3: 100 записей в потоке kinesis — loadConfiguration() вызывается один раз, и переменные обновляются на уровне драйвера

Заранее спасибо.

Комментарии:

1. есть ли конкретная причина не использовать структурированную потоковую передачу? Также почему бы не загрузить конфигурацию перед загрузкой в драйвер, а затем транслировать с использованием широковещательной переменной? продолжает ли конфигурация меняться для каждого пакета?

2. вариант использования имеет сложные вычисления, а не обычный ETL. В настоящее время конфигурация загружается на уровне драйвера и транслируется для onetiime. Но необходимо динамически изменять его при каждом обновлении конфигурации. Примечание: Обработка записей имеет другой входной поток, и событие запуска конфигурации будет прослушиваться в другом потоке. итак, для потока ввода конфигурации, даже если в данном пакете 100 событий, я хочу обновить конфигурацию только один раз. (надеюсь, причина этого обновления один раз за пакет понятна).

Ответ №1:

Не совсем уверен, понял ли я точное требование. Однако, основываясь на описании вопроса и вашем объяснении в комментариях, это то, что может сработать:

 dstream.foreachRDD { rdd =>
  val config = loadConfiguration() //  executed at the driver
  rdd.foreach { record =>
   // do stuff here. e.g. config.get(). This code is executed at the worker.
  }
}
  

Здесь важно отметить, что Config класс должен быть сериализуемым, поскольку он будет отправлен workers из драйвера.

Также обратите внимание, что это может быть антишаблоном в зависимости от вашего варианта использования. например, для каждого пакета объект конфигурации будет сериализован и отправлен рабочим, что увеличит нагрузку на сеть в зависимости от размера объекта конфигурации.

Я бы настоятельно рекомендовал проверить рекомендуемые шаблоны проектирования для forEachRDD construct и разумно выбрать свой подход. Вот ссылка на то же самое: https://spark.apache.org/docs/latest/streaming-programming-guide.html#design-patterns-for-using-foreachrdd

Комментарии:

1. Мне не нужно, чтобы loadConfiguration() выполнялась при каждой обработке dstream. это должно происходить по требованию при прослушивании событий из другого потока. Требование: input stream => process records ;amp;; config stream => load configuration once per batch if any records (events come to this stream represents a configurations refresh needed). Однако, если в каждый пакет в этот поток конфигурации поступило более 1 записи, выполнение dstream.foreachRDD или foreach records приводит к многократной перезагрузке конфигурации, что не требуется. Поэтому мне нужно обновлять конфигурацию только один раз для каждого пакета записей потока конфигурации.