воздушный поток.исключения.TaskNotFound: Пользователь, обрабатывающий задачу, не найден

#python #airflow

Вопрос:

Привет, я установил airflow в VirtualBox, во время тестирования приведенного ниже кода с помощью команды airflow tasks test user_processing processing_user 2020-01-02 я получаю предупреждающее сообщение и ошибку TaskNotFound. (Последовал курсу Марка Ламберти). заранее спасибо.

Мой код: создал таблицу с помощью оператора sqlite, использовал оператор http для загрузки пользователя из API, затем использовал оператор Python для обработки информации

 from airflow.models import DAG
from datetime import datetime
from airflow.providers.sqlite.operators.sqlite import SqliteOperator
from airflow.providers.http.sensors.http import HttpSensor
from airflow.providers.http.operators.http import SimpleHttpOperator
import json


from airflow.operators.python import PythonOperator
from pandas import json_normalize
default_args = {'start_date' : datetime(2021,10,11)}

def _processing_user(ti):
    users = ti.xcom_pull(task_id=['extracting_user'])
    if not len(users) or 'results' not in users[0]:
        raise ValueError('User is Empty')
    user = users[0]['results'][0]
    processed_user = json_normalize({
        'firstname':user['name']['first'],
        'lastname':user['name']['last'],
        'country':user['location']['country'],
        'username':user['login']['username'],
        'password':user['login']['password'],
        'email': user['email']
    })
    processed_user.to_csv('/tmp/processed_user.csv', index=None, header=False)


with DAG('user_processing',schedule_interval='@daily',default_args=default_args,catchup=False) as dag:
        creating_table = SqliteOperator(
            task_id='creating_table',
            sqlite_conn_id='db_sqlite',
            sql='''
                CREATE TABLE users (
                firstname TEXT NOT NULL,
                lastname TEXT NOT NULL,
                country TEXT NOT NULL,
                password TEXT NOT NULL,
                email TEXT NOT NULL PRIMARY KEY
               );
               '''
    )

        is_api_available = HttpSensor(
           task_id='is_api_available',
           http_conn_id='user_api',
           endpoint='api/')

        extracting_user = SimpleHttpOperator(
            task_id='extracting_user',
            http_conn_id='user_api',
            endpoint='api/',
            method='GET',
            response_filter=lambda response: json.loads(response.text),
            log_response=True
        )
    
        processing_user = PythonOperator(
           task_id='Processing_user',
           python_callable= _processing_user
        )
 

Предупреждающее сообщение:

 [2021-10-14 19:02:47,227] {dagbag.py:487} INFO - Filling up the DagBag from /home/airflow/airflow/dags
[2021-10-14 19:02:47,577] {baseoperator.py:1287} WARNING - Dependency <Task(_PythonDecoratedOperator): task_group_function.task_1>, task_group_function.task_2 already registered for DAG: example_task_group_decorator
[2021-10-14 19:02:47,579] {baseoperator.py:1287} WARNING - Dependency 
[2021-10-14 19:02:47,586] {baseoperator.py:1287} WARNING - Dependency <Task(_PythonDecoratedOperator): task_group_function.task_1>, task_group_function.task_2 already registered for DAG: example_task_group_decorator
[2021-10-14 19:02:47,588] {baseoperator.py:1287} WARNING - Dependency 
[2021-10-14 19:02:47,594] {baseoperator.py:1287} WARNING - Dependency 
[2021-10-14 19:02:47,602] {baseoperator.py:1287} WARNING - Dependency <Task(_PythonDecoratedOperator): task_group_function.task_3>, task_group_function.task_2 already registered for DAG: example_task_group_decorator
[2021-10-14 19:02:47,604] {baseoperator.py:1287} WARNING - Dependency <Task(_PythonDecoratedOperator): task_group_function.task_2>, task_group_function.task_3 already registered for DAG: example_task_group_decorator
[2021-10-14 19:02:47,607] {baseoperator.py:1287} WARNING - Dependency <Task(_PythonDecoratedOperator): task_group_function.task_3>, task_group_function.task_2 already registered for DAG: example_task_group_decorator
[2021-10-14 19:02:47,609] {baseoperator.py:1287} WARNING - Dependency <Task(_PythonDecoratedOperator): task_group_function.task_2>, task_group_function.task_3 already registered for DAG: example_task_group_decorator
[2021-10-14 19:02:47,612] {baseoperator.py:1287} WARNING - Dependency <Task(_PythonDecoratedOperator): task_group_function.task_3>, task_group_function.task_2 already registered for DAG: example_task_group_decorator
[2021-10-14 19:02:47,615] {baseoperator.py:1287} WARNING - Dependency 
 

сообщение об ошибке:

  Traceback (most recent call last):
      File "/home/airflow/sandbox/bin/airflow", line 8, in <module>
        sys.exit(main())
      File "/home/airflow/sandbox/lib/python3.8/site-packages/airflow/__main__.py", line 40, in main
        args.func(args)
      File "/home/airflow/sandbox/lib/python3.8/site-packages/airflow/cli/cli_parser.py", line 48, in command
        return func(*args, **kwargs)
      File "/home/airflow/sandbox/lib/python3.8/site-packages/airflow/utils/cli.py", line 91, in wrapper
        return f(*args, **kwargs)
      File "/home/airflow/sandbox/lib/python3.8/site-packages/airflow/cli/commands/task_command.py", line 381, in task_test
        task = dag.get_task(task_id=args.task_id)
      File "/home/airflow/sandbox/lib/python3.8/site-packages/airflow/models/dag.py", line 1546, in get_task
        raise TaskNotFound(f"Task {task_id} not found")
    airflow.exceptions.TaskNotFound: Task processing_user not found
 

Ответ №1:

Для исключения похоже task_id , что вы пытаетесь ссылаться на «Processing_user» (как написано в DAG), а не на «processing_user» (как написано в команде CLI); разница только в небольшой капитализации. Идентификатор задачи зависит от регистра.

Попробуйте сделать task_id значение в команде CLI таким же, как написано в DAG (или наоборот).