apache-beam считывает несколько файлов из нескольких папок пакетов GCS и загружает их biquery python

#python #protocol-buffers #google-cloud-dataflow #apache-beam

# #python #протокол-буферы #google-облако-поток данных #apache-beam

Вопрос:

Я хочу настроить конвейер каждый час для анализа 2000 необработанных файлов формата protobuf в разных папках групп GCS и загрузки данных в большой запрос. пока я могу успешно анализировать прото-данные.

Я знаю метод подстановочных знаков для чтения всех файлов в папке, но я не хочу этого сейчас, поскольку у меня есть данные из разных папок, и я хочу запускать это быстрее, как параллелизм, а не последовательно

как показано ниже

 for x,filename enumerate(file_separted_comma):
    --read data from prto
    --load data to bigquery 
 

Теперь я хочу знать, является ли приведенный ниже подход лучшим или рекомендуемым способом анализа нескольких файлов из разных папок в apache beam и загрузки данных в большой запрос.

еще одна вещь, каждая запись после синтаксического анализа из proto, я превращаю ее в запись JSON для загрузки в большой запрос и не знаю, что это также хороший способ загрузить данные в большой запрос вместо прямой загрузки десериализованных (проанализированных) данных proto.

Я перехожу от задания Hadoop к потоку данных, чтобы снизить затраты за счет настройки этого конвейера.

Я новичок в apache-beam, не знаю, каковы минусы и плюсы, поэтому может кто-нибудь взглянуть на код и помочь мне здесь, чтобы сделать лучший подход к производству

 import time
import sys
import argparse
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
import csv
import base64
import rtbtracker_log_pb2
from google.protobuf import timestamp_pb2
from google.protobuf.json_format import MessageToDict
from google.protobuf.json_format import MessageToJson
import io
from apache_beam.io.filesystems import FileSystems


def get_deserialized_log(serialized_log):
    log = rtbtracker_log_pb2.RtbTrackerLogProto()
    log.ParseFromString(serialized_log)
    return log


def print_row(message):
    message=message[3]
    message = message.replace('_', '/');
    message = message.replace('*', '=');
    message = message.replace('-', ' ');
    #finalbunary=base64.b64decode(message.decode('UTF-8'))
    finalbunary=base64.b64decode(message)
    msg=get_deserialized_log(finalbunary)

    jsonObj = MessageToDict(msg)
    #jsonObj = MessageToJson(msg)
    return jsonObj

def parse_file(element):
  for line in csv.reader([element], quotechar='"', delimiter='t', quoting=csv.QUOTE_ALL, skipinitialspace=True):
    return line



def run():
    parser = argparse.ArgumentParser()
    parser.add_argument("--input", dest="input", required=False)
    parser.add_argument("--output", dest="output", required=False)
    app_args, pipeline_args = parser. parse_known_args()

    with beam.Pipeline(options=PipelineOptions()) as p:
        input_list=app_args.input
        file_list = input_list.split(",")
        res_list = ["/home/file_{}-00000-of-00001.json".format(i) for i in range(len(file_list))]

        for i,file in enumerate(file_list):
            onesec=p | "Read Text {}".format(i) >> beam.io.textio.ReadFromText(file)
            parsingProtoFile=onesec | 'Parse file{}'.format(i) >> beam.Map(parse_file)
            printFileConetent=parsingProtoFile | 'Print output {}'.format(i) >>beam.Map(print_row)
        
            #i want to load to bigquery here
            ##LOAD DATA TO BIGQUERY

            #secondsec=printFileConetent | "Write TExt {}".format(i) >> ##beam.io.WriteToText("/home/file_{}".format(i),file_name_suffix=".json", 
###num_shards=1 , 
##append_trailing_newlines = True)
        

if __name__ == '__main__':
    run()
 

запуск кода ниже в локальном

 python3 another_main.py --input=tracker_one.gz,tracker_two.gz
 

путь вывода, который я не упомянул, поскольку я не хочу сохранять данные в gcs, поскольку я буду загружать их в bigquery

и, как показано ниже, работает в dataflowrunner

 python3 final_beam_v1.py --input gs://bucket/folder/2020/12/23/00/00/fileread.gz --output gs://bucket/beamoutput_four/ --runner DataflowRunner --project PROJECT --staging_location gs://bucket/staging_four --temp_location gs://bucket/temp_four --region us-east1 --setup_file ./setup.py --job_name testing
 

заметил, что два задания будут выполняться для одного входного файла с тем же именем задания, и dnt знает, почему это происходит, и скриншот PFA для того же введите описание изображения здесь

Ответ №1:

Этот метод чтения файлов подходит (если количество входных файлов не слишком велико). Однако, если вы можете выразить набор файлов, которые хотите прочитать, в виде выражения с подстановочными знаками (которое может совпадать с несколькими папками), это, вероятно, будет работать лучше, и поток данных будет параллельно считывать все файлы, соответствующие шаблону.

Для записи в BigQuery лучше всего использовать встроенный приемник BigQuery. Поведение по умолчанию заключается в создании временных файлов в формате JSON, а затем загрузке их в BigQuery, но вы также можете использовать вместо этого Avro, что может быть более эффективным. Вы также можете объединить все свои входные данные в одну PCollection с помощью Flatten, так что вам понадобится только один приемник BigQuery в вашем конвейере.

Комментарии:

1. Привет @danieim, список файлов для обработки я получу из таблицы и что я не знаю, как сопоставить их с помощью подстановочного знака, так как это будет динамическим. входной разделитель gcs будет выглядеть следующим образом gs:// bucket/ГГГГ / ММ / ДД / ЧЧ / МИН /, МИН будет 00 или 30, так как данные, поступающие из compoenets, разделяются на две получасовые папки

2. а также можете ли вы, пожалуйста, объяснить мне, почему два задания выполняются с одним и тем же именем задания

3. Для динамического набора файлов вы можете использовать github.com/apache/beam/blob/master/sdks/python/apache_beam/io /… преобразование, которое принимает PCollection имен файлов в качестве входных данных. Для вопроса о нескольких заданиях: скорее всего, у вас где-то есть два вызова p.run() в вашем коде

4. да, вы правы, я сделал это через ReadAllFromText и теперь работает нормально, и для нескольких заданий причиной являются два вызова p.run(), и это также теперь исправлено и работает нормально. Большое спасибо