#google-cloud-platform #google-cloud-dataflow #dataflow
#google-cloud-platform #google-cloud-поток данных #поток данных
Вопрос:
Я могу загрузить несколько CSV-файлов в bigquery с помощью потока данных, используя цикл for. Но в этом случае каждый раз запускается новый поток данных, что приводит к дополнительным накладным расходам.
Поток данных — часть моего кода:
def run(abs_csv_file_name="", table_name="", argv=None):
parser = argparse.ArgumentParser()
parser.add_argument('--input_csv_file',
dest='input_csv_file',
default='gs://{0}/{1}'.format(bucket_name,abs_csv_file_name),
help='Input file to process.')
parser.add_argument('--output_stage_bq',
dest='output_stage_bq',
default='{0}:{1}.{2}'.format(project_id,stage_dataset_name,table_name),
help='Output file to write results to.')
parser.add_argument('--output_target_bq',
dest='output_target_bq',
default='{0}:{1}.{2}'.format(project_id,dataset_name,table_name),
help='Output file to write results to.')
known_args, pipeline_args = parser.parse_known_args(argv)
pipeline_options = PipelineOptions(pipeline_args)
# delete_a_bq_table(table_name)
table_spec = "{0}:{1}.{2}".format(project_id, stage_dataset_name, table_name)
with beam.Pipeline(options=pipeline_options) as p1:
data_csv = p1 | 'Read CSV file' >> ReadFromText(known_args.input_csv_file)
dict1 = (data_csv | 'Format to json' >> (beam.ParDo(Split())))
(dict1 | 'Write to BigQuery' >> beam.io.WriteToBigQuery(
known_args.output_stage_bq,
schema=product_revenue_schema
))
fullTable = (p1 | 'ReadFromBQ' >> beam.io.Read(beam.io.BigQuerySource(table_spec)))
(fullTable | 'writeToBQ another dataset' >> beam.io.WriteToBigQuery(known_args.output_target_bq,
schema = product_revenue_schema))
Я уверен, что должен быть лучший способ, чем каждый раз вызывать функцию run.
for i in range(len(table_names)):
find_product_revenue_schema_and_column_name(table_name=table_names[i])
run(abs_csv_file_name=abs_file_names[i], table_name=table_names[i])
Мне нужно написать код, который может гарантировать, что последующие задания потока данных будут выполняться на том же компьютере, так что время настройки компьютера может быть сэкономлено.
Комментарии:
1. Я думаю, у вас неправильное представление об архитектуре потока данных. Никогда не должно быть случая, когда вы беспокоитесь о том, где выполняются части потока данных. Поток данных по определению является распределенной средой. Во фрагментах вашего кода нет никаких признаков потока данных. Я хотел бы предложить вам переработать свой вопрос и добавить гораздо больше деталей … как минимум, опишите, как вы взаимодействуете с потоком данных.
2. 1. У меня есть несколько таблиц, для которых нужно сделать ETL. 2. Есть несколько дополнительных пакетов, которые мне нужно пройти — требования. Я хочу выполнить этот run() для всех этих таблиц. Итак, какой будет лучший подход для экономии времени при настройке машины потока данных каждый раз, используя каждый раз одну и ту же настроенную машину.