как убедиться, что последующие задания потока данных будут выполняться на том же компьютере

#google-cloud-platform #google-cloud-dataflow #dataflow

#google-cloud-platform #google-cloud-поток данных #поток данных

Вопрос:

Я могу загрузить несколько CSV-файлов в bigquery с помощью потока данных, используя цикл for. Но в этом случае каждый раз запускается новый поток данных, что приводит к дополнительным накладным расходам.

Поток данных — часть моего кода:

 def run(abs_csv_file_name="", table_name="", argv=None):
    parser = argparse.ArgumentParser()
    parser.add_argument('--input_csv_file',
                        dest='input_csv_file',
               default='gs://{0}/{1}'.format(bucket_name,abs_csv_file_name),
                        help='Input file to process.')
    parser.add_argument('--output_stage_bq',
                        dest='output_stage_bq',
                        default='{0}:{1}.{2}'.format(project_id,stage_dataset_name,table_name),
                        help='Output file to write results to.')

    parser.add_argument('--output_target_bq',
                        dest='output_target_bq',
                        default='{0}:{1}.{2}'.format(project_id,dataset_name,table_name),
                        help='Output file to write results to.')

    known_args, pipeline_args = parser.parse_known_args(argv)

    pipeline_options = PipelineOptions(pipeline_args)

    # delete_a_bq_table(table_name)
    table_spec = "{0}:{1}.{2}".format(project_id, stage_dataset_name, table_name)

    with beam.Pipeline(options=pipeline_options) as p1:
        data_csv = p1 | 'Read CSV file' >> ReadFromText(known_args.input_csv_file)
        dict1 = (data_csv | 'Format to json' >> (beam.ParDo(Split())))
        (dict1 | 'Write to BigQuery' >> beam.io.WriteToBigQuery(
                                            known_args.output_stage_bq,
                                            schema=product_revenue_schema
                                            ))
        fullTable = (p1 | 'ReadFromBQ' >> beam.io.Read(beam.io.BigQuerySource(table_spec)))
        (fullTable | 'writeToBQ another dataset' >> beam.io.WriteToBigQuery(known_args.output_target_bq,
                                schema = product_revenue_schema))
  

Я уверен, что должен быть лучший способ, чем каждый раз вызывать функцию run.

 for i in range(len(table_names)):
    find_product_revenue_schema_and_column_name(table_name=table_names[i])

    run(abs_csv_file_name=abs_file_names[i], table_name=table_names[i])
  

Мне нужно написать код, который может гарантировать, что последующие задания потока данных будут выполняться на том же компьютере, так что время настройки компьютера может быть сэкономлено.

Комментарии:

1. Я думаю, у вас неправильное представление об архитектуре потока данных. Никогда не должно быть случая, когда вы беспокоитесь о том, где выполняются части потока данных. Поток данных по определению является распределенной средой. Во фрагментах вашего кода нет никаких признаков потока данных. Я хотел бы предложить вам переработать свой вопрос и добавить гораздо больше деталей … как минимум, опишите, как вы взаимодействуете с потоком данных.

2. 1. У меня есть несколько таблиц, для которых нужно сделать ETL. 2. Есть несколько дополнительных пакетов, которые мне нужно пройти — требования. Я хочу выполнить этот run() для всех этих таблиц. Итак, какой будет лучший подход для экономии времени при настройке машины потока данных каждый раз, используя каждый раз одну и ту же настроенную машину.