# #python #google-bigquery #apache-beam #apache-beam-io
Вопрос:
Я получаю следующую ошибку при запуске DirectRunner: не удается выделить объекты генератора [при запуске «Назначить боковые столбцы»]
Ошибка появляется, когда луч записывает «сопоставленные строки» в BigQuery.
Есть идеи, откуда взялась эта ошибка?
def assign_columns(row, mapping): main_account = row['filepath_platform_id'] main_report = row['filepath_report_name'] for _mapping in mapping: _side_account = _mapping['account_id'] _side_report = _mapping['report_name'] if main_account == _side_account and main_report == _side_report: _cols = json.loads(_mapping['mapping']) row = renames_columns(row, _cols) yield row def run(argv=None, save_main_session=True): options = PipelineOptions(flags=argv) options.view_as(SetupOptions).save_main_session = save_main_session table_spec = 'SELECT * FROM `nextwork-staging.datalake.v_mapping_reports`' with beam.Pipeline(options=options) as p: common_options = options.view_as(CommonOptions) file_metadata = ( p | 'Create empty PCollection' gt;gt; beam.Create(['Start']) | 'Load Input Location' gt;gt; beam.ParDo(CreateFilesFromGlobByDate(common_options.input, None, None, None, common_options.starting_date, common_options.ending_date, common_options.only_last_file_from_input)) ) rows = ( file_metadata | 'GetIncomeAccessFn' gt;gt; beam.ParDo(GetIncomeAccessFn()) | 'Map to regular dictionary' gt;gt; beam.Map(lambda x: dict(x)) | 'TransformDate' gt;gt; beam.ParDo(TransformDateFromStringFn()) ) side_input = ( p | 'ReadTable' gt;gt; beam.io.ReadFromBigQuery(query=table_spec, use_standard_sql=True) | 'Map Columns' gt;gt; beam.Map(lambda x: dict(x)) ) mapped_rows = ( rows | 'Assign Side Columns' gt;gt; beam.Map(assign_columns, mapping=beam.pvalue.AsIter(side_input)) | 'Rename column' gt;gt; beam.Map(renames_columns, names={'site': 'campaign'}) | 'TransformForBigQuery' gt;gt; beam.ParDo(TransformForBigQueryFn()) ) gcp_options = options.view_as(GoogleCloudOptions) ( mapped_rows | 'WriteToBigQueryUsingParameter' gt;gt; bigquery_file_loads.BigQueryBatchFileLoads( destination=_get_table_name, schema=_get_schema, create_disposition='CREATE_IF_NEEDED', write_disposition='WRITE_TRUNCATE', additional_bq_parameters=_get_additional_bq_parameters, custom_gcs_temp_location=gcp_options.temp_location ) )