#python-3.x #airflow
Вопрос:
Я хочу, чтобы мой DAG вышел из строя (во время работы), если значение cursor.rowcount > 0 равно true. Это и есть код:
def execute(self):
self.hook = PostgresHook(postgres_conn_id=self.dest_redshift_conn_id)
conn = self.hook.get_conn()
cursor = conn.cursor()
log.info("Connected with " self.dest_redshift_conn_id)
log.info('Starting Query')
print('Starting Query n')
cursor.execute("begin transaction;")
sql_comm = self.read_queries_file()
for command in sql_comm:
sql_statement = command
if sql_statement:
log.info(sql_statement)
log.info('n')
cursor.execute(sql_statement)
res = cursor.fetchmany(10)
if cursor.rowcount > 0:
raise Exception(res)
else:
log.info('nNo result found in the query')
cursor.execute(" end transaction;")
cursor.close()
log.info("Select process is completed".format(self.dest_table))
Я подтолкнул его к потоку воздуха и получил ошибку «Сломанный DAG» после перезапуска служб воздушного потока без запуска DAG. Экран печати с ошибкой
Как я могу вывести из строя DAG внутри моего кода и чтобы воздушный поток не нарушался? Уже пытался изменить тип исключения, но это не помогло.
Ответ №1:
Метод execute должен получить аргумент контекста. Попробуйте переписать свой код с помощью метода, известного как
def execute(self, context):
...
Комментарии:
1. Привет, я добавил в выполнение «контекст», и теперь я получаю ошибку от DAG, которая вызывает оператор «выполнить() отсутствует 1 требуемый позиционный аргумент: «контекст». Как называется эта функция «выполнить»? кто это называет и как? где я должен добавить «контекст» в DAG после добавления его в оператор?
2. Хорошо, сначала нам нужно узнать, находится ли ошибка внутри пользовательского оператора или где-то еще. Попробуйте изменить оператор для BashOperator или PythonOperator и посмотрите, исчезнет ли ошибка нарушенной группы доступности баз данных. Если это так, то мы уверены, что ошибка находится в вашем пользовательском операторе. Возможно, вам потребуется проверить все определение оператора. Если бы вы могли опубликовать всю декларацию класса, это было бы полезно
3. Чтобы ответить на ваш вопрос, метод execute запускается, когда DAG передается работнику, и работник начинает выполнять задание. Это полностью контролируется воздушным потоком для вас
Ответ №2:
Я наконец-то сменил def execute(self, context)
def execute(self, **context)
«на», и теперь это работает