#python #protocol-buffers #google-cloud-dataflow #apache-beam
# #python #протокол-буферы #google-облако-поток данных #apache-beam
Вопрос:
Я хочу настроить конвейер каждый час для анализа 2000 необработанных файлов формата protobuf в разных папках групп GCS и загрузки данных в большой запрос. пока я могу успешно анализировать прото-данные.
Я знаю метод подстановочных знаков для чтения всех файлов в папке, но я не хочу этого сейчас, поскольку у меня есть данные из разных папок, и я хочу запускать это быстрее, как параллелизм, а не последовательно
как показано ниже
for x,filename enumerate(file_separted_comma):
--read data from prto
--load data to bigquery
Теперь я хочу знать, является ли приведенный ниже подход лучшим или рекомендуемым способом анализа нескольких файлов из разных папок в apache beam и загрузки данных в большой запрос.
еще одна вещь, каждая запись после синтаксического анализа из proto, я превращаю ее в запись JSON для загрузки в большой запрос и не знаю, что это также хороший способ загрузить данные в большой запрос вместо прямой загрузки десериализованных (проанализированных) данных proto.
Я перехожу от задания Hadoop к потоку данных, чтобы снизить затраты за счет настройки этого конвейера.
Я новичок в apache-beam, не знаю, каковы минусы и плюсы, поэтому может кто-нибудь взглянуть на код и помочь мне здесь, чтобы сделать лучший подход к производству
import time
import sys
import argparse
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
import csv
import base64
import rtbtracker_log_pb2
from google.protobuf import timestamp_pb2
from google.protobuf.json_format import MessageToDict
from google.protobuf.json_format import MessageToJson
import io
from apache_beam.io.filesystems import FileSystems
def get_deserialized_log(serialized_log):
log = rtbtracker_log_pb2.RtbTrackerLogProto()
log.ParseFromString(serialized_log)
return log
def print_row(message):
message=message[3]
message = message.replace('_', '/');
message = message.replace('*', '=');
message = message.replace('-', ' ');
#finalbunary=base64.b64decode(message.decode('UTF-8'))
finalbunary=base64.b64decode(message)
msg=get_deserialized_log(finalbunary)
jsonObj = MessageToDict(msg)
#jsonObj = MessageToJson(msg)
return jsonObj
def parse_file(element):
for line in csv.reader([element], quotechar='"', delimiter='t', quoting=csv.QUOTE_ALL, skipinitialspace=True):
return line
def run():
parser = argparse.ArgumentParser()
parser.add_argument("--input", dest="input", required=False)
parser.add_argument("--output", dest="output", required=False)
app_args, pipeline_args = parser. parse_known_args()
with beam.Pipeline(options=PipelineOptions()) as p:
input_list=app_args.input
file_list = input_list.split(",")
res_list = ["/home/file_{}-00000-of-00001.json".format(i) for i in range(len(file_list))]
for i,file in enumerate(file_list):
onesec=p | "Read Text {}".format(i) >> beam.io.textio.ReadFromText(file)
parsingProtoFile=onesec | 'Parse file{}'.format(i) >> beam.Map(parse_file)
printFileConetent=parsingProtoFile | 'Print output {}'.format(i) >>beam.Map(print_row)
#i want to load to bigquery here
##LOAD DATA TO BIGQUERY
#secondsec=printFileConetent | "Write TExt {}".format(i) >> ##beam.io.WriteToText("/home/file_{}".format(i),file_name_suffix=".json",
###num_shards=1 ,
##append_trailing_newlines = True)
if __name__ == '__main__':
run()
запуск кода ниже в локальном
python3 another_main.py --input=tracker_one.gz,tracker_two.gz
путь вывода, который я не упомянул, поскольку я не хочу сохранять данные в gcs, поскольку я буду загружать их в bigquery
и, как показано ниже, работает в dataflowrunner
python3 final_beam_v1.py --input gs://bucket/folder/2020/12/23/00/00/fileread.gz --output gs://bucket/beamoutput_four/ --runner DataflowRunner --project PROJECT --staging_location gs://bucket/staging_four --temp_location gs://bucket/temp_four --region us-east1 --setup_file ./setup.py --job_name testing
заметил, что два задания будут выполняться для одного входного файла с тем же именем задания, и dnt знает, почему это происходит, и скриншот PFA для того же
Ответ №1:
Этот метод чтения файлов подходит (если количество входных файлов не слишком велико). Однако, если вы можете выразить набор файлов, которые хотите прочитать, в виде выражения с подстановочными знаками (которое может совпадать с несколькими папками), это, вероятно, будет работать лучше, и поток данных будет параллельно считывать все файлы, соответствующие шаблону.
Для записи в BigQuery лучше всего использовать встроенный приемник BigQuery. Поведение по умолчанию заключается в создании временных файлов в формате JSON, а затем загрузке их в BigQuery, но вы также можете использовать вместо этого Avro, что может быть более эффективным. Вы также можете объединить все свои входные данные в одну PCollection с помощью Flatten, так что вам понадобится только один приемник BigQuery в вашем конвейере.
Комментарии:
1. Привет @danieim, список файлов для обработки я получу из таблицы и что я не знаю, как сопоставить их с помощью подстановочного знака, так как это будет динамическим. входной разделитель gcs будет выглядеть следующим образом gs:// bucket/ГГГГ / ММ / ДД / ЧЧ / МИН /, МИН будет 00 или 30, так как данные, поступающие из compoenets, разделяются на две получасовые папки
2. а также можете ли вы, пожалуйста, объяснить мне, почему два задания выполняются с одним и тем же именем задания
3. Для динамического набора файлов вы можете использовать github.com/apache/beam/blob/master/sdks/python/apache_beam/io /… преобразование, которое принимает PCollection имен файлов в качестве входных данных. Для вопроса о нескольких заданиях: скорее всего, у вас где-то есть два вызова p.run() в вашем коде
4. да, вы правы, я сделал это через ReadAllFromText и теперь работает нормально, и для нескольких заданий причиной являются два вызова p.run(), и это также теперь исправлено и работает нормально. Большое спасибо