#python-3.x #airflow #python-dataclasses
Вопрос:
У меня есть проект воздушного потока с запущенными dag.
В моем коде Python у меня есть класс данных, который включает в себя defaultdict. когда я пытаюсь использовать метод asdict, я получаю исключение(сведения об исключении включены в описание ошибки ниже).
Похоже, что это открытая ошибка: https://bugs.python.org/issue35540.
В качестве решения я написал функцию исправления, которая заменяет функцию asdict. Я пытаюсь найти место, где я могу зацепить это изменение во время инициализации воздушного потока(до того, как мои dag запустятся).:
import copy
from collections import defaultdict
from dataclasses import _is_dataclass_instance, fields, asdict
def my_asdict(obj, dict_factory=dict):
if _is_dataclass_instance(obj):
result = []
for f in fields(obj):
value = my_asdict(getattr(obj, f.name), dict_factory)
result.append((f.name, value))
return dict_factory(result)
elif isinstance(obj, (list, tuple)):
return type(obj)(my_asdict(v, dict_factory) for v in obj)
elif isinstance(obj, defaultdict):
# This is the patch
obj = dict(obj)
if isinstance(obj, dict):
return type(obj)((my_asdict(k, dict_factory), my_asdict(v, dict_factory))
for k, v in obj.items())
else:
return copy.deepcopy(obj)
asdict = my_asdict
Есть ли способ сделать это?
Комментарии:
1. Этот вопрос упускает контекст того, в чем у вас проблема с воздушным потоком? Если вы исправили функцию, то в чем проблема? Пожалуйста, поделитесь минимальным примером воспроизведения вашей DAG и тем, что именно не работает/не работает.
2. @Elad — проблема описана в ссылке на ошибку python. Это НЕ проблема воздушного потока. Это проблема класса данных.
3. @balderman Я в курсе, но Оп упомянул, что проблема в том, как реализовать код, созданный для устранения ошибки с помощью DAG в Airflow. Вот почему я прошу посмотреть код DAG Airflow, так как вопрос
trying to find a place where I can hook this change during airflow initializtion(before my dags will run)
в том, что мне пока неясно, зачем это нужно, поэтому я попросил посмотреть код, который объединяет DAG с патчем, представленным в вопросе, и какую обратную связь он создает в Airflow. Воздушный поток-это оркестровка. если у операции есть обходной путь, она должна легко работать с воздушным потоком.4. Насколько я понимаю, Op использует dataclass и заинтересован в исправлении «asdict» из-за упомянутой ошибки. Операционная система ищет место, где патч может быть применен во время ввода воздушного потока. Вам интересно увидеть фактическое исключение, вызванное ошибкой? Пожалуйста, объясните, чего не хватает.
5. @balderman Ах, теперь понял. Я думал, что есть проблема с конкретным процессом, который пытается использовать обходной путь, но, похоже, проблема в том, что операционная система хочет, чтобы все способы использования функции с ошибками были перенаправлены на измененную — если это действительно так, это не то, что нужно решать на уровне воздушного потока.