#apache-kafka #spark-streaming
#apache-kafka #spark-streaming
Вопрос:
Фильтровать Kafka steam из нескольких тем по темам.
kafka_stream = KafkaUtils.createStream(ssc, zookeeper_server, groupId='group-0', topics={'topic1': 1,'topic2': 1}, valueDecoder=lambda v: json.loads(v))
Мне нравится фильтровать поток по topic1 и topic2, скажем, kafka_stream_topic1 и kafka_steeam_topic2, а затем обрабатывать его отдельно.
kafka_stream_topic1.foreachRDD(lambda rdd: rdd.foreach(process_func_for_topic1))
kafka_stream_topic2.foreachRDD(lambda rdd: rdd.foreach(process_func_for_topic2))
Фильтровать поток по каждой теме, а затем обрабатывать отдельно.
Комментарии:
1. Я думаю, вы должны задать вопрос здесь.
2. @MukeshPrajapati Возникла некоторая путаница с новым шаблоном, мне нравится добиваться фильтрации по темам.