#airflow
#поток воздуха
Вопрос:
Он успешно создает таблицы с airflow initdb
, но зависает на шаге:
INFO [alembic.runtime.migration] Running upgrade 08364691d074 -> fe461863935f, increase_length_for_connection_password
Обновление перегонного кубика зависает, потому что у меня в моей базе данных есть этот код
session = settings.Session()
conns: Iterable[Connection] = (
session.query(Connection.conn_id)
.filter(and_(
Connection.conn_id.ilike(f'{CONN_PREFIX}%'),
Connection.conn_type == CONN_TYPE,
))
.all()
return [conn.conn_id for conn in conns]
Я использую его для создания задач «на лету» на основе соединений Airflow со специальным префиксом.
Но Airflow запускает код DAG внутри initdb
команды. Таким образом, моя таблица блокировки кода connection
и сценарий обновления перегонного кубика не могут ее изменить и зависнуть. Взаимоблокировка.
Насколько я понимаю, мне нужно каким-то образом снять блокировку в моем коде DAG. Повторно открыть транзакцию? Как это сделать?
Ответ №1:
Хорошо, когда я наконец понял проблему, решение довольно простое
from typing import Iterable
from airflow import settings
from airflow.models import Connection
from sqlalchemy import and_
CONN_TYPE = 'fs'
CONN_PREFIX = 'my_special_conn_'
session = settings.Session()
try:
conns: Iterable[Connection] = (
session.query(Connection.conn_id)
.filter(and_(
Connection.conn_id.ilike(f'{CONN_PREFIX}%'),
Connection.conn_type == CONN_TYPE,
))
.all()
)
conn_ids = [conn.conn_id for conn in conns]
finally:
session.commit()