Apache beam — Поток данных Google — WriteToBigQuery — Python — Параметры — Шаблоны — Конвейеры

#python #templates #google-bigquery #google-cloud-dataflow #apache-beam

#python #шаблоны #google-bigquery #google-облако-поток данных #apache-beam

Вопрос:

У меня есть 2 вопроса по моей разработке.

Вопрос 1

Я пытаюсь создать шаблон из кода python, который состоит из чтения из таблиц BigQuery, применения некоторых преобразований и записи в другую таблицу BigQuery (которая может существовать или нет).

Дело в том, что мне нужно отправить целевую таблицу в качестве параметра, но, похоже, я не могу использовать параметры в конвейерном методе WriteToBigQuery, поскольку он выдает следующее сообщение об ошибке: apache_beam.error.Ошибка runtimevalueprovider: RuntimeValueProvider (опция: project_target, тип: str, default_value: ‘Test’).get() не вызывается из контекста среды выполнения

Подход 1

 with beam.Pipeline(options=options) as pipeline:
    logging.info("Start logic process...")
    kpis_report = (
            pipeline
            | "Process start" >> Create(["1"])
            | "Delete previous data" >> ParDo(preTasks())
            | "Read table" >> ParDo(readTable())
            ....
            | 'Write table 2' >> Write(WriteToBigQuery(
               table=custom_options.project_target.get()   ":"   custom_options.dataset_target.get()   "."   custom_options.table_target.get(),
        schema=custom_options.target_schema.get(),
        write_disposition=BigQueryDisposition.WRITE_APPEND,
        create_disposition=BigQueryDisposition.CREATE_IF_NEEDED)
  

Подход 2

Я создал функцию ParDo, чтобы получить там переменную и установить метод WriteToBigQuery. Однако, несмотря на успешное завершение выполнения конвейера и то, что на выходе возвращаются строки (теоретически записанные), я не вижу ни таблицы, ни вставленных в нее данных.

     with beam.Pipeline(options=options) as pipeline:
    logging.info("Start logic process...")
    kpis_report = (
            pipeline
            | "Process start" >> Create(["1"])
            | "Pre-tasks" >> ParDo(preTasks())
            | "Read table" >> ParDo(readTable())
            ....

            | 'Write table 2' >> Write(WriteToBigQuery())
  

Я пробовал использовать 2 метода, но ни один из них не работает: BigQueryBatchFileLoads и WriteToBigQuery

 class writeTable(beam.DoFn):
def process(self, element):
    try:
        #Load first here the parameters from the custom_options variable (Here we can do it)

        result1 = Write(BigQueryBatchFileLoads(destination=target_table,
                                     schema=target_schema,
                                     write_disposition=BigQueryDisposition.WRITE_APPEND,
                                     create_disposition=BigQueryDisposition.CREATE_IF_NEEDED))
        
        result2 = WriteToBigQuery(table=target_table,
                        schema=target_schema,
                        write_disposition=BigQueryDisposition.WRITE_APPEND,
                        create_disposition=BigQueryDisposition.CREATE_IF_NEEDED,
                        method="FILE_LOADS"
                        ) 
  

Вопрос 2

У меня есть еще одно сомнение в том, что в этом последнем классе ParDo мне нужно вернуть что-то в качестве элемента или result1 или result2, поскольку мы находимся на последнем шаге конвейера.

Ценю вашу помощь в этом.

Комментарии:

1. * Более подробная информация об успешном выполнении: смотрите Ссылку ниже, чтобы увидеть, что выполнение конвейера в сценарии 2 работает нормально и возвращает строки, однако таблица или данные недоступны в BigQuery. i.stack.imgur.com/GCueP.png

2. * Подробнее о подходе 2: я где-то читал, что мне нужно выполнить следующий шаг, но не уверен, как это сделать: «Как только вы переместите его из DoFn, вам нужно применить PTransform beam.io.gcp.bigquery. WriteToBigQuery в PCollection, чтобы это имело какой-либо эффект «. Это правильно?

Ответ №1:

Наиболее рекомендуемый способ сделать это похож на # 1, но передает поставщика значений без вызова get и передает лямбда-выражение для таблицы:

 with beam.Pipeline(options=options) as pipeline:
    logging.info("Start logic process...")
    kpis_report = (
            pipeline
            | "Process start" >> Create(["1"])
            | "Delete previous data" >> ParDo(preTasks())
            | "Read table" >> ParDo(readTable())
            ....
            | 'Write table 2' >> WriteToBigQuery(
               table=lambda x: custom_options.project_target.get()   ":"   custom_options.dataset_target.get()   "."   custom_options.table_target.get(),
        schema=custom_options.target_schema,
        write_disposition=BigQueryDisposition.WRITE_APPEND,
        create_disposition=BigQueryDisposition.CREATE_IF_NEEDED)
  

Это должно сработать.