#python #google-cloud-dataflow #apache-beam
#python #google-облако-поток данных #apache-beam
Вопрос:
Я использую beam-nuggets
библиотеку для чтения и записи данных в базу данных Postgres из конвейера Apache Beam. Я хочу последовательно выполнить следующие две задачи:
- Вставьте новую строку в
processing_info
таблицу. - Запрос первичного ключа вновь созданной
processing_info
записи таблицы. Затем значение запроса первичного ключа будет передано другомуDoFn
в качестве дополнительного ввода (где оно используется для заполнения столбца внешнего ключа связанной таблицы).
В настоящее время я создаю processing_info
запись перед выполнением конвейера Beam, но я хотел бы создать новую запись как часть выполнения конвейера (это упрощает работу при запуске конвейера в потоке данных Google). В идеале код должен выглядеть примерно так:
with beam.Pipeline(options=pipeline_options) as p:
# Executes first
proc_id_result = (p | 'Create Proc Info Record' >> beam.Create([{'pipeline_name': 'cleansed_data_pipeline'}])
| 'Make Processing Id' >> relational_db.Write(
source_config=source_config,
table_config=proc_table_config))
# Executes second
proc_id_record = p | relational_db.ReadFromDB(
source_config=source_config,
table_name='processing_info',
query='SELECT pi.id FROM processing_info pi WHERE processing_date_time = '
' (SELECT MAX(pi1.processing_date_time) from processing_info pi1 '
f' where pi1.pipeline_name = 'cleansed_data_pipeline')'
)
...
# This code executes later, and is automatically deferred until the side input is available
| 'Add 'processing_info_id'' >>
(beam.ParDo(AddKeyValuePairToDict(), 'processing_info_id', AsSingleton(proc_id_record)))
...
Я мог бы что-то взломать (например, неиспользуемый боковой ввод), чтобы отложить запрос до завершения операции вставки, но я хотел знать, есть ли более идиоматический подход (я новичок в Beam).
Спасибо.
Ответ №1:
Вы попали в правильную идею: вы можете сделать это с помощью неиспользуемого побочного ввода. Вы бы сделали что-то вроде этого (которое используется в самом Beam для ReadFromBigQuery
class PassThrough(beam.DoFn):
def process(self, element):
yield element
output = input | beam.ParDo(PassThrough()).with_outputs(
'cleanup_signal', main='main')
main_output = output['main']
cleanup_signal = output['cleanup_signal']
single_element = (
input.pipeline
| beam.Create([None])
| beam.Map(lambda x, nothing: x, beam.pvalue.AsSingleton(cleanup_signal)))
single_element | relational_db.ReadFromDB(...)
Теперь проблема заключается в том, чтобы заставить это работать с вашим ReadFromDB
преобразованием, которое, я полагаю, не принимает такие входные данные. Есть ли способ сделать это для этого преобразования?
Комментарии:
1. Спасибо за отзыв, Пабло. Я могу расширить преобразование ReadFromDB и использовать дополнительный ввод в методе init подкласса. Не уверен, что есть лучший способ, но он работает.