#airflow #airflow-scheduler
#воздушный поток #airflow-scheduler
Вопрос:
У меня есть 2 файла внутри каталога dag — dag_1.py и dag_2.py
dag_1.py создает статическую базу данных и dag_2.py создает динамические базы данных на основе внешних файлов json в некотором расположении.
Статическая база данных (созданная dag_1.py ) содержит задачу на более позднем этапе, которая генерирует некоторые из этих входных файлов json для dag_2.py и динамические базы данных создаются таким образом.
Раньше это нормально работало с версиями Airflow 1.x, где сериализация DAG не использовалась. Но с Airflow 2.0 сериализация DAG стала обязательной. Иногда я получаю следующее исключение в планировщике при появлении динамических баз данных —
[2021-01-02 06:17:39,493] {scheduler_job.py:1293} ERROR - Exception when executing SchedulerJob._run_scheduler_loop
Traceback (most recent call last):
File "/global/packages/python/lib/python3.7/site-packages/airflow/jobs/scheduler_job.py", line 1275, in _execute
self._run_scheduler_loop()
File "/global/packages/python/lib/python3.7/site-packages/airflow/jobs/scheduler_job.py", line 1377, in _run_scheduler_loop
num_queued_tis = self._do_scheduling(session)
File "/global/packages/python/lib/python3.7/site-packages/airflow/jobs/scheduler_job.py", line 1474, in _do_scheduling
self._create_dag_runs(query.all(), session)
File "/global/packages/python/lib/python3.7/site-packages/airflow/jobs/scheduler_job.py", line 1557, in _create_dag_runs
dag = self.dagbag.get_dag(dag_model.dag_id, session=session)
File "/global/packages/python/lib/python3.7/site-packages/airflow/utils/session.py", line 62, in wrapper
return func(*args, **kwargs)
File "/global/packages/python/lib/python3.7/site-packages/airflow/models/dagbag.py", line 171, in get_dag
self._add_dag_from_db(dag_id=dag_id, session=session)
File "/global/packages/python/lib/python3.7/site-packages/airflow/models/dagbag.py", line 227, in _add_dag_from_db
raise SerializedDagNotFound(f"DAG '{dag_id}' not found in serialized_dag table")
airflow.exceptions.SerializedDagNotFound: DAG 'dynamic_dag_1' not found in serialized_dag table
После этого планировщик завершается, что и ожидалось.
Когда я проверяю таблицу вручную после этой ошибки, я могу увидеть в ней запись DAG.
Эта проблема не всегда воспроизводима. Что может быть вероятной причиной этого? Есть ли какая-либо конфигурация воздушного потока, которую я должен попробовать настроить?
Комментарии:
1. Трудно сказать, не видя твоего ДАГА
2. Действительно ли структура DAG здесь имеет значение? При перезапуске планировщика я могу запустить тот же динамически создаваемый DAG. Я хотел знать, имеет ли это какое-то отношение к параллелизму между синтаксическим анализом DAG и планированием. Поскольку я вижу запись DAG в таблице serialized_dag вручную, возможно ли, что эта запись была добавлена позже DagFileProcessor в таблицу после того, как планировщику не удалось найти в ней запись DAG?
3. В качестве быстрого исправления вы можете запустить airflow DB init еще раз, я думаю, это исправит состояние БД, и планировщик начнет работать.
Ответ №1:
У нас была та же проблема после обновления в следующем порядке:
- 1.10.12 -> 1.10.14
- 1.10.14 -> 2.0.0
Я следовал их руководству, и у нас не было проблем, пока в какой-то случайный момент через несколько часов планировщик не начал сбой, жалуясь на то, что случайные базы данных не найдены в базе данных.
Наша процедура развертывания включает /opt/airflow/dags
в себя очистку папки и выполнение чистой установки каждый раз (мы храним базы данных и вспомогательный код в пакетах python)
Поэтому время от времени в версии 1.10.x у нас были случаи, когда планировщик анализировал пустую папку и удалял сериализованные базы данных из базы данных, но он всегда мог восстановить изображение при следующем анализе
По-видимому, в 2.0, в рамках усилий по созданию scheduler HA, они полностью разделили процессор DAG и планировщик. Что приводит к состоянию гонки:
- если задание планировщика попадает в базу данных до того, как обработчик базы данных обновил значения таблицы serialized_dag, оно ничего не находит и вылетает
- если удача на вашей стороне, вышеупомянутое не произойдет, и вы не увидите это исключение
Чтобы избавиться от этой проблемы, я отключил планирование всех баз данных путем обновления is_paused
в базе данных, перезапустил планировщик и, как только он сгенерировал сериализованные базы данных, снова включил все базы данных
Комментарии:
1. Да, это имеет смысл. Я создал новую проблему в их официальном репозитории github. Они подтвердили это со средним приоритетом и добавили его для этапа Airflow 2.0.1. Надеюсь, эта ошибка будет устранена.
2. @DivyanshJamuaar у вас есть ссылка на эту проблему?
3. @TheNumenorean, Да — github.com/apache/airflow/issues/13504
Ответ №2:
Я исправил эту проблему в https://github.com/apache/airflow/pull/13893 который будет выпущен как часть для Airflow 2.0.1.
Выпустит Airflow 2.0.1 на следующей неделе (8 февраля 2021 года — скорее всего).
Комментарии:
1. Спасибо, kaxil. Исправление, похоже, работает нормально.
2. все еще происходит для меня даже в более поздних версиях!!
3. Это происходит и для меня в 2.1.4 @AramisNSR вам удалось найти решение? Хотя моя проблема возникает время от времени
4. @walker_4 к сожалению, наш самоуверенный мистер, Kaxil закрыл эту проблему везде и никогда никому не позволял ее комментировать, поэтому я помню только, что хотел прокомментировать свой ответ на github, связанный с этой проблемой, но он закрыл его до того, как я ответил, к сожалению, теперь я забыл..
5. Вы все равно можете опубликовать ответ на PR Aramis. Причина, по которой проблема закрыта, заключается в том, что PR, предназначенный для ее исправления, сработал в большинстве случаев. Ошибка «СЕРИАЛИЗОВАННЫЙ DAG не найден» — это очень общая ошибка, которая может возникнуть по многим причинам, в первую очередь из-за того, как создается DAG.
Ответ №3:
Недостаточно представителя для комментариев, поэтому приходится оставлять ответ, но:
- это чистая установка 2.0 или обновление вашего старого экземпляра 1.10.x? и
- вы перерабатываете имена?
Я буквально только что столкнулся с этой проблемой (я нашел этот вопрос в Google, чтобы узнать, кто еще был в той же лодке).
В моем случае это обновленная существующая установка 1.10.x, и хотя базы данных создавались динамически, имена были переработаны. Я получал ошибки, нажимая на dag в графическом интерфейсе, и это убивало планировщик.
Оказывается (TM), полное удаление баз данных с помощью кнопки «корзина» в обзоре графического интерфейса и разрешение их регенерации исправили это (например, проблема сразу исчезла и не повторялась за последние 30 минут).).
Мне кажется, что, возможно, какой-то аспект динамических баз данных не был должным образом перенесен на этом этапе, и их удаление и полное восстановление устранили проблему. db upgrade
Очевидно, что вы теряете всю свою историю и т. Д., Но (по крайней мере, в моем случае) Это не обязательно имеет большое значение.
Комментарии:
1. У меня чистая установка 2.0. Но я все еще сталкиваюсь с этой проблемой.
2. @argibbs эта проблема вернулась к вам / вы нашли способ решить? Это происходит только в одном из наших пакетов, который динамически запускает другие базы данных, а не другие
Ответ №4:
Выбранный ответ не сработал для меня (после того, как я несколько часов ломал голову). Вот что работает:
Просто перейдите в серверную базу данных (postgresql) и удалите все записи, касающиеся, logs, task instances, faild task and ...
но не удаляйте основные таблицы (если вы не можете указать разницу, просто удалите таблицы, о которых я упоминал)
затем перейдите и выполните airflow db init
похоже, что старые данные об устаревших и удаленных базах данных и задачах действительно могут превратить airflow в беспорядок. удалите беспорядок, заставьте его работать.