Воздушный поток — Исправление функции во время инициализации

#python-3.x #airflow #python-dataclasses

Вопрос:

У меня есть проект воздушного потока с запущенными dag.

В моем коде Python у меня есть класс данных, который включает в себя defaultdict. когда я пытаюсь использовать метод asdict, я получаю исключение(сведения об исключении включены в описание ошибки ниже).


Похоже, что это открытая ошибка: https://bugs.python.org/issue35540.

В качестве решения я написал функцию исправления, которая заменяет функцию asdict. Я пытаюсь найти место, где я могу зацепить это изменение во время инициализации воздушного потока(до того, как мои dag запустятся).:

 import copy
from collections import defaultdict
from dataclasses import _is_dataclass_instance, fields, asdict


def my_asdict(obj, dict_factory=dict):
    if _is_dataclass_instance(obj):
        result = []
        for f in fields(obj):
            value = my_asdict(getattr(obj, f.name), dict_factory)
            result.append((f.name, value))
        return dict_factory(result)
    elif isinstance(obj, (list, tuple)):
        return type(obj)(my_asdict(v, dict_factory) for v in obj)
    elif isinstance(obj, defaultdict):
        # This is the patch          
        obj = dict(obj)
    if isinstance(obj, dict):
        return type(obj)((my_asdict(k, dict_factory), my_asdict(v, dict_factory))
                         for k, v in obj.items())
    else:
        return copy.deepcopy(obj)


asdict = my_asdict
 

Есть ли способ сделать это?

Комментарии:

1. Этот вопрос упускает контекст того, в чем у вас проблема с воздушным потоком? Если вы исправили функцию, то в чем проблема? Пожалуйста, поделитесь минимальным примером воспроизведения вашей DAG и тем, что именно не работает/не работает.

2. @Elad — проблема описана в ссылке на ошибку python. Это НЕ проблема воздушного потока. Это проблема класса данных.

3. @balderman Я в курсе, но Оп упомянул, что проблема в том, как реализовать код, созданный для устранения ошибки с помощью DAG в Airflow. Вот почему я прошу посмотреть код DAG Airflow, так как вопрос trying to find a place where I can hook this change during airflow initializtion(before my dags will run) в том, что мне пока неясно, зачем это нужно, поэтому я попросил посмотреть код, который объединяет DAG с патчем, представленным в вопросе, и какую обратную связь он создает в Airflow. Воздушный поток-это оркестровка. если у операции есть обходной путь, она должна легко работать с воздушным потоком.

4. Насколько я понимаю, Op использует dataclass и заинтересован в исправлении «asdict» из-за упомянутой ошибки. Операционная система ищет место, где патч может быть применен во время ввода воздушного потока. Вам интересно увидеть фактическое исключение, вызванное ошибкой? Пожалуйста, объясните, чего не хватает.

5. @balderman Ах, теперь понял. Я думал, что есть проблема с конкретным процессом, который пытается использовать обходной путь, но, похоже, проблема в том, что операционная система хочет, чтобы все способы использования функции с ошибками были перенаправлены на измененную — если это действительно так, это не то, что нужно решать на уровне воздушного потока.