# #python #google-bigquery #google-cloud-dataflow #apache-beam
Вопрос:
У меня есть поток данных заданий, который использует bigquery для чтения некоторых данных из pubsub для потоковой передачи (или gcs для пакетной передачи), а затем вставляет их в bigquery.
Для пакетной обработки я использую следующий шаг:
(processed_rows | "Write Full to Bigquery" >> apache_beam.io.WriteToBigQuery(
table_fn,
schema=schema_fn,
additional_bq_parameters=additional_bq_parameters_fn,
write_disposition=apache_beam.io.BigQueryDisposition.WRITE_APPEND,
create_disposition=apache_beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
method=apache_beam.io.WriteToBigQuery.Method.FILE_LOADS
)
)
Для потоковой передачи у меня есть следующий:
(processed_rows | "Write to Bigquery Streaming" >> apache_beam.io.WriteToBigQuery(
table_fn,
schema=schema_fn,
additional_bq_parameters=additional_bq_parameters_fn,
write_disposition=apache_beam.io.BigQueryDisposition.WRITE_APPEND,
create_disposition=apache_beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
method=apache_beam.io.WriteToBigQuery.Method.STREAMING_INSERTS)
)
Моя функция additional_bq_parameters_fn выглядит следующим образом:
def additional_bq_parameters_fn(table_reference_name):
table_name = table_reference_name.split('.')[-1]
bq_param = MAP_TABLE_BQ_PARAM[table_name]
return bq_param
До сих пор он работал для пакетного задания, но не для потокового, где я получаю следующую ошибку:
Аргумент объекта _MessageClass после ** должен быть отображением, а не функцией
Когда я углубился в код луча apache, я нашел это в файлах BigQueryBatchFileLoads:
if callable(self._additional_bq_parameters):
additional_parameters = self._additional_bq_parameters(destination)
Однако я не могу найти ничего эквивалентного в потоковой части.
Итак, мой вопрос: я что-то пропустил или в коде apache beam есть ошибка (или неполная функция)?
Если я правильно понял, то я открою билет в apache beam Jira.
Комментарии:
1. Я видел этот документ, связанный с этим, не хотели бы вы его проверить? cloud.google.com/dataflow/docs/guides/…
2. 1) Какой тип данных
additional_bq_parameters_fn
возвращает? 2) Можете ли вы попробовать жестко закодировать значенияadditional_bq_parameters
в переменной вместо того, чтобы возвращать их из функции? 3) Когда вы вызываете функцию additional_bq_parameters_fn. Как вы передаетеtable_reference_name
необходимый параметр в своем коде?3. К сожалению, дополнительные параметры_bq_parameters поддерживаются только для пакетной обработки, как вы правильно поняли. Какие параметры вы пытаетесь передать?
4. @Ricco : Я пытался жестко закодировать, и, конечно, это работает. Мне нужно что-то в зависимости от сообщения. имя table_reference_name передается самим потоком данных в процессе луча ( аналогично работает для схемы и имени таблицы ). Я открою для него билет на луч джира. Спасибо за ваши комментарии
5. @Pablo : В моем json у меня есть поле, представляющее имя таблицы. Я хочу взять это поле ( может отличаться для 2 сообщений ) и использовать его для вставки в соответствующую таблицу. Проблема в том, что когда этой таблицы не существует, я хочу создать ее с помощью правильного ключа кластеризации и поля partition_date_field, что на самом деле невозможно в потоковой передаче.