Последовательно выполнять задачи записи и чтения базы данных в Apache Beam

#python #google-cloud-dataflow #apache-beam

#python #google-облако-поток данных #apache-beam

Вопрос:

Я использую beam-nuggets библиотеку для чтения и записи данных в базу данных Postgres из конвейера Apache Beam. Я хочу последовательно выполнить следующие две задачи:

  1. Вставьте новую строку в processing_info таблицу.
  2. Запрос первичного ключа вновь созданной processing_info записи таблицы. Затем значение запроса первичного ключа будет передано другому DoFn в качестве дополнительного ввода (где оно используется для заполнения столбца внешнего ключа связанной таблицы).

В настоящее время я создаю processing_info запись перед выполнением конвейера Beam, но я хотел бы создать новую запись как часть выполнения конвейера (это упрощает работу при запуске конвейера в потоке данных Google). В идеале код должен выглядеть примерно так:

 with beam.Pipeline(options=pipeline_options) as p:
    # Executes first
    proc_id_result = (p | 'Create Proc Info Record' >> beam.Create([{'pipeline_name': 'cleansed_data_pipeline'}])
                          | 'Make Processing Id' >> relational_db.Write(
                                source_config=source_config,
                                table_config=proc_table_config))
    # Executes second
    proc_id_record = p | relational_db.ReadFromDB(
            source_config=source_config,
            table_name='processing_info',
            query='SELECT pi.id FROM processing_info pi WHERE processing_date_time = '
                  '   (SELECT MAX(pi1.processing_date_time) from processing_info pi1 '
                  f'      where pi1.pipeline_name = 'cleansed_data_pipeline')'
        )
    ...
    # This code executes later, and is automatically deferred until the side input is available
    | 'Add 'processing_info_id'' >>
                (beam.ParDo(AddKeyValuePairToDict(), 'processing_info_id', AsSingleton(proc_id_record)))
    ...

  

Я мог бы что-то взломать (например, неиспользуемый боковой ввод), чтобы отложить запрос до завершения операции вставки, но я хотел знать, есть ли более идиоматический подход (я новичок в Beam).

Спасибо.

Ответ №1:

Вы попали в правильную идею: вы можете сделать это с помощью неиспользуемого побочного ввода. Вы бы сделали что-то вроде этого (которое используется в самом Beam для ReadFromBigQuery

 class PassThrough(beam.DoFn):
  def process(self, element):
    yield element

output = input | beam.ParDo(PassThrough()).with_outputs(
    'cleanup_signal', main='main')
main_output = output['main']
cleanup_signal = output['cleanup_signal']

single_element = (
    input.pipeline
    | beam.Create([None])
    | beam.Map(lambda x, nothing: x, beam.pvalue.AsSingleton(cleanup_signal)))

single_element | relational_db.ReadFromDB(...)

  

Теперь проблема заключается в том, чтобы заставить это работать с вашим ReadFromDB преобразованием, которое, я полагаю, не принимает такие входные данные. Есть ли способ сделать это для этого преобразования?

Комментарии:

1. Спасибо за отзыв, Пабло. Я могу расширить преобразование ReadFromDB и использовать дополнительный ввод в методе init подкласса. Не уверен, что есть лучший способ, но он работает.