#python #templates #google-bigquery #google-cloud-dataflow #apache-beam
#python #шаблоны #google-bigquery #google-облако-поток данных #apache-beam
Вопрос:
У меня есть 2 вопроса по моей разработке.
Вопрос 1
Я пытаюсь создать шаблон из кода python, который состоит из чтения из таблиц BigQuery, применения некоторых преобразований и записи в другую таблицу BigQuery (которая может существовать или нет).
Дело в том, что мне нужно отправить целевую таблицу в качестве параметра, но, похоже, я не могу использовать параметры в конвейерном методе WriteToBigQuery, поскольку он выдает следующее сообщение об ошибке: apache_beam.error.Ошибка runtimevalueprovider: RuntimeValueProvider (опция: project_target, тип: str, default_value: ‘Test’).get() не вызывается из контекста среды выполнения
Подход 1
with beam.Pipeline(options=options) as pipeline:
logging.info("Start logic process...")
kpis_report = (
pipeline
| "Process start" >> Create(["1"])
| "Delete previous data" >> ParDo(preTasks())
| "Read table" >> ParDo(readTable())
....
| 'Write table 2' >> Write(WriteToBigQuery(
table=custom_options.project_target.get() ":" custom_options.dataset_target.get() "." custom_options.table_target.get(),
schema=custom_options.target_schema.get(),
write_disposition=BigQueryDisposition.WRITE_APPEND,
create_disposition=BigQueryDisposition.CREATE_IF_NEEDED)
Подход 2
Я создал функцию ParDo, чтобы получить там переменную и установить метод WriteToBigQuery. Однако, несмотря на успешное завершение выполнения конвейера и то, что на выходе возвращаются строки (теоретически записанные), я не вижу ни таблицы, ни вставленных в нее данных.
with beam.Pipeline(options=options) as pipeline:
logging.info("Start logic process...")
kpis_report = (
pipeline
| "Process start" >> Create(["1"])
| "Pre-tasks" >> ParDo(preTasks())
| "Read table" >> ParDo(readTable())
....
| 'Write table 2' >> Write(WriteToBigQuery())
Я пробовал использовать 2 метода, но ни один из них не работает: BigQueryBatchFileLoads и WriteToBigQuery
class writeTable(beam.DoFn):
def process(self, element):
try:
#Load first here the parameters from the custom_options variable (Here we can do it)
result1 = Write(BigQueryBatchFileLoads(destination=target_table,
schema=target_schema,
write_disposition=BigQueryDisposition.WRITE_APPEND,
create_disposition=BigQueryDisposition.CREATE_IF_NEEDED))
result2 = WriteToBigQuery(table=target_table,
schema=target_schema,
write_disposition=BigQueryDisposition.WRITE_APPEND,
create_disposition=BigQueryDisposition.CREATE_IF_NEEDED,
method="FILE_LOADS"
)
Вопрос 2
У меня есть еще одно сомнение в том, что в этом последнем классе ParDo мне нужно вернуть что-то в качестве элемента или result1 или result2, поскольку мы находимся на последнем шаге конвейера.
Ценю вашу помощь в этом.
Комментарии:
1. * Более подробная информация об успешном выполнении: смотрите Ссылку ниже, чтобы увидеть, что выполнение конвейера в сценарии 2 работает нормально и возвращает строки, однако таблица или данные недоступны в BigQuery. i.stack.imgur.com/GCueP.png
2. * Подробнее о подходе 2: я где-то читал, что мне нужно выполнить следующий шаг, но не уверен, как это сделать: «Как только вы переместите его из DoFn, вам нужно применить PTransform beam.io.gcp.bigquery. WriteToBigQuery в PCollection, чтобы это имело какой-либо эффект «. Это правильно?
Ответ №1:
Наиболее рекомендуемый способ сделать это похож на # 1, но передает поставщика значений без вызова get
и передает лямбда-выражение для таблицы:
with beam.Pipeline(options=options) as pipeline:
logging.info("Start logic process...")
kpis_report = (
pipeline
| "Process start" >> Create(["1"])
| "Delete previous data" >> ParDo(preTasks())
| "Read table" >> ParDo(readTable())
....
| 'Write table 2' >> WriteToBigQuery(
table=lambda x: custom_options.project_target.get() ":" custom_options.dataset_target.get() "." custom_options.table_target.get(),
schema=custom_options.target_schema,
write_disposition=BigQueryDisposition.WRITE_APPEND,
create_disposition=BigQueryDisposition.CREATE_IF_NEEDED)
Это должно сработать.