Есть ли способ обновлять параметры при каждом запуске запланированного потока?

#prefect

#префект

Вопрос:

Я пытаюсь найти способ обновить параметры begin, используемые для каждой итерации потока расписания.

Например, скажем, у меня есть поток, который планируется запускать один раз каждый понедельник в течение года. В первый понедельник поток должен выполняться с параметром, скажем, 5. Следующий понедельник должен быть запущен с параметром 7 и т.д. Параметр, необходимый для выполнения каждой недели, будет меняться на постоянное число.

Судя по документам, похоже, что я мог бы создать часы с соответствующим параметром для каждого запуска, но это кажется чрезмерным для потоков, которые запланированы для многих запусков.

Есть ли более простой способ сделать это в префекте?

Комментарии:

1. Добавление некоторого кода к вопросу увеличило бы шансы получить ответ 🙂

Ответ №1:

Похоже, вы хотите инкапсулировать некоторую форму бизнес-логики в свои Parameter вычисления, что является отличным вариантом использования для добавления нового Task в ваш поток:

 import prefect
from prefect import task, Flow, Parameter


varying_param = Parameter("param", default=None) # none as the default

@task(name="Varying Parameter")
def param_calculation(p):
    time = prefect.context.scheduled_start_time 

    # if a value was provided, use it
    if p is not None:
        return p
    
    # do some calculations to decide what value is appropriate
    # and return it


with Flow("Minimal example") as flow:
    param_value = param_calculation(varying_param)
    # now use this value in downstream tasks