#apache-spark #redis #spark-streaming #spark-streaming-kafka
#apache-spark #redis #spark-streaming #spark-streaming-кафка
Вопрос:
У меня есть вариант использования, когда мы транслируем события, и для каждого события я должен выполнить несколько поисковых запросов. Поисковые запросы находятся в Redis, и мне интересно, каков наилучший способ создания подключений. Потоковая передача spark будет запускать 40 исполнителей, и у меня есть 5 таких потоковых заданий, все из которых подключаются к одному и тому же кластеру Redis. Итак, я в замешательстве, какой подход я должен использовать для создания соединения Redis
-
Создайте объект connection в драйвере и передайте его исполнителям (не уверен, действительно ли это работает, поскольку я должен сделать этот объект сериализуемым). Могу ли я сделать это с широковещательными переменными?
-
Создайте соединение Redis для каждого раздела, однако у меня есть код, написанный таким образом
val update = xyz.transform(rdd => {
// on driver
if (xyz.isNewDay) {
.....
}
rdd
})
update.foreachRDD(rdd => {
rdd.foreachPartition(partition => {
partition.foreach(Key_trans => {
// perform some lookups logic here
}
}
})
Итак, теперь, если я создам соединение внутри каждого раздела, это будет означать, что для каждого RDD и для каждого раздела в этом RDD я буду создавать новое соединение.
Есть ли способ, которым я могу поддерживать одно соединение для каждого раздела и кэшировать этот объект, чтобы мне не приходилось создавать соединения снова и снова?
Я могу добавить больше контекста / информации, если требуется.
Ответ №1:
1. Создайте объект connection в драйвере и передайте его исполнителям (не уверен, действительно ли это работает, поскольку я должен сделать этот объект сериализуемым). Могу ли я сделать это с широковещательными переменными?
Ответ — Нет. Большинство объектов подключения не сериализуемы из-за машинно-зависимых данных, связанных с подключением.
2. Есть ли способ, которым я могу поддерживать одно соединение для каждого раздела и кэшировать этот объект, чтобы мне не приходилось создавать соединения снова и снова?
Ответ- Да, создайте пул подключений и используйте его в разделе. вот стиль. Вы можете создать пул подключений следующим образом https://github.com/RedisLabs/spark-redis/blob/master/src/main/scala/com/redislabs/provider/redis/ConnectionPool.scala
а затем использовать его
dstream.foreachRDD { rdd =>
rdd.foreachPartition { partitionOfRecords =>
// ConnectionPool is a static, lazily initialized pool of connections
val connection = ConnectionPool.getConnection()
partitionOfRecords.foreach(record => connection.send(record))
ConnectionPool.returnConnection(connection) // return to the pool for future reuse
}
}
Пожалуйста, проверьте это:
шаблон разработки для использования foreachRDD