«Не удается рассолить генераторные объекты» в Apache Beam — Python

# #python #google-bigquery #apache-beam #apache-beam-io

Вопрос:

Я получаю следующую ошибку при запуске DirectRunner: не удается выделить объекты генератора [при запуске «Назначить боковые столбцы»]

Ошибка появляется, когда луч записывает «сопоставленные строки» в BigQuery.

Есть идеи, откуда взялась эта ошибка?

 def assign_columns(row, mapping):  main_account = row['filepath_platform_id']  main_report = row['filepath_report_name']  for _mapping in mapping:  _side_account = _mapping['account_id']  _side_report = _mapping['report_name']  if main_account == _side_account and main_report == _side_report:  _cols = json.loads(_mapping['mapping'])  row = renames_columns(row, _cols)  yield row   def run(argv=None, save_main_session=True):  options = PipelineOptions(flags=argv)  options.view_as(SetupOptions).save_main_session = save_main_session  table_spec = 'SELECT * FROM `nextwork-staging.datalake.v_mapping_reports`'  with beam.Pipeline(options=options) as p:   common_options = options.view_as(CommonOptions)   file_metadata = (  p  | 'Create empty PCollection' gt;gt; beam.Create(['Start'])  | 'Load Input Location' gt;gt; beam.ParDo(CreateFilesFromGlobByDate(common_options.input, None, None, None, common_options.starting_date, common_options.ending_date, common_options.only_last_file_from_input))  )   rows = (  file_metadata | 'GetIncomeAccessFn' gt;gt; beam.ParDo(GetIncomeAccessFn())  | 'Map to regular dictionary' gt;gt; beam.Map(lambda x: dict(x))  | 'TransformDate' gt;gt; beam.ParDo(TransformDateFromStringFn())   )   side_input = (  p  | 'ReadTable' gt;gt; beam.io.ReadFromBigQuery(query=table_spec, use_standard_sql=True)  | 'Map Columns' gt;gt; beam.Map(lambda x: dict(x))  )   mapped_rows = (  rows | 'Assign Side Columns' gt;gt; beam.Map(assign_columns, mapping=beam.pvalue.AsIter(side_input))  | 'Rename column' gt;gt; beam.Map(renames_columns, names={'site': 'campaign'})  | 'TransformForBigQuery' gt;gt; beam.ParDo(TransformForBigQueryFn())  )   gcp_options = options.view_as(GoogleCloudOptions)   (  mapped_rows  | 'WriteToBigQueryUsingParameter' gt;gt; bigquery_file_loads.BigQueryBatchFileLoads(  destination=_get_table_name,  schema=_get_schema,  create_disposition='CREATE_IF_NEEDED',  write_disposition='WRITE_TRUNCATE',  additional_bq_parameters=_get_additional_bq_parameters,  custom_gcs_temp_location=gcp_options.temp_location  )  )