Apache Airflow зависает в initdb из-за подключения кода DAG к Airflow DB

#airflow

#поток воздуха

Вопрос:

Он успешно создает таблицы с airflow initdb , но зависает на шаге:

 INFO  [alembic.runtime.migration] Running upgrade 08364691d074 -> fe461863935f, increase_length_for_connection_password
  

Обновление перегонного кубика зависает, потому что у меня в моей базе данных есть этот код

     session = settings.Session()
    conns: Iterable[Connection] = (
       session.query(Connection.conn_id)
       .filter(and_(
         Connection.conn_id.ilike(f'{CONN_PREFIX}%'),
         Connection.conn_type == CONN_TYPE,
       ))
       .all()
    return [conn.conn_id for conn in conns]
  

Я использую его для создания задач «на лету» на основе соединений Airflow со специальным префиксом.

Но Airflow запускает код DAG внутри initdb команды. Таким образом, моя таблица блокировки кода connection и сценарий обновления перегонного кубика не могут ее изменить и зависнуть. Взаимоблокировка.

Насколько я понимаю, мне нужно каким-то образом снять блокировку в моем коде DAG. Повторно открыть транзакцию? Как это сделать?

Ответ №1:

Хорошо, когда я наконец понял проблему, решение довольно простое

     from typing import Iterable
    from airflow import settings
    from airflow.models import Connection
    from sqlalchemy import and_

    CONN_TYPE = 'fs'
    CONN_PREFIX = 'my_special_conn_'

    session = settings.Session()
    try:
        conns: Iterable[Connection] = (
            session.query(Connection.conn_id)
            .filter(and_(
                Connection.conn_id.ilike(f'{CONN_PREFIX}%'),
                Connection.conn_type == CONN_TYPE,
            ))
            .all()
        )
        conn_ids = [conn.conn_id for conn in conns]
    finally:
        session.commit()