#python #unit-testing #airflow
#python #модульное тестирование #воздушный поток
Вопрос:
У меня есть модуль Python, который структурирован следующим образом:
my_module/
...
tests/
__init__.py
my_test.py
...
где my_test.py
определяется следующим образом:
from __future__ import print_function, unicode_literals
import os
import unittest
from datetime import timedelta, date
from airflow import configuration
from airflow.models import TaskInstance as TI, DAG, DagRun
from airflow.operators.python_operator import PythonOperator
from airflow.settings import Session
from airflow.utils import timezone
from airflow.utils.state import State
DEFAULT_DATE = timezone.datetime(2016, 1, 1)
END_DATE = timezone.datetime(2016, 1, 2)
INTERVAL = timedelta(hours=12)
FROZEN_NOW = timezone.datetime(2016, 1, 2, 12, 1, 1)
TI_CONTEXT_ENV_VARS = ['AIRFLOW_CTX_DAG_ID',
'AIRFLOW_CTX_TASK_ID',
'AIRFLOW_CTX_EXECUTION_DATE',
'AIRFLOW_CTX_DAG_RUN_ID']
class Call:
def __init__(self, *args, **kwargs):
self.args = args
self.kwargs = kwargs
def build_recording_function(calls_collection):
"""
We can not use a Mock instance as a PythonOperator callable function or some tests fail with a
TypeError: Object of type Mock is not JSON serializable
Then using this custom function recording custom Call objects for further testing
(replacing Mock.assert_called_with assertion method)
"""
def recording_function(*args, **kwargs):
calls_collection.append(Call(*args, **kwargs))
return recording_function
class PythonOperatorTest(unittest.TestCase):
@classmethod
def setUpClass(cls):
super(PythonOperatorTest, cls).setUpClass()
session = Session()
session.query(DagRun).delete()
session.query(TI).delete()
session.commit()
session.close()
def setUp(self):
super(PythonOperatorTest, self).setUp()
configuration.load_test_config()
self.dag = DAG(
'test_dag',
default_args={
'owner': 'airflow',
'start_date': DEFAULT_DATE},
schedule_interval=INTERVAL)
self.addCleanup(self.dag.clear)
self.clear_run()
self.addCleanup(self.clear_run)
def tearDown(self):
super(PythonOperatorTest, self).tearDown()
session = Session()
session.query(DagRun).delete()
session.query(TI).delete()
print(len(session.query(DagRun).all()))
session.commit()
session.close()
for var in TI_CONTEXT_ENV_VARS:
if var in os.environ:
del os.environ[var]
def do_run(self):
self.run = True
def clear_run(self):
self.run = False
def is_run(self):
return self.run
def _assertCallsEqual(self, first, second):
self.assertIsInstance(first, Call)
self.assertIsInstance(second, Call)
self.assertTupleEqual(first.args, second.args)
self.assertDictEqual(first.kwargs, second.kwargs)
def test_python_callable_arguments_are_templatized(self):
"""Test PythonOperator op_args are templatized"""
recorded_calls = []
task = PythonOperator(
task_id='python_operator',
# a Mock instance cannot be used as a callable function or test fails with a
# TypeError: Object of type Mock is not JSON serializable
python_callable=(build_recording_function(recorded_calls)),
op_args=[
4,
date(2019, 1, 1),
"dag {{dag.dag_id}} ran on {{ds}}."
],
dag=self.dag)
self.dag.create_dagrun(
run_id='manual__' DEFAULT_DATE.isoformat(),
execution_date=DEFAULT_DATE,
start_date=DEFAULT_DATE,
state=State.RUNNING
)
task.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE)
self.assertEqual(1, len(recorded_calls))
self._assertCallsEqual(
recorded_calls[0],
Call(4,
date(2019, 1, 1),
"dag {} ran on {}.".format(self.dag.dag_id, DEFAULT_DATE.date().isoformat()))
)
В терминале, когда я запускаю nosetests test/my_test.py
, тест завершается неудачей, потому что шаблоны Jinja отображаются некорректно. Полный журнал приведен ниже.
======================================================================
FAIL: Test PythonOperator op_args are templatized
----------------------------------------------------------------------
Traceback (most recent call last):
File "/home/user/my_module/tests/my_test.py", line 120, in test_python_callable_arguments_are_templatized
"dag {} ran on {}.".format(self.dag.dag_id, DEFAULT_DATE.date().isoformat()))
File "/home/user/my_module/tests/my_test.py", line 88, in _assertCallsEqual
self.assertTupleEqual(first.args, second.args)
AssertionError: Tuples differ: (4, datetime.date(2019, 1, 1), 'dag {{dag.dag_id}} ran on {{ds}}.') != (4, datetime.date(2019, 1, 1), 'dag test_dag ran on 2016-01-01.')
First differing element 2:
'dag {{dag.dag_id}} ran on {{ds}}.'
'dag test_dag ran on 2016-01-01.'
- (4, datetime.date(2019, 1, 1), 'dag {{dag.dag_id}} ran on {{ds}}.')
? ^^ --------- ^^^^^^
(4, datetime.date(2019, 1, 1), 'dag test_dag ran on 2016-01-01.')
? ^^^^^ ^^^^^^^^^^
-------------------- >> begin captured logging << --------------------
airflow.utils.log.logging_mixin.LoggingMixin: INFO: Reading the config from /home/user/airflow/airflow.cfg
airflow.settings: INFO: Configured default timezone <Timezone [UTC]>
airflow.logging_config: DEBUG: Unable to load custom logging, using default config instead
--------------------- >> end captured logging << ---------------------
Однако в коде нет ничего плохого, my_test.py
поскольку это всего лишь подмножество файла test_python_operator.py файл из репозитория Airflow github ( v1-10-stable
ветка). Наивно, я ожидал бы, что этот тест будет выполняться просто отлично, но это не так.
Чего мне не хватает?
РЕДАКТИРОВАТЬ: я использую apache-airflow 1.10.2, Python 3.6.8 и nose 1.3.7.
Ответ №1:
Это потому, что поля 'op_args'
, 'op_kwargs'
не были шаблонизированными полями в Airflow 1.10.2 для PythonOperator
. Ссылка, указанная в вашем вопросе, относится к главной ветви репозитория Airflow.
'op_args'
'op_kwargs'
были добавлены после выхода Airflow 1.10.2.
Фиксация, которая включала эти поля в template_fields
(это все еще в master и не включено ни в одну версию выпуска): https://github.com/apache/airflow/commit/7ab245b296efc73db3ce4ce0edbae473e357698c
Для Airflow 1.10.2: проверьте этот файл — https://github.com/apache/airflow/blob/1.10.2/tests/operators/test_python_operator.py
Также не используйте v1-10-stable
ветку, поскольку она содержит код для предстоящего выпуска 1.10.3. Вместо этого вам следует использовать тег 1.10.2: https://github.com/apache/airflow/tree/1.10.2
PythonOperator (1.10.2): https://github.com/apache/airflow/blob/1.10.2/airflow/operators/python_operator.py#L65
class PythonOperator(BaseOperator):
template_fields = ('templates_dict',)
template_ext = tuple()
ui_color = '#ffefeb'
@apply_defaults
def __init__(
self,
python_callable,
op_args=None,
op_kwargs=None,
provide_context=False,
templates_dict=None,
templates_exts=None,
*args, **kwargs):
...
PythonOperator (ветка master — development):
https://github.com/apache/airflow/blob/master/airflow/operators/python_operator.py#L72
class PythonOperator(BaseOperator):
template_fields = ('templates_dict', 'op_args', 'op_kwargs')
ui_color = '#ffefeb'
# since we won't mutate the arguments, we should just do the shallow copy
# there are some cases we can't deepcopy the objects(e.g protobuf).
shallow_copy_attrs = ('python_callable', 'op_kwargs',)
@apply_defaults
def __init__(
self,
python_callable, # type: Callable
op_args=None, # type: Optional[Iterable]
op_kwargs=None, # type: Optional[Dict]
provide_context=False, # type: bool
templates_dict=None, # type: Optional[Dict]
templates_exts=None, # type: Optional[Iterable[str]]
*args,
**kwargs
):
Ответ №2:
Скорее всего, вы используете 1.10
или более ранние версии для запуска тестов. В этих версиях op_args
в PythonOperator
небыло шаблонов. Но в master
, тесты из которого вы, скорее всего, используете, op_args
являются шаблонными и соответствующим образом тестируются. Если вы действительно хотите использовать airflows test в качестве примера, вам следует взять их из любой ветки, соответствующей вашей установленной версии.
Комментарии:
1. Спасибо за ваш ответ. Я отредактировал свой пост. Я использую apache-airflow 1.10.2, а код для тестового файла взят из ветки v1-10-stable. Я думаю, что проблема где-то есть…
2. изменение было объединено с
v1-10-stable
27 марта. Я считаю, что это не является частью1.10.2
. Вы можете посмотреть на код вашего локальногоPythonOperator
и посмотреть, есть ли у вас:template_fields = ('templates_dict', 'op_args', 'op_kwargs')