#apache-spark #amazon-s3 #apache-kafka #spark-structured-streaming
Вопрос:
Вопрос
Как достичь идемпотенции (одинакового порядка событий) между обработкой темы Кафки и резервным копированием it S3 с помощью структурированной потоковой передачи Spark?
См # QUESTION:
.Комментарий к Примеру кода ниже.
Пример использования
Предположим, у вас есть точно такой же вариант использования, как обработка Uber — потока, которая используется как для анализа с низкой задержкой, так и для исторического анализа. Для примера из реальной жизни см. раздел «Мотивация» Разработки готовой к производству архитектуры Kappa для своевременной обработки потока данных:
Вы храните данные в Kafka, архивируете их в S3 с помощью приемника Kafka Connect AWS S3. А потоковая обработка выполняется с помощью структурированной потоковой передачи Spark. Вы решили решить проблему с помощью раздела «Комбинированный подход»: запустите код в режиме фоновой загрузки поверх S3 (разделы с резервным копированием Кафки).
Пример кода
def some_processing(spark: SparkSession, stream: DataFrame, backfilling_mode: bool):
input_stream = read(spark, 'input_topic', backfilling_mode)
processed_stream = input_stream
.withWatermark('event_time', '10 seconds')
.groupBy(window('event_time', '10 seconds', '10 seconds'), 'user_id')
.agg(sum(col('price')))
.join(...)
write(processed_stream, 'output_topic', backfilling_mode)
def read(spark: SparkSession, topic_name: str, backfilling_mode: bool) -> DataFrame:
"""Reads data from Kafka topic or its S3 backup"""
# QUESTION: how Spark achieve the same order on backfilling_mode=True and backfilling_mode=False
if backfilling_mode:
stream = spark
.readStream
.schema(input_schema)
.format('com.databricks.spark.avro')
.option('maxFilesPerTrigger', 20)
.load('/s3_base_bath/{}'.format(topic_name))
else:
stream = spark
.readStream
.format('kafka')
.option('kafka.bootstrap.servers', 'host1:port1,host2:port2')
.option('subscribe', topic_name)
.load()
def write(stream: DataFrame, topic_name: str, backfilling_mode: bool):
"""Writes data to Kafka topic or its S3 backup"""
if backfilling_mode:
stream
.writeStream
.trigger(processingTime='60 seconds')
.format('com.databricks.spark.avro')
.option('path', '/s3_base_bath/{}'.format(topic_name))
.start()
.awaitTermination()
else:
stream
.writeStream
.trigger(processingTime='10 seconds')
.option('kafka.bootstrap.servers', 'host1:port1,host2:port2')
.option('topic', 'output_topic')
.format('kafka')
.start()
.awaitTermination()
Комментарии:
1. Данные в каждом файле Avro зависят от настроек разбиения на разделы в Kafka Connect… Используйте разделитель временных меток продолжительностью 10 или более секунд, после чего вы должны ожидать, что сможете суммировать каждый файл
2. @OneCricketeer спасибо за ответ! Предположим, я уже разделил тему Кафки на S3 на 1-минутное временное окно приема. Например,
s3://base_path/input_topic/year=2021/month=03/day=10/w=1
илиs3://base_path/input_topic/year=2021/month=03/day=10/w=1440
. Не уверен, нужен ли мне также раздел по ключу раздела темы, но это может создать проблему с небольшими файлами. В любом случае, основная часть сейчас заключается в том, как обрабатывать только одно окно за раз с помощью структурированной потоковой передачи Spark? Насколько я понимаю,maxFilesPerTrigger
конфигурация вам мало чем поможет3. @OneCricketeer если я правильно понял из статьи, основная причина засыпки в потоковом режиме Spark — это идемпотенция: пакетный режим Spark игнорирует водяные знаки и окна времени событий. Но, не отвечая на вопрос, как сделать раздел процесса в потоковом режиме Spark
w=1
, преждеw=2
чем у него возникнет та же проблема с идемпотенцией, не так ли?4. К сожалению, я не очень хорошо знаком со Спарком. Я просто комментировал, как вы структурировали файлы в корзине
5. Похоже, единственный способ добиться этого-написать пользовательский соединитель с API Spark SourceV2 (как это сделал Uber).