Потребитель Kafka не использует обработанные данные с помощью Spark

#python #apache-spark #apache-kafka #kafka-consumer-api

#python #apache-spark #apache-kafka #kafka-consumer-api

Вопрос:

Мы используем приложение pyspark для обработки некоторых данных из исходного раздела в Kafka и записи обработанных данных в отдельный раздел. У нас есть другое приложение на python, которое использует Kafka-python для использования обработанной темы. Все идет правильно при первом запуске.

Позже мы решаем, что хотим добавить еще один столбец в обработанную тему. Исходная тема уже содержит эту информацию, поэтому мы останавливаем старый поток Kafka / Spark, запускаем новый, который делает то же самое, за исключением того, что включает новый столбец. Новый поток запускается с startingOffsets установкой на earliest новый столбец, который выводится на консоль, поэтому я предполагаю, что столбец теперь включен в обработанный раздел.

Приложение python с потребителем останавливает старого потребителя и также запускает нового. Этот запускается с startingOffsets установленным значением latest . Проблема в том, что потребитель, похоже, не использует новые обработанные данные. Каким-то образом переработанные данные не запускают использование с приложением python. Я что-то здесь упускаю?

Кстати: при использовании startingOffsets set to earliest в приложении python с помощью consumer он начинает использовать все старые данные, но не вновь обработанные данные с новым столбцом.

Пример:

Первый запуск

Исходный раздел содержит следующее:

 |column1|column2|column3|column4|column5|column6|
 

Это обрабатывается приложением pyspark для обработанной темы:

 |column1|column2|column4|column6|
 

И python с потребительским приложением потребляет его. Даже если через час поступают новые данные, они обрабатываются.

После запуска обновления

Исходный раздел содержит следующее:

 |column1|column2|column3|column4|column5|column6|
 

Это обрабатывается приложением pyspark для обработанной темы:

 |column1|column2|column4|column5|column6|
 

Это не используется приложением python.

Обновить

Код из приложения pyspark:

 self.data_frame = self.spark_session.readStream 
            .format("kafka") 
            .option("kafka.bootstrap.servers", KAFKA_HOST) 
            .option("subscribe", self.compute_config.topic_sources[0]) 
            .option("startingOffsets", "earliest" if reset_offset is True else "latest") 
            .option("failOnDataLoss", False) 
            .load()

...

self.ds = self.data_frame.select("key", from_json(col("value").cast("string"), self.schema).alias("value")) 
            .withColumn("value", col("value").cast(self.rename_schema)) 
            .withColumn("value", to_json("value")) 
            .writeStream 
            .format("kafka") 
            .option("kafka.bootstrap.servers", KAFKA_HOST) 
            .option("topic", self.compute_config.topic_target) 
            .option("checkpointLocation", f"/app/checkpoints/{self.compute_config.topic_target str(self.compute_config.id)}") 
            .start()
 

Код из приложения python с потребителем:

  self.consumer = KafkaConsumer(self.sink_config.topic_target,
                                      bootstrap_servers=[KAFKA_HOST],
                                      auto_offset_reset="latest",
                                      enable_auto_commit=False)

for message in self.consumer:
    ...
 

Комментарии:

1. Хм, да, я не понимаю, какое это имеет отношение к его работе?

2. Добавлен код, показывающий, что происходит