Воздушный поток: можно ли повторно использовать и выполнять один и тот же экземпляр operator много раз, поддерживая состояние между запусками?

#python #airflow

#python #воздушный поток

Вопрос:

Не могли бы вы объяснить, возможно ли при определенных обстоятельствах, что экземпляр operator можно использовать повторно, и execute() метод будет выполняться много раз, а состояние сохраняется между execute() запусками?

Другими словами, возможен ли этот сценарий в Airflow:

  1. переменная self в Operator инициализируется в init .
  2. метод execute() считывает переменную self и изменяет ее.
  3. execute() выполняется еще раз на том же экземпляре operator, например, из-за перезапуска или чего-то еще, и может считывать переменную self, измененную предыдущим запуском execute?
     class MyOperator(BaseOperator):
    
      def __init__(self,
                 param_1
                 ...
                 param_n):
    
          self.var1=param_1
    
      def execute(self, context):
          #do some logic with self variable
          self.var1   = 1 #
     

Комментарии:

1. Вы не можете получить доступ к предыдущим запускам таким образом. Можете ли вы объяснить саму проблему? Вы описали здесь свой подход к решению, однако могут быть и другие идеи, если вы можете поделиться самой проблемой.

2. Я просто хочу знать, безопасно ли изменять собственные переменные в методе execute и иметь на его основе некоторую логику, или экземпляр можно повторно использовать в airflow, тогда это небезопасно, и я не должен их трогать и использовать локальные переменные, полученные из переменных экземпляра

3. @Elad Я не пытаюсь получить доступ к явному состоянию экземпляра таким образом. Я просто хочу получить подтверждение того, что такой сценарий вообще невозможен.

Ответ №1:

Описанный вами сценарий невозможен по следующей причине.

Когда планировщик воздушного потока отправляет ваш экземпляр задачи в очередь, задача инициализируется при каждом ударе сердца в работнике.

Это связано с тем, что при каждом заполнении DagBag инициализируются экземпляры operator.

Любое значение, сохраненное там между запусками, сбрасывается при повторной инициализации.

Если вам нужно сохранять значения между запусками, вы можете использовать Variable модель для хранения таких значений.

 from airflow.models import Variable

def execute(self, context):
    #do some logic with self variable 
    var1 = Variable.get(
            "count", 
            deserialize_json=True,
            default_var=0
        )
    var1  = 1    
    Variable.set("count", var1)