Использование подключения к БД внутри foreachRDD в потоковой передаче apache spark

#java #serialization #apache-spark #spark-streaming

#java #сериализация #apache-spark #потоковая передача

Вопрос:

В потоковой передаче spark я хочу запрашивать базу данных перед обработкой каждого пакета, сохранять результаты в хэш-карте, которую можно сериализовать и отправить по сети исполнителям.

 class ExecutingClass implements Serializable {
 init(DB db) {

   try(JavaStreamingContext jsc = new JavaStreamingContext(...)) {

   JavaPairInputDStream<String,String> kafkaStream = getKafkaStream(jsc);

   kafkaStream.foreachRDD(rdd -> {
   // this part is supposed to execute in the driver
  Map<String, String> indexMap = db.getIndexMap();// connects to a db, queries the results as a map

  JavaRDD<String> results = processRDD(rdd, indexMap);

  ...  

 }


  }
    JavaRDD<String> processRDD(JavaPairRDD<String, String> rdd,       Map<String,String> indexMap) {
 ... 
    }
    }
  

В приведенном выше коде предполагается, что карта индексов должна быть инициализирована в драйвере, полученная карта используется при обработке rdd. У меня нет проблем, когда я объявляю indexMap вне закрытия foreachRDD, но я получаю ошибки сериализации, когда я делаю это внутри. в чем причина этого?

Причина, по которой я хочу сделать что-то подобное, заключается в том, чтобы убедиться, что у меня есть последние значения из базы данных для каждого пакета. Я подозреваю, что это связано с закрытием foreachRDD, пытающегося сериализовать все, что находится за пределами закрытия.

Комментарии:

1. почему для этой цели нельзя использовать накопитель (чтение-запись) / широковещательную передачу (только для чтения)? В этом случае, поскольку это накопитель для чтения и записи, имеет смысл, не так ли?

2. Код внутри замыкания будет сериализован и отправлен исполнителям. Поэтому я бы предположил db.getIndexMap() , что он не сериализуем для этой цели.

3. @LiMuBei Вот в чем загвоздка. Для каждого пакета данных мы сначала запрашиваем базу данных, чтобы получить карту индексов, а затем передаем только карту индексов для обработки.

Ответ №1:

Вы используете объект db (который является экземпляром DB) внутри foreachRDD, поэтому spark пытается сериализовать db, чтобы избежать этого, нам нужно создать соединение с БД внутри foreachRDD (или) вы можете использовать пулы объектов, как описано в статье ниже http://www.michael-noll.com/blog/2014/10/01/kafka-spark-streaming-integration-example-tutorial /