Структурированная потоковая передача Spark: как добиться одинакового порядка обработки темы Кафки и ее резервного копирования на S3

#apache-spark #amazon-s3 #apache-kafka #spark-structured-streaming

Вопрос:

Вопрос

Как достичь идемпотенции (одинакового порядка событий) между обработкой темы Кафки и резервным копированием it S3 с помощью структурированной потоковой передачи Spark?

См # QUESTION: .Комментарий к Примеру кода ниже.

Пример использования

Предположим, у вас есть точно такой же вариант использования, как обработка Uber — потока, которая используется как для анализа с низкой задержкой, так и для исторического анализа. Для примера из реальной жизни см. раздел «Мотивация» Разработки готовой к производству архитектуры Kappa для своевременной обработки потока данных:

Вы храните данные в Kafka, архивируете их в S3 с помощью приемника Kafka Connect AWS S3. А потоковая обработка выполняется с помощью структурированной потоковой передачи Spark. Вы решили решить проблему с помощью раздела «Комбинированный подход»: запустите код в режиме фоновой загрузки поверх S3 (разделы с резервным копированием Кафки).

Пример кода

 def some_processing(spark: SparkSession, stream: DataFrame, backfilling_mode: bool):
  input_stream = read(spark, 'input_topic', backfilling_mode)
  processed_stream = input_stream 
    .withWatermark('event_time', '10 seconds') 
    .groupBy(window('event_time', '10 seconds', '10 seconds'), 'user_id') 
    .agg(sum(col('price'))) 
    .join(...)
  write(processed_stream, 'output_topic', backfilling_mode)

def read(spark: SparkSession, topic_name: str, backfilling_mode: bool) -> DataFrame:
  """Reads data from Kafka topic or its S3 backup"""
  # QUESTION: how Spark achieve the same order on backfilling_mode=True and backfilling_mode=False
  if backfilling_mode:
    stream = spark 
      .readStream 
      .schema(input_schema)  
      .format('com.databricks.spark.avro') 
      .option('maxFilesPerTrigger', 20) 
      .load('/s3_base_bath/{}'.format(topic_name)) 
  else:
    stream = spark 
      .readStream 
      .format('kafka') 
      .option('kafka.bootstrap.servers', 'host1:port1,host2:port2') 
      .option('subscribe', topic_name) 
      .load() 


def write(stream: DataFrame, topic_name: str, backfilling_mode: bool):
  """Writes data to Kafka topic or its S3 backup"""
  if backfilling_mode:
    stream 
      .writeStream 
      .trigger(processingTime='60 seconds') 
      .format('com.databricks.spark.avro') 
      .option('path', '/s3_base_bath/{}'.format(topic_name)) 
      .start() 
      .awaitTermination()
  else:
    stream 
      .writeStream 
      .trigger(processingTime='10 seconds') 
      .option('kafka.bootstrap.servers', 'host1:port1,host2:port2') 
      .option('topic', 'output_topic')
      .format('kafka') 
      .start() 
      .awaitTermination()
 

Комментарии:

1. Данные в каждом файле Avro зависят от настроек разбиения на разделы в Kafka Connect… Используйте разделитель временных меток продолжительностью 10 или более секунд, после чего вы должны ожидать, что сможете суммировать каждый файл

2. @OneCricketeer спасибо за ответ! Предположим, я уже разделил тему Кафки на S3 на 1-минутное временное окно приема. Например, s3://base_path/input_topic/year=2021/month=03/day=10/w=1 или s3://base_path/input_topic/year=2021/month=03/day=10/w=1440 . Не уверен, нужен ли мне также раздел по ключу раздела темы, но это может создать проблему с небольшими файлами. В любом случае, основная часть сейчас заключается в том, как обрабатывать только одно окно за раз с помощью структурированной потоковой передачи Spark? Насколько я понимаю, maxFilesPerTrigger конфигурация вам мало чем поможет

3. @OneCricketeer если я правильно понял из статьи, основная причина засыпки в потоковом режиме Spark — это идемпотенция: пакетный режим Spark игнорирует водяные знаки и окна времени событий. Но, не отвечая на вопрос, как сделать раздел процесса в потоковом режиме Spark w=1 , прежде w=2 чем у него возникнет та же проблема с идемпотенцией, не так ли?

4. К сожалению, я не очень хорошо знаком со Спарком. Я просто комментировал, как вы структурировали файлы в корзине

5. Похоже, единственный способ добиться этого-написать пользовательский соединитель с API Spark SourceV2 (как это сделал Uber).