#python #google-cloud-platform #google-cloud-dataflow #apache-beam
#python #google-облачная платформа #google-облако-поток данных #apache-beam
Вопрос:
Я использую конвейер потока данных, и я хотел бы иметь доступ к файлам на нем.
это строка, в которой я вызываю метод для получения файлов и извлечения модулей:
def run():
""" Run pipeline"""
options: PipelineOptions = PipelineOptions(
project='production-213911',
runner='DataflowRunner',
region='europe-west1',
streaming=True,
setup_file='dataflow/setup.py',
autoscaling_algorith='THROUGHPUT_BASED',
)
proto = Container().protobuf()
test = proto.get_proto_file('data/build')
proto.get_obj_from_file(test)
with beam.Pipeline(options=options) as pipeline:
...
это этап конвейера, на котором я хотел бы использовать список моего модуля:
status_records = (status | 'Proto to Dict' >> beam.Map(
lambda x: convert_proto_to_dict(x, proto.protos)))
и вот код, который просматривает каталог для получения файлов:
@staticmethod
def get_proto_file(dirname: str = 'python_protocol') -> List[object]:
"""Iterate threw dir w. build .proto
:param
dirname = name of the directory to browse
:return
List[object] with module
"""
protos: List[object] = []
for root, _, mod in os.walk(dirname):
for name in mod:
if 'pb2' in name and 'pyc' not in name:
print(name)
module = name[:-3]
if '/' in root:
dirname = root.replace('/', '.')
path = f'{dirname}.{module}'
imported_module = importlib.import_module(path)
protos.append(imported_module)
return protos
Однако для моей переменной ‘proto.protos’ всегда установлено значение None (значение test равно None, я уверен, что проблема на первом шаге)
Я попытался вызвать эту строку из файла на том же уровне, что и мой конвейер, и это работает:
test = proto.get_proto_file('data/build')
Итак, я предполагаю, что это связано с тем, что мои файлы не находятся в потоке данных, как в моем проекте..
Есть идеи, как это сделать?
спасибо 🙂
Ответ №1:
Это распространенная проблема.
-
Во-первых, вы должны понимать, как работает beam. Когда вы готовите свой конвейер, вы находитесь на главном сервере, где находится весь ваш код, все ваши файлы. Конвейер построен (да, ваш конвейер скомпилирован / переведен на Java по соображениям эффективности, Python работает слишком медленно (если новый бегун уже развернут, ваш код скомпилирован на C , но в любом случае Python исчезает во время выполнения) и отправьте его на рабочие серверы с опцией конвейера.
-
Тогда вы должны понять проблему: отправляется скомпилированный конвейер и параметры, а не ваши файлы!
Как это решить?
Поскольку опция конвейера отправляется с вашим скомпилированным конвейером, загрузите ваш файл на главный сервер (перед запуском конвейера) и сохраните содержимое в опции конвейера.
Прочитайте его из опции в вашем преобразовании
Комментарии:
1. Мне очень жаль, но я не понимаю, что вы подразумеваете под «загрузкой вашего файла на главном сервере», я попытался создать папку tmp, не работает, попробуйте загрузить модуль, тоже не работает..
2. Хорошо, это было непонятно. Итак, с помощью этих строк вы можете прочитать содержимое вашего файла,
test = proto.get_proto_file('data/build') proto.get_obj_from_file(test)
верно? Если это так, сразу после этих строк сохраните содержимое файла (прототип) в объект PipelineOption . Затем в вашем конвейере получите содержимое из PipelineOption. (Я знаю, как расширить тип PipelineOptions в Java, а не в Python, но принцип тот же)3. О, путаница от меня, с моими методами я просто получаю модуль и классы в нем. Поскольку мне не удается делать то, что я хочу, я думаю, я попытаюсь загрузить свой файл в корзину