Как создать подключения к источнику данных в Spark Streaming для поиска

#apache-spark #redis #spark-streaming #spark-streaming-kafka

#apache-spark #redis #spark-streaming #spark-streaming-кафка

Вопрос:

У меня есть вариант использования, когда мы транслируем события, и для каждого события я должен выполнить несколько поисковых запросов. Поисковые запросы находятся в Redis, и мне интересно, каков наилучший способ создания подключений. Потоковая передача spark будет запускать 40 исполнителей, и у меня есть 5 таких потоковых заданий, все из которых подключаются к одному и тому же кластеру Redis. Итак, я в замешательстве, какой подход я должен использовать для создания соединения Redis

  1. Создайте объект connection в драйвере и передайте его исполнителям (не уверен, действительно ли это работает, поскольку я должен сделать этот объект сериализуемым). Могу ли я сделать это с широковещательными переменными?

  2. Создайте соединение Redis для каждого раздела, однако у меня есть код, написанный таким образом

    val update = xyz.transform(rdd => {
    // on driver
    if (xyz.isNewDay) {
    .....
    }
    rdd
    })
    update.foreachRDD(rdd => {
    rdd.foreachPartition(partition => {
    partition.foreach(Key_trans => {
    // perform some lookups logic here
    }
    }
    })

Итак, теперь, если я создам соединение внутри каждого раздела, это будет означать, что для каждого RDD и для каждого раздела в этом RDD я буду создавать новое соединение.

Есть ли способ, которым я могу поддерживать одно соединение для каждого раздела и кэшировать этот объект, чтобы мне не приходилось создавать соединения снова и снова?

Я могу добавить больше контекста / информации, если требуется.

Ответ №1:

1. Создайте объект connection в драйвере и передайте его исполнителям (не уверен, действительно ли это работает, поскольку я должен сделать этот объект сериализуемым). Могу ли я сделать это с широковещательными переменными?

Ответ — Нет. Большинство объектов подключения не сериализуемы из-за машинно-зависимых данных, связанных с подключением.

2. Есть ли способ, которым я могу поддерживать одно соединение для каждого раздела и кэшировать этот объект, чтобы мне не приходилось создавать соединения снова и снова?

Ответ- Да, создайте пул подключений и используйте его в разделе. вот стиль. Вы можете создать пул подключений следующим образом https://github.com/RedisLabs/spark-redis/blob/master/src/main/scala/com/redislabs/provider/redis/ConnectionPool.scala

а затем использовать его

 dstream.foreachRDD { rdd =>
  rdd.foreachPartition { partitionOfRecords =>
    // ConnectionPool is a static, lazily initialized pool of connections
    val connection = ConnectionPool.getConnection()
    partitionOfRecords.foreach(record => connection.send(record))
    ConnectionPool.returnConnection(connection)  // return to the pool for future reuse
  }
}
  

Пожалуйста, проверьте это:
шаблон разработки для использования foreachRDD