Фильтровать поток kafka по темам

#apache-kafka #spark-streaming

#apache-kafka #spark-streaming

Вопрос:

Фильтровать Kafka steam из нескольких тем по темам.

 kafka_stream = KafkaUtils.createStream(ssc, zookeeper_server, groupId='group-0', topics={'topic1': 1,'topic2': 1}, valueDecoder=lambda v: json.loads(v))
  

Мне нравится фильтровать поток по topic1 и topic2, скажем, kafka_stream_topic1 и kafka_steeam_topic2, а затем обрабатывать его отдельно.

  kafka_stream_topic1.foreachRDD(lambda rdd: rdd.foreach(process_func_for_topic1))
 kafka_stream_topic2.foreachRDD(lambda rdd: rdd.foreach(process_func_for_topic2))
  

Фильтровать поток по каждой теме, а затем обрабатывать отдельно.

Комментарии:

1. Я думаю, вы должны задать вопрос здесь.

2. @MukeshPrajapati Возникла некоторая путаница с новым шаблоном, мне нравится добиваться фильтрации по темам.