Динамический доступ к файлу в конвейере потока данных

#python #google-cloud-platform #google-cloud-dataflow #apache-beam

#python #google-облачная платформа #google-облако-поток данных #apache-beam

Вопрос:

Я использую конвейер потока данных, и я хотел бы иметь доступ к файлам на нем.

это строка, в которой я вызываю метод для получения файлов и извлечения модулей:

 def run():
    """ Run pipeline"""
    options: PipelineOptions = PipelineOptions(
        project='production-213911',
        runner='DataflowRunner',
        region='europe-west1',
        streaming=True,
        setup_file='dataflow/setup.py',
        autoscaling_algorith='THROUGHPUT_BASED',
    )
    proto = Container().protobuf()
    test = proto.get_proto_file('data/build')    
    proto.get_obj_from_file(test)

    with beam.Pipeline(options=options) as pipeline:
        ...
  

это этап конвейера, на котором я хотел бы использовать список моего модуля:

         status_records = (status | 'Proto to Dict' >> beam.Map(
        lambda x: convert_proto_to_dict(x, proto.protos)))
  

и вот код, который просматривает каталог для получения файлов:

 @staticmethod
def get_proto_file(dirname: str = 'python_protocol') -> List[object]:
    """Iterate threw dir w. build .proto
    :param
        dirname = name of the directory to browse
    :return
        List[object] with module
    """
    protos: List[object] = []
    for root, _, mod in os.walk(dirname):
        for name in mod:
            if 'pb2' in name and 'pyc' not in name:
                print(name)
                module = name[:-3]
                if '/' in root:
                    dirname = root.replace('/', '.')
                path = f'{dirname}.{module}'
                imported_module = importlib.import_module(path)
                protos.append(imported_module)
    return protos
  

Однако для моей переменной ‘proto.protos’ всегда установлено значение None (значение test равно None, я уверен, что проблема на первом шаге)

Я попытался вызвать эту строку из файла на том же уровне, что и мой конвейер, и это работает:

 test = proto.get_proto_file('data/build')
  

Итак, я предполагаю, что это связано с тем, что мои файлы не находятся в потоке данных, как в моем проекте..
Есть идеи, как это сделать?

спасибо 🙂

Ответ №1:

Это распространенная проблема.

  • Во-первых, вы должны понимать, как работает beam. Когда вы готовите свой конвейер, вы находитесь на главном сервере, где находится весь ваш код, все ваши файлы. Конвейер построен (да, ваш конвейер скомпилирован / переведен на Java по соображениям эффективности, Python работает слишком медленно (если новый бегун уже развернут, ваш код скомпилирован на C , но в любом случае Python исчезает во время выполнения) и отправьте его на рабочие серверы с опцией конвейера.

  • Тогда вы должны понять проблему: отправляется скомпилированный конвейер и параметры, а не ваши файлы!

Как это решить?

Поскольку опция конвейера отправляется с вашим скомпилированным конвейером, загрузите ваш файл на главный сервер (перед запуском конвейера) и сохраните содержимое в опции конвейера.

Прочитайте его из опции в вашем преобразовании

Комментарии:

1. Мне очень жаль, но я не понимаю, что вы подразумеваете под «загрузкой вашего файла на главном сервере», я попытался создать папку tmp, не работает, попробуйте загрузить модуль, тоже не работает..

2. Хорошо, это было непонятно. Итак, с помощью этих строк вы можете прочитать содержимое вашего файла, test = proto.get_proto_file('data/build') proto.get_obj_from_file(test) верно? Если это так, сразу после этих строк сохраните содержимое файла (прототип) в объект PipelineOption . Затем в вашем конвейере получите содержимое из PipelineOption. (Я знаю, как расширить тип PipelineOptions в Java, а не в Python, но принцип тот же)

3. О, путаница от меня, с моими методами я просто получаю модуль и классы в нем. Поскольку мне не удается делать то, что я хочу, я думаю, я попытаюсь загрузить свой файл в корзину