#python #airflow
Вопрос:
Привет, я установил airflow в VirtualBox, во время тестирования приведенного ниже кода с помощью команды airflow tasks test user_processing processing_user 2020-01-02
я получаю предупреждающее сообщение и ошибку TaskNotFound. (Последовал курсу Марка Ламберти). заранее спасибо.
Мой код: создал таблицу с помощью оператора sqlite, использовал оператор http для загрузки пользователя из API, затем использовал оператор Python для обработки информации
from airflow.models import DAG
from datetime import datetime
from airflow.providers.sqlite.operators.sqlite import SqliteOperator
from airflow.providers.http.sensors.http import HttpSensor
from airflow.providers.http.operators.http import SimpleHttpOperator
import json
from airflow.operators.python import PythonOperator
from pandas import json_normalize
default_args = {'start_date' : datetime(2021,10,11)}
def _processing_user(ti):
users = ti.xcom_pull(task_id=['extracting_user'])
if not len(users) or 'results' not in users[0]:
raise ValueError('User is Empty')
user = users[0]['results'][0]
processed_user = json_normalize({
'firstname':user['name']['first'],
'lastname':user['name']['last'],
'country':user['location']['country'],
'username':user['login']['username'],
'password':user['login']['password'],
'email': user['email']
})
processed_user.to_csv('/tmp/processed_user.csv', index=None, header=False)
with DAG('user_processing',schedule_interval='@daily',default_args=default_args,catchup=False) as dag:
creating_table = SqliteOperator(
task_id='creating_table',
sqlite_conn_id='db_sqlite',
sql='''
CREATE TABLE users (
firstname TEXT NOT NULL,
lastname TEXT NOT NULL,
country TEXT NOT NULL,
password TEXT NOT NULL,
email TEXT NOT NULL PRIMARY KEY
);
'''
)
is_api_available = HttpSensor(
task_id='is_api_available',
http_conn_id='user_api',
endpoint='api/')
extracting_user = SimpleHttpOperator(
task_id='extracting_user',
http_conn_id='user_api',
endpoint='api/',
method='GET',
response_filter=lambda response: json.loads(response.text),
log_response=True
)
processing_user = PythonOperator(
task_id='Processing_user',
python_callable= _processing_user
)
Предупреждающее сообщение:
[2021-10-14 19:02:47,227] {dagbag.py:487} INFO - Filling up the DagBag from /home/airflow/airflow/dags
[2021-10-14 19:02:47,577] {baseoperator.py:1287} WARNING - Dependency <Task(_PythonDecoratedOperator): task_group_function.task_1>, task_group_function.task_2 already registered for DAG: example_task_group_decorator
[2021-10-14 19:02:47,579] {baseoperator.py:1287} WARNING - Dependency
[2021-10-14 19:02:47,586] {baseoperator.py:1287} WARNING - Dependency <Task(_PythonDecoratedOperator): task_group_function.task_1>, task_group_function.task_2 already registered for DAG: example_task_group_decorator
[2021-10-14 19:02:47,588] {baseoperator.py:1287} WARNING - Dependency
[2021-10-14 19:02:47,594] {baseoperator.py:1287} WARNING - Dependency
[2021-10-14 19:02:47,602] {baseoperator.py:1287} WARNING - Dependency <Task(_PythonDecoratedOperator): task_group_function.task_3>, task_group_function.task_2 already registered for DAG: example_task_group_decorator
[2021-10-14 19:02:47,604] {baseoperator.py:1287} WARNING - Dependency <Task(_PythonDecoratedOperator): task_group_function.task_2>, task_group_function.task_3 already registered for DAG: example_task_group_decorator
[2021-10-14 19:02:47,607] {baseoperator.py:1287} WARNING - Dependency <Task(_PythonDecoratedOperator): task_group_function.task_3>, task_group_function.task_2 already registered for DAG: example_task_group_decorator
[2021-10-14 19:02:47,609] {baseoperator.py:1287} WARNING - Dependency <Task(_PythonDecoratedOperator): task_group_function.task_2>, task_group_function.task_3 already registered for DAG: example_task_group_decorator
[2021-10-14 19:02:47,612] {baseoperator.py:1287} WARNING - Dependency <Task(_PythonDecoratedOperator): task_group_function.task_3>, task_group_function.task_2 already registered for DAG: example_task_group_decorator
[2021-10-14 19:02:47,615] {baseoperator.py:1287} WARNING - Dependency
сообщение об ошибке:
Traceback (most recent call last):
File "/home/airflow/sandbox/bin/airflow", line 8, in <module>
sys.exit(main())
File "/home/airflow/sandbox/lib/python3.8/site-packages/airflow/__main__.py", line 40, in main
args.func(args)
File "/home/airflow/sandbox/lib/python3.8/site-packages/airflow/cli/cli_parser.py", line 48, in command
return func(*args, **kwargs)
File "/home/airflow/sandbox/lib/python3.8/site-packages/airflow/utils/cli.py", line 91, in wrapper
return f(*args, **kwargs)
File "/home/airflow/sandbox/lib/python3.8/site-packages/airflow/cli/commands/task_command.py", line 381, in task_test
task = dag.get_task(task_id=args.task_id)
File "/home/airflow/sandbox/lib/python3.8/site-packages/airflow/models/dag.py", line 1546, in get_task
raise TaskNotFound(f"Task {task_id} not found")
airflow.exceptions.TaskNotFound: Task processing_user not found
Ответ №1:
Для исключения похоже task_id
, что вы пытаетесь ссылаться на «Processing_user» (как написано в DAG), а не на «processing_user» (как написано в команде CLI); разница только в небольшой капитализации. Идентификатор задачи зависит от регистра.
Попробуйте сделать task_id
значение в команде CLI таким же, как написано в DAG (или наоборот).