#python #etl #prefect
#python #etl #префект
Вопрос:
Существует ли механизм автоматической регистрации потоков / новых потоков, если запущен локальный агент, без необходимости ручного запуска, например flow.register(...)
, для каждого из них?
Я полагаю, что в airflow у них есть процесс, который регулярно сканирует любые файлы с dag
именем в указанной домашней папке airflow, а затем ищет в них объекты DAG. И если он находит их, он загружает их, чтобы они были доступны через пользовательский интерфейс без необходимости вручную «регистрировать» их.
Существует ли что-то подобное для префекта. Так, например, если я только что создал следующий файл test_flow.py , не обязательно запуская его или добавляя flow.run_agent()
, есть ли способ, чтобы он был просто волшебным образом зарегистрирован и доступен через пользовательский интерфейс 🙂 — просто потому, что он просто существует в нужном месте?
# prefect_home_folder/test_flow.py
import prefect
from prefect import task, Flow
@task
def hello_task():
logger = prefect.context.get("logger")
logger.info("Hello, Cloud!")
flow = Flow("hello-flow", tasks=[hello_task])
flow.register(project_name='main')
Я мог бы написать скрипт, поведение которого аналогично поведению процесса airflow, для сканирования папки и регистрации потоков через регулярные промежутки времени, но мне интересно, не слишком ли это банально или есть лучшее решение, и я просто слишком много думаю о airflow?
Ответ №1:
Отличный вопрос (и потрясающее имя пользователя!) — короче говоря, я полагаю, вы слишком много думаете о потоке воздуха. Есть несколько причин, по которым это в настоящее время недоступно в префекте:
- явное лучше, чем неявное
- Потоки префектов не ограничены проживанием в одном месте и не ограничены одинаковыми средами выполнения; это усложняет как автоматическое обнаружение потока, так и его повторную сериализацию из одного процесса агента (который не обязан использовать ту же среду выполнения, что и потоки, которые он отправляет)
- агенты лучше рассматривать как параметризуемые инфраструктурой развертывания, а не хранилищем потоков
В идеале для производственных рабочих процессов вы должны использовать процесс CI / CD, чтобы при каждом изменении кода запускалось автоматическое задание, которое повторно регистрирует поток. Несколько комментариев, которые могут быть полезны:
- на самом деле вам не нужно повторно регистрировать поток для каждого возможного изменения кода; например, если вы изменили сообщение о том, что ваши
hello_task
журналы в вашем примере, вы можете просто повторно сохранить поток в исходное местоположение (как это выглядит, зависит от типа используемого вами хранилища). В конечном счете, вам нужно только повторно зарегистрироваться, если какие-либо метаданные о вашем потоке изменятся (настройки повторных попыток, имена задач, отношения зависимостей и т.д.). - вы можете использовать
flow.register("My Project", idempotency_key=flow.serialized_hash())
для автоматического захвата этого; этот шаблон будет регистрировать новую версию только в том случае, если внутреннее представление потока каким-либо образом изменится
Комментарии:
1. Большое спасибо за комментарий, это очень полезно. Я помню, что читал об этой
idempotency_key=flow.serialized_hash()
опции в документах, но теперь это имеет гораздо больше смысла (понимание регистрации потока как хранения метаданных).