дополнительные параметры_bq_parameters как вызываемые не работают при потоковой передаче?

# #python #google-bigquery #google-cloud-dataflow #apache-beam

Вопрос:

У меня есть поток данных заданий, который использует bigquery для чтения некоторых данных из pubsub для потоковой передачи (или gcs для пакетной передачи), а затем вставляет их в bigquery.

Для пакетной обработки я использую следующий шаг:

 (processed_rows | "Write Full to Bigquery" >> apache_beam.io.WriteToBigQuery(
    table_fn,
    schema=schema_fn,
    additional_bq_parameters=additional_bq_parameters_fn,
    write_disposition=apache_beam.io.BigQueryDisposition.WRITE_APPEND,
    create_disposition=apache_beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
    method=apache_beam.io.WriteToBigQuery.Method.FILE_LOADS
    )
 )
 

Для потоковой передачи у меня есть следующий:

 (processed_rows | "Write to Bigquery Streaming" >> apache_beam.io.WriteToBigQuery(
    table_fn,
    schema=schema_fn,
    additional_bq_parameters=additional_bq_parameters_fn,
    write_disposition=apache_beam.io.BigQueryDisposition.WRITE_APPEND,
    create_disposition=apache_beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
    method=apache_beam.io.WriteToBigQuery.Method.STREAMING_INSERTS)
 )
 

Моя функция additional_bq_parameters_fn выглядит следующим образом:

 def additional_bq_parameters_fn(table_reference_name):
    table_name = table_reference_name.split('.')[-1]
    bq_param = MAP_TABLE_BQ_PARAM[table_name]
    return bq_param
 

До сих пор он работал для пакетного задания, но не для потокового, где я получаю следующую ошибку:

Аргумент объекта _MessageClass после ** должен быть отображением, а не функцией

Когда я углубился в код луча apache, я нашел это в файлах BigQueryBatchFileLoads:

 if callable(self._additional_bq_parameters):
  additional_parameters = self._additional_bq_parameters(destination)
 

Однако я не могу найти ничего эквивалентного в потоковой части.

Итак, мой вопрос: я что-то пропустил или в коде apache beam есть ошибка (или неполная функция)?

Если я правильно понял, то я открою билет в apache beam Jira.

Комментарии:

1. Я видел этот документ, связанный с этим, не хотели бы вы его проверить? cloud.google.com/dataflow/docs/guides/…

2. 1) Какой тип данных additional_bq_parameters_fn возвращает? 2) Можете ли вы попробовать жестко закодировать значения additional_bq_parameters в переменной вместо того, чтобы возвращать их из функции? 3) Когда вы вызываете функцию additional_bq_parameters_fn. Как вы передаете table_reference_name необходимый параметр в своем коде?

3. К сожалению, дополнительные параметры_bq_parameters поддерживаются только для пакетной обработки, как вы правильно поняли. Какие параметры вы пытаетесь передать?

4. @Ricco : Я пытался жестко закодировать, и, конечно, это работает. Мне нужно что-то в зависимости от сообщения. имя table_reference_name передается самим потоком данных в процессе луча ( аналогично работает для схемы и имени таблицы ). Я открою для него билет на луч джира. Спасибо за ваши комментарии

5. @Pablo : В моем json у меня есть поле, представляющее имя таблицы. Я хочу взять это поле ( может отличаться для 2 сообщений ) и использовать его для вставки в соответствующую таблицу. Проблема в том, что когда этой таблицы не существует, я хочу создать ее с помощью правильного ключа кластеризации и поля partition_date_field, что на самом деле невозможно в потоковой передаче.