#java #serialization #apache-spark #spark-streaming
#java #сериализация #apache-spark #потоковая передача
Вопрос:
В потоковой передаче spark я хочу запрашивать базу данных перед обработкой каждого пакета, сохранять результаты в хэш-карте, которую можно сериализовать и отправить по сети исполнителям.
class ExecutingClass implements Serializable {
init(DB db) {
try(JavaStreamingContext jsc = new JavaStreamingContext(...)) {
JavaPairInputDStream<String,String> kafkaStream = getKafkaStream(jsc);
kafkaStream.foreachRDD(rdd -> {
// this part is supposed to execute in the driver
Map<String, String> indexMap = db.getIndexMap();// connects to a db, queries the results as a map
JavaRDD<String> results = processRDD(rdd, indexMap);
...
}
}
JavaRDD<String> processRDD(JavaPairRDD<String, String> rdd, Map<String,String> indexMap) {
...
}
}
В приведенном выше коде предполагается, что карта индексов должна быть инициализирована в драйвере, полученная карта используется при обработке rdd. У меня нет проблем, когда я объявляю indexMap вне закрытия foreachRDD, но я получаю ошибки сериализации, когда я делаю это внутри. в чем причина этого?
Причина, по которой я хочу сделать что-то подобное, заключается в том, чтобы убедиться, что у меня есть последние значения из базы данных для каждого пакета. Я подозреваю, что это связано с закрытием foreachRDD, пытающегося сериализовать все, что находится за пределами закрытия.
Комментарии:
1. почему для этой цели нельзя использовать накопитель (чтение-запись) / широковещательную передачу (только для чтения)? В этом случае, поскольку это накопитель для чтения и записи, имеет смысл, не так ли?
2. Код внутри замыкания будет сериализован и отправлен исполнителям. Поэтому я бы предположил
db.getIndexMap()
, что он не сериализуем для этой цели.3. @LiMuBei Вот в чем загвоздка. Для каждого пакета данных мы сначала запрашиваем базу данных, чтобы получить карту индексов, а затем передаем только карту индексов для обработки.
Ответ №1:
Вы используете объект db (который является экземпляром DB) внутри foreachRDD, поэтому spark пытается сериализовать db, чтобы избежать этого, нам нужно создать соединение с БД внутри foreachRDD (или) вы можете использовать пулы объектов, как описано в статье ниже http://www.michael-noll.com/blog/2014/10/01/kafka-spark-streaming-integration-example-tutorial /