# #python #google-cloud-dataflow #apache-beam
Вопрос:
Интересно, можно ли объединить пакетные данные с потоковыми данными в apache-beam, как показано ниже:
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.io.external.kafka import ReadFromKafka
def run():
with beam.Pipeline(options=PipelineOptions(["--runner=DirectRunner"])) as p:
batch_data = (
p
| 'ReadMyFile' >> beam.io.ReadFromText("s3://my_batch_data.txt")
| beam.Map(batch_processing_func)
)
streaming_data = (
p
| 'Read data' >> ReadFromKafka(
consumer_config={"bootstrap.servers": "localhost:9092"},
topics=["my-first-topic2"],
)
| beam.Map(streaming_processing_func)
)
joined_data = ({'batch_data': batch_data, 'streaming_data': streaming_data} | beam.CoGroupByKey())
if __name__ == "__main__":
run()
Причина, по которой мне это интересно, заключается в том, что, похоже, Google Dataflow поддерживает только один из них.
Ответ №1:
Это хороший вопрос. Ответ таков: да, вы можете объединить пакетные данные с потоковой передачей.
Для вашего конкретного конвейера, вероятно, самый простой способ-определить побочный ввод для ваших пакетных данных и использовать его для обогащения вашего потока:
def run():
with beam.Pipeline(options=PipelineOptions(["--runner=DirectRunner"])) as p:
batch_data_si = beam.pvalue.AsList(
p
| 'ReadMyFile' >> beam.io.ReadFromText("s3://my_batch_data.txt")
| beam.Map(batch_processing_func)
)
streaming_data = (
p
| 'Read data' >> ReadFromKafka(
consumer_config={"bootstrap.servers": "localhost:9092"},
topics=["my-first-topic2"],
)
| beam.Map(streaming_processing_func)
)
joined_data = (streaming_data
| beam.Map(enrich_stream, batch_data_si))
if __name__ == "__main__":
run()
Где ваша enrich_stream
функция выглядит примерно так:
def enrich_stream(element, batch_side_input):
element = dict(element) # make a copy of the first element
element['special_element'] = batch_side_input[elm['index']] # or something like that : )
return element
Комментарии:
1. Рад получить совет от выпускников СНУ! Я попробую с вашим кодом. Кстати
CoGroupByKey
, не сработает в этом случае?2. : ) — это верно. CoGBK не будет хорошо работать в этом случае — он лучше всего подходит для пакетных конвейеров. Боковые входы или dofn с отслеживанием состояния лучше всего работают для потоковых соединений
3. К сожалению, я столкнулся с ошибкой: issues.apache.org/jira/browse/BEAM-12586 🙁 Похоже, Кеннет отсылает вас к комментарию? лол
4. Поскольку DirectRunner не работал с потоковыми данными, я вместо этого использовал поток данных ,и это сработало. Кстати, есть ли способ передать
Dataframe
(сгенерированныйto_dataframe()
) в качестве побочного ввода? Причина в том, что я должен объединить данные поvalue
, но если я передам пакетные данные в виде списка, для каждой потоковой передачи потребуется O(n).. Поэтому я попытаюсь использовать операцию «панды».