#prefect
#префект
Вопрос:
Я пытаюсь найти способ обновить параметры begin, используемые для каждой итерации потока расписания.
Например, скажем, у меня есть поток, который планируется запускать один раз каждый понедельник в течение года. В первый понедельник поток должен выполняться с параметром, скажем, 5. Следующий понедельник должен быть запущен с параметром 7 и т.д. Параметр, необходимый для выполнения каждой недели, будет меняться на постоянное число.
Судя по документам, похоже, что я мог бы создать часы с соответствующим параметром для каждого запуска, но это кажется чрезмерным для потоков, которые запланированы для многих запусков.
Есть ли более простой способ сделать это в префекте?
Комментарии:
1. Добавление некоторого кода к вопросу увеличило бы шансы получить ответ 🙂
Ответ №1:
Похоже, вы хотите инкапсулировать некоторую форму бизнес-логики в свои Parameter
вычисления, что является отличным вариантом использования для добавления нового Task
в ваш поток:
import prefect
from prefect import task, Flow, Parameter
varying_param = Parameter("param", default=None) # none as the default
@task(name="Varying Parameter")
def param_calculation(p):
time = prefect.context.scheduled_start_time
# if a value was provided, use it
if p is not None:
return p
# do some calculations to decide what value is appropriate
# and return it
with Flow("Minimal example") as flow:
param_value = param_calculation(varying_param)
# now use this value in downstream tasks