Автоматически регистрировать новые потоки префектов?

#python #etl #prefect

#python #etl #префект

Вопрос:

Существует ли механизм автоматической регистрации потоков / новых потоков, если запущен локальный агент, без необходимости ручного запуска, например flow.register(...) , для каждого из них?

Я полагаю, что в airflow у них есть процесс, который регулярно сканирует любые файлы с dag именем в указанной домашней папке airflow, а затем ищет в них объекты DAG. И если он находит их, он загружает их, чтобы они были доступны через пользовательский интерфейс без необходимости вручную «регистрировать» их.

Существует ли что-то подобное для префекта. Так, например, если я только что создал следующий файл test_flow.py , не обязательно запуская его или добавляя flow.run_agent() , есть ли способ, чтобы он был просто волшебным образом зарегистрирован и доступен через пользовательский интерфейс 🙂 — просто потому, что он просто существует в нужном месте?

 # prefect_home_folder/test_flow.py
import prefect
from prefect import task, Flow

@task
def hello_task():
    logger = prefect.context.get("logger")
    logger.info("Hello, Cloud!")

flow = Flow("hello-flow", tasks=[hello_task])

flow.register(project_name='main')
 

Я мог бы написать скрипт, поведение которого аналогично поведению процесса airflow, для сканирования папки и регистрации потоков через регулярные промежутки времени, но мне интересно, не слишком ли это банально или есть лучшее решение, и я просто слишком много думаю о airflow?

Ответ №1:

Отличный вопрос (и потрясающее имя пользователя!) — короче говоря, я полагаю, вы слишком много думаете о потоке воздуха. Есть несколько причин, по которым это в настоящее время недоступно в префекте:

  • явное лучше, чем неявное
  • Потоки префектов не ограничены проживанием в одном месте и не ограничены одинаковыми средами выполнения; это усложняет как автоматическое обнаружение потока, так и его повторную сериализацию из одного процесса агента (который не обязан использовать ту же среду выполнения, что и потоки, которые он отправляет)
  • агенты лучше рассматривать как параметризуемые инфраструктурой развертывания, а не хранилищем потоков

В идеале для производственных рабочих процессов вы должны использовать процесс CI / CD, чтобы при каждом изменении кода запускалось автоматическое задание, которое повторно регистрирует поток. Несколько комментариев, которые могут быть полезны:

  • на самом деле вам не нужно повторно регистрировать поток для каждого возможного изменения кода; например, если вы изменили сообщение о том, что ваши hello_task журналы в вашем примере, вы можете просто повторно сохранить поток в исходное местоположение (как это выглядит, зависит от типа используемого вами хранилища). В конечном счете, вам нужно только повторно зарегистрироваться, если какие-либо метаданные о вашем потоке изменятся (настройки повторных попыток, имена задач, отношения зависимостей и т.д.).
  • вы можете использовать flow.register("My Project", idempotency_key=flow.serialized_hash()) для автоматического захвата этого; этот шаблон будет регистрировать новую версию только в том случае, если внутреннее представление потока каким-либо образом изменится

Комментарии:

1. Большое спасибо за комментарий, это очень полезно. Я помню, что читал об этой idempotency_key=flow.serialized_hash() опции в документах, но теперь это имеет гораздо больше смысла (понимание регистрации потока как хранения метаданных).