Можно ли объединить пакетные данные с потоковыми данными в Apache beam?

# #python #google-cloud-dataflow #apache-beam

Вопрос:

Интересно, можно ли объединить пакетные данные с потоковыми данными в apache-beam, как показано ниже:

 import apache_beam as beam

from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.io.external.kafka import ReadFromKafka


def run():
    with beam.Pipeline(options=PipelineOptions(["--runner=DirectRunner"])) as p:
        batch_data = (
            p
            | 'ReadMyFile' >> beam.io.ReadFromText("s3://my_batch_data.txt")
            | beam.Map(batch_processing_func)
        )
        streaming_data = (
            p
            | 'Read data' >> ReadFromKafka(
                consumer_config={"bootstrap.servers": "localhost:9092"},
                topics=["my-first-topic2"],
            )
            | beam.Map(streaming_processing_func)
        )
        joined_data = ({'batch_data': batch_data, 'streaming_data': streaming_data} | beam.CoGroupByKey())


if __name__ == "__main__":
    run()


 

Причина, по которой мне это интересно, заключается в том, что, похоже, Google Dataflow поддерживает только один из них.

Ответ №1:

Это хороший вопрос. Ответ таков: да, вы можете объединить пакетные данные с потоковой передачей.

Для вашего конкретного конвейера, вероятно, самый простой способ-определить побочный ввод для ваших пакетных данных и использовать его для обогащения вашего потока:

 def run():
    with beam.Pipeline(options=PipelineOptions(["--runner=DirectRunner"])) as p:
        batch_data_si = beam.pvalue.AsList(
            p
            | 'ReadMyFile' >> beam.io.ReadFromText("s3://my_batch_data.txt")
            | beam.Map(batch_processing_func)
        )
        streaming_data = (
            p
            | 'Read data' >> ReadFromKafka(
                consumer_config={"bootstrap.servers": "localhost:9092"},
                topics=["my-first-topic2"],
            )
            | beam.Map(streaming_processing_func)
        )
        joined_data = (streaming_data 
                       | beam.Map(enrich_stream, batch_data_si))


if __name__ == "__main__":
    run()
 

Где ваша enrich_stream функция выглядит примерно так:

 def enrich_stream(element, batch_side_input):
  element = dict(element)  # make a copy of the first element
  element['special_element'] = batch_side_input[elm['index']]  # or something like that : )
  return element
 

Комментарии:

1. Рад получить совет от выпускников СНУ! Я попробую с вашим кодом. Кстати CoGroupByKey , не сработает в этом случае?

2. : ) — это верно. CoGBK не будет хорошо работать в этом случае — он лучше всего подходит для пакетных конвейеров. Боковые входы или dofn с отслеживанием состояния лучше всего работают для потоковых соединений

3. К сожалению, я столкнулся с ошибкой: issues.apache.org/jira/browse/BEAM-12586 🙁 Похоже, Кеннет отсылает вас к комментарию? лол

4. Поскольку DirectRunner не работал с потоковыми данными, я вместо этого использовал поток данных ,и это сработало. Кстати, есть ли способ передать Dataframe (сгенерированный to_dataframe() ) в качестве побочного ввода? Причина в том, что я должен объединить данные по value , но если я передам пакетные данные в виде списка, для каждой потоковой передачи потребуется O(n).. Поэтому я попытаюсь использовать операцию «панды».