#python #apache-spark #apache-kafka #kafka-consumer-api
#python #apache-spark #apache-kafka #kafka-consumer-api
Вопрос:
Мы используем приложение pyspark для обработки некоторых данных из исходного раздела в Kafka и записи обработанных данных в отдельный раздел. У нас есть другое приложение на python, которое использует Kafka-python для использования обработанной темы. Все идет правильно при первом запуске.
Позже мы решаем, что хотим добавить еще один столбец в обработанную тему. Исходная тема уже содержит эту информацию, поэтому мы останавливаем старый поток Kafka / Spark, запускаем новый, который делает то же самое, за исключением того, что включает новый столбец. Новый поток запускается с startingOffsets
установкой на earliest
новый столбец, который выводится на консоль, поэтому я предполагаю, что столбец теперь включен в обработанный раздел.
Приложение python с потребителем останавливает старого потребителя и также запускает нового. Этот запускается с startingOffsets
установленным значением latest
. Проблема в том, что потребитель, похоже, не использует новые обработанные данные. Каким-то образом переработанные данные не запускают использование с приложением python. Я что-то здесь упускаю?
Кстати: при использовании startingOffsets
set to earliest
в приложении python с помощью consumer он начинает использовать все старые данные, но не вновь обработанные данные с новым столбцом.
Пример:
Первый запуск
Исходный раздел содержит следующее:
|column1|column2|column3|column4|column5|column6|
Это обрабатывается приложением pyspark для обработанной темы:
|column1|column2|column4|column6|
И python с потребительским приложением потребляет его. Даже если через час поступают новые данные, они обрабатываются.
После запуска обновления
Исходный раздел содержит следующее:
|column1|column2|column3|column4|column5|column6|
Это обрабатывается приложением pyspark для обработанной темы:
|column1|column2|column4|column5|column6|
Это не используется приложением python.
Обновить
Код из приложения pyspark:
self.data_frame = self.spark_session.readStream
.format("kafka")
.option("kafka.bootstrap.servers", KAFKA_HOST)
.option("subscribe", self.compute_config.topic_sources[0])
.option("startingOffsets", "earliest" if reset_offset is True else "latest")
.option("failOnDataLoss", False)
.load()
...
self.ds = self.data_frame.select("key", from_json(col("value").cast("string"), self.schema).alias("value"))
.withColumn("value", col("value").cast(self.rename_schema))
.withColumn("value", to_json("value"))
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", KAFKA_HOST)
.option("topic", self.compute_config.topic_target)
.option("checkpointLocation", f"/app/checkpoints/{self.compute_config.topic_target str(self.compute_config.id)}")
.start()
Код из приложения python с потребителем:
self.consumer = KafkaConsumer(self.sink_config.topic_target,
bootstrap_servers=[KAFKA_HOST],
auto_offset_reset="latest",
enable_auto_commit=False)
for message in self.consumer:
...
Комментарии:
1. Хм, да, я не понимаю, какое это имеет отношение к его работе?
2. Добавлен код, показывающий, что происходит