#airflow #airflow-scheduler
Question:
I have an Apache Airflow DAG that runs hourly but fails "randomly", meaning it succeeds in roughly half of the cases. The DAG consists of exactly one task, which uses BashOperator to execute a single curl. The problem occurs with both scheduled and manual triggers.
Airflow version: 1.10.13.
Executor: Celery
Env: Kubernetes with the Bitnami Airflow Helm chart (1 airflow-web, 1 airflow-scheduler, 1 airflow-worker; I can give more information on this if it helps)
DB: PSQL
Redis: A Redis instance is provided for the Airflow setup, but it contains no keys.
DAG: DAGs are defined inside a K8s ConfigMap and are regularly refreshed by Airflow
What I have tried:
- Reset the database and set everything up from scratch, twice
- Changed parameters such as start_date, retries, etc.
- Monitored the behavior for 2 days
Code
from builtins import range
from datetime import datetime, timedelta
from airflow.models import DAG
from airflow.operators.bash_operator import BashOperator
args = {
    'owner': 'Airflow',
    'start_date': datetime(2021, 1, 8, 8, 30, 0),
    'retries': 3,
    'retry_delay': timedelta(minutes=5),
}
dag = DAG(
    dag_id='persist__events_dag',
    default_args=args,
    schedule_interval='0 * * * *',
    tags=['SparkJobs']
)
run_job = BashOperator(
    task_id='curl_to_spark_api',
    bash_command="""
    //Here comes a curl command
    """,
    dag=dag,
)
run_job
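A note for reading the timestamps in the logs: with a cron schedule, Airflow 1.10 labels each run with the start of its interval (the execution_date) and triggers it only once that interval has ended. The sketch below reproduces that arithmetic for the start_date and hourly schedule in the code above, using only the standard library. `first_hourly_run` is a hypothetical helper written for illustration, not an Airflow function.

```python
from datetime import datetime, timedelta


def first_hourly_run(start_date):
    """Return (execution_date, earliest_trigger_time) for a '0 * * * *' schedule."""
    # First top-of-the-hour tick at or after start_date.
    tick = start_date.replace(minute=0, second=0, microsecond=0)
    if tick < start_date:
        tick += timedelta(hours=1)
    # The run for that tick is triggered once its one-hour interval has ended.
    return tick, tick + timedelta(hours=1)


execution_date, trigger_time = first_hourly_run(datetime(2021, 1, 8, 8, 30, 0))
print(execution_date)  # 2021-01-08 09:00:00
print(trigger_time)    # 2021-01-08 10:00:00
```

This is consistent with the worker log, where the run with execution date 2021-01-08 09:00 is picked up after 10:00.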
Logs
From the airflow worker on failed runs:
[2021-01-08 10:36:13,313: INFO/MainProcess] Received task: airflow.executors.celery_executor.execute_command[c6e3fdfb-5474-47aa-b333-be8c69b23ebe]
[2021-01-08 10:36:13,314: INFO/ForkPoolWorker-15] Executing command in Celery: ['airflow', 'run', 'persist_events_dag', 'task_id', '2021-01-08T09:00:00+00:00', '--local', '--pool', 'default_pool', '-sd', '/opt/bitnami/airflow/dags/external/..2021_01_08_10_33_28.059622242/persist_screenviews.py']
[2021-01-08 10:36:19,279] {__init__.py:50} INFO - Using executor CeleryExecutor
[2021-01-08 10:36:19,356] {dagbag.py:417} INFO - Filling up the DagBag from /opt/bitnami/airflow/dags/external/..2021_01_08_10_33_28.059622242/persist_screenviews.py
/opt/bitnami/airflow/venv/lib/python3.6/site-packages/airflow/models/dag.py:1342: PendingDeprecationWarning: The requested task could not be added to the DAG because a task with task_id create_tag_template_field_result is already in the DAG. Starting in Airflow 2.0, trying to overwrite a task will raise an exception.
category=PendingDeprecationWarning)
Traceback (most recent call last):
File "/opt/bitnami/airflow/venv/bin/airflow", line 37, in <module>
args.func(args)
File "/opt/bitnami/airflow/venv/lib/python3.6/site-packages/airflow/utils/cli.py", line 76, in wrapper
return f(*args, **kwargs)
File "/opt/bitnami/airflow/venv/lib/python3.6/site-packages/airflow/bin/cli.py", line 538, in run
dag = get_dag(args)
File "/opt/bitnami/airflow/venv/lib/python3.6/site-packages/airflow/bin/cli.py", line 164, in get_dag
'parse.'.format(args.dag_id))
airflow.exceptions.AirflowException: dag_id could not be found: persist_events_dag. Either the dag did not exist or it failed to parse.
[2021-01-08 10:36:20,373: ERROR/ForkPoolWorker-15] execute_command encountered a CalledProcessError
Traceback (most recent call last):
File "/opt/bitnami/airflow/venv/lib/python3.6/site-packages/airflow/executors/celery_executor.py", line 78, in execute_command
close_fds=True, env=env)
File "/opt/bitnami/python/lib/python3.6/subprocess.py", line 311, in check_call
raise CalledProcessError(retcode, cmd)
subprocess.CalledProcessError: Command '['airflow', 'run', 'persist_events_dag', 'task_id', '2021-01-08T09:00:00+00:00', '--local', '--pool', 'default_pool', '-sd', '/opt/bitnami/airflow/dags/external/..2021_01_08_10_33_28.059622242/persist_screenviews.py']' returned non-zero exit status 1.
[2021-01-08 10:36:20,373: ERROR/ForkPoolWorker-15] None
[2021-01-08 10:36:20,526: ERROR/ForkPoolWorker-15] Task airflow.executors.celery_executor.execute_command[c6e3fdfb-5474-47aa-b333-be8c69b23ebe] raised unexpected: AirflowException('Celery command failed',)
Traceback (most recent call last):
File "/opt/bitnami/airflow/venv/lib/python3.6/site-packages/airflow/executors/celery_executor.py", line 78, in execute_command
close_fds=True, env=env)
File "/opt/bitnami/python/lib/python3.6/subprocess.py", line 311, in check_call
raise CalledProcessError(retcode, cmd)
subprocess.CalledProcessError: Command '['airflow', 'run', 'persist_events_dag', 'task_id', '2021-01-08T09:00:00+00:00', '--local', '--pool', 'default_pool', '-sd', '/opt/bitnami/airflow/dags/external/..2021_01_08_10_33_28.059622242/persist_screenviews.py']' returned non-zero exit status 1.
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
File "/opt/bitnami/airflow/venv/lib/python3.6/site-packages/celery/app/trace.py", line 412, in trace_task
R = retval = fun(*args, **kwargs)
File "/opt/bitnami/airflow/venv/lib/python3.6/site-packages/celery/app/trace.py", line 704, in __protected_call__
return self.run(*args, **kwargs)
File "/opt/bitnami/airflow/venv/lib/python3.6/site-packages/airflow/executors/celery_executor.py", line 83, in execute_command
raise AirflowException('Celery command failed')
airflow.exceptions.AirflowException: Celery command failed
[2021-01-08 10:36:43,230: INFO/MainProcess] Received task: airflow.executors.celery_executor.execute_command[8ba7956f-6b96-48f8-b112-3a4d7baa8bf7]
[2021-01-08 10:36:43,231: INFO/ForkPoolWorker-15] Executing command in Celery: ['airflow', 'run', 'persist_events_dag', 'task_id', '2021-01-08T10:36:40.516529+00:00', '--local', '--pool', 'default_pool', '-sd', '/opt/bitnami/airflow/dags/external/..2021_01_08_10_33_28.059622242/persist_screenviews.py']
[2021-01-08 10:36:49,157] {__init__.py:50} INFO - Using executor CeleryExecutor
[2021-01-08 10:36:49,158] {dagbag.py:417} INFO - Filling up the DagBag from /opt/bitnami/airflow/dags/external/..2021_01_08_10_33_28.059622242/persist_screenviews.py
/opt/bitnami/airflow/venv/lib/python3.6/site-packages/airflow/models/dag.py:1342: PendingDeprecationWarning: The requested task could not be added to the DAG because a task with task_id create_tag_template_field_result is already in the DAG. Starting in Airflow 2.0, trying to overwrite a task will raise an exception.
category=PendingDeprecationWarning)
Traceback (most recent call last):
File "/opt/bitnami/airflow/venv/bin/airflow", line 37, in <module>
args.func(args)
File "/opt/bitnami/airflow/venv/lib/python3.6/site-packages/airflow/utils/cli.py", line 76, in wrapper
return f(*args, **kwargs)
File "/opt/bitnami/airflow/venv/lib/python3.6/site-packages/airflow/bin/cli.py", line 538, in run
dag = get_dag(args)
File "/opt/bitnami/airflow/venv/lib/python3.6/site-packages/airflow/bin/cli.py", line 164, in get_dag
'parse.'.format(args.dag_id))
airflow.exceptions.AirflowException: dag_id could not be found: persist_events_dag. Either the dag did not exist or it failed to parse.
[2021-01-08 10:36:50,560: ERROR/ForkPoolWorker-15] execute_command encountered a CalledProcessError
Traceback (most recent call last):
File "/opt/bitnami/airflow/venv/lib/python3.6/site-packages/airflow/executors/celery_executor.py", line 78, in execute_command
close_fds=True, env=env)
File "/opt/bitnami/python/lib/python3.6/subprocess.py", line 311, in check_call
raise CalledProcessError(retcode, cmd)
subprocess.CalledProcessError: Command '['airflow', 'run', 'persist_events_dag', 'task_id', '2021-01-08T10:36:40.516529+00:00', '--local', '--pool', 'default_pool', '-sd', '/opt/bitnami/airflow/dags/external/..2021_01_08_10_33_28.059622242/persist_screenviews.py']' returned non-zero exit status 1.
[2021-01-08 10:36:50,561: ERROR/ForkPoolWorker-15] None
[2021-01-08 10:36:50,649: ERROR/ForkPoolWorker-15] Task airflow.executors.celery_executor.execute_command[8ba7956f-6b96-48f8-b112-3a4d7baa8bf7] raised unexpected: AirflowException('Celery command failed',)
Traceback (most recent call last):
File "/opt/bitnami/airflow/venv/lib/python3.6/site-packages/airflow/executors/celery_executor.py", line 78, in execute_command
close_fds=True, env=env)
File "/opt/bitnami/python/lib/python3.6/subprocess.py", line 311, in check_call
raise CalledProcessError(retcode, cmd)
subprocess.CalledProcessError: Command '['airflow', 'run', 'persist_events_dag', 'task_id', '2021-01-08T10:36:40.516529+00:00', '--local', '--pool', 'default_pool', '-sd', '/opt/bitnami/airflow/dags/external/..2021_01_08_10_33_28.059622242/persist_screenviews.py']' returned non-zero exit status 1.
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
File "/opt/bitnami/airflow/venv/lib/python3.6/site-packages/celery/app/trace.py", line 412, in trace_task
R = retval = fun(*args, **kwargs)
File "/opt/bitnami/airflow/venv/lib/python3.6/site-packages/celery/app/trace.py", line 704, in __protected_call__
return self.run(*args, **kwargs)
File "/opt/bitnami/airflow/venv/lib/python3.6/site-packages/airflow/executors/celery_executor.py", line 83, in execute_command
raise AirflowException('Celery command failed')
airflow.exceptions.AirflowException: Celery command failed
[2021-01-08 10:37:23,414: INFO/MainProcess] Received task: airflow.executors.celery_executor.execute_command[10478bca-c21d-4cf5-b366-5ca1f66c0fe1]
[2021-01-08 10:37:23,415: INFO/ForkPoolWorker-15] Executing command in Celery: ['airflow', 'run', 'persist_events_dag', 'task_id', '2021-01-08T09:00:00+00:00', '--local', '--pool', 'default_pool', '-sd', '/opt/bitnami/airflow/dags/external/..data/persist_screenviews.py']
[2021-01-08 10:37:29,457] {__init__.py:50} INFO - Using executor CeleryExecutor
[2021-01-08 10:37:29,458] {dagbag.py:417} INFO - Filling up the DagBag from /opt/bitnami/airflow/dags/external/..data/persist_screenviews.py
Running %s on host %s <TaskInstance: persist_events_dag.task_id 2021-01-08T09:00:00+00:00 [queued]> airflow-worker-0.airflow-worker-headless.default.svc.cluster.local
[2021-01-08 10:37:36,294: INFO/ForkPoolWorker-15] Task airflow.executors.celery_executor.execute_command[10478bca-c21d-4cf5-b366-5ca1f66c0fe1] succeeded in 12.8788606680464s: None
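One detail in the worker log above: both failed commands pass `-sd` with a path inside the timestamped directory `..2021_01_08_10_33_28.059622242/`, while the successful one uses `..data/`. Kubernetes projects a ConfigMap volume as a timestamped hidden directory, a `..data` symlink pointing at it, and per-file symlinks into `..data`; on an update it writes a new timestamped directory, swaps the symlink, and removes the old directory. Whether that explains the failures here is not confirmed by the question. The sketch below only mimics the layout in a temporary directory to show that a path through the timestamped directory can stop existing after an update. `publish_snapshot`, `remove_snapshot` and the second timestamp are hypothetical.

```python
import os
import tempfile

FILENAME = "persist_screenviews.py"


def publish_snapshot(root, stamp, content):
    """Write one ConfigMap-style snapshot and atomically point `..data` at it."""
    snapshot = ".." + stamp
    os.mkdir(os.path.join(root, snapshot))
    with open(os.path.join(root, snapshot, FILENAME), "w") as fh:
        fh.write(content)
    # Build the new symlink under a temporary name, then rename it over `..data`.
    tmp_link = os.path.join(root, "..data_tmp")
    os.symlink(snapshot, tmp_link)
    os.replace(tmp_link, os.path.join(root, "..data"))
    # The visible file name is a symlink into `..data`; it is created only once.
    visible = os.path.join(root, FILENAME)
    if not os.path.lexists(visible):
        os.symlink(os.path.join("..data", FILENAME), visible)


def remove_snapshot(root, stamp):
    """Delete an old snapshot directory, as happens after an update."""
    snapshot_dir = os.path.join(root, ".." + stamp)
    os.remove(os.path.join(snapshot_dir, FILENAME))
    os.rmdir(snapshot_dir)


OLD = "2021_01_08_10_33_28.059622242"  # taken from the worker log
NEW = "2021_01_08_10_37_00.000000000"  # hypothetical stamp of a later update

with tempfile.TemporaryDirectory() as root:
    publish_snapshot(root, OLD, "# DAG file, first version\n")
    paths = {
        "stamped": os.path.join(root, ".." + OLD, FILENAME),
        "data": os.path.join(root, "..data", FILENAME),
        "visible": os.path.join(root, FILENAME),
    }
    # Before the update, all three paths resolve to the same file.
    same_file_before = len({os.path.realpath(p) for p in paths.values()}) == 1

    publish_snapshot(root, NEW, "# DAG file, second version\n")
    remove_snapshot(root, OLD)
    # After the update, check which of the three paths still exist.
    exists_after = {name: os.path.exists(p) for name, p in paths.items()}

print(same_file_before)
print(exists_after)
```

If this is what happens in the cluster, a command queued with the timestamped path would fail to load the DAG file once the ConfigMap is refreshed, while the `..data` and top-level paths keep working.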
When a run succeeds, I nevertheless get the following error in the airflow scheduler logs:
Process DagFileProcessor144631-Process:
Traceback (most recent call last):
File "/opt/bitnami/airflow/venv/lib/python3.6/site-packages/sqlalchemy/engine/base.py", line 1277, in _execute_context
cursor, statement, parameters, context
File "/opt/bitnami/airflow/venv/lib/python3.6/site-packages/sqlalchemy/engine/default.py", line 593, in do_execute
cursor.execute(statement, parameters)
psycopg2.errors.UniqueViolation: duplicate key value violates unique constraint "dag_pkey"
DETAIL: Key (dag_id)=(persist_events_dag) already exists.
The above exception was the direct cause of the following exception:
Traceback (most recent call last):
File "/opt/bitnami/python/lib/python3.6/multiprocessing/process.py", line 258, in _bootstrap
self.run()
File "/opt/bitnami/python/lib/python3.6/multiprocessing/process.py", line 93, in run
self._target(*self._args, **self._kwargs)
File "/opt/bitnami/airflow/venv/lib/python3.6/site-packages/airflow/jobs/scheduler_job.py", line 159, in _run_file_processor
pickle_dags)
File "/opt/bitnami/airflow/venv/lib/python3.6/site-packages/airflow/utils/db.py", line 74, in wrapper
return func(*args, **kwargs)
File "/opt/bitnami/airflow/venv/lib/python3.6/site-packages/airflow/jobs/scheduler_job.py", line 1609, in process_file
dag.sync_to_db()
File "/opt/bitnami/airflow/venv/lib/python3.6/site-packages/airflow/utils/db.py", line 74, in wrapper
return func(*args, **kwargs)
File "/opt/bitnami/airflow/venv/lib/python3.6/site-packages/airflow/models/dag.py", line 1535, in sync_to_db
orm_dag.tags = self.get_dagtags(session=session)
File "/opt/bitnami/airflow/venv/lib/python3.6/site-packages/airflow/utils/db.py", line 70, in wrapper
return func(*args, **kwargs)
File "/opt/bitnami/airflow/venv/lib/python3.6/site-packages/airflow/models/dag.py", line 1574, in get_dagtags
session.commit()
File "/opt/bitnami/airflow/venv/lib/python3.6/site-packages/sqlalchemy/orm/session.py", line 1042, in commit
self.transaction.commit()
File "/opt/bitnami/airflow/venv/lib/python3.6/site-packages/sqlalchemy/orm/session.py", line 504, in commit
self._prepare_impl()
File "/opt/bitnami/airflow/venv/lib/python3.6/site-packages/sqlalchemy/orm/session.py", line 483, in _prepare_impl
self.session.flush()
File "/opt/bitnami/airflow/venv/lib/python3.6/site-packages/sqlalchemy/orm/session.py", line 2536, in flush
self._flush(objects)
File "/opt/bitnami/airflow/venv/lib/python3.6/site-packages/sqlalchemy/orm/session.py", line 2678, in _flush
transaction.rollback(_capture_exception=True)
File "/opt/bitnami/airflow/venv/lib/python3.6/site-packages/sqlalchemy/util/langhelpers.py", line 70, in __exit__
with_traceback=exc_tb,
File "/opt/bitnami/airflow/venv/lib/python3.6/site-packages/sqlalchemy/util/compat.py", line 182, in raise_
raise exception
File "/opt/bitnami/airflow/venv/lib/python3.6/site-packages/sqlalchemy/orm/session.py", line 2638, in _flush
flush_context.execute()
File "/opt/bitnami/airflow/venv/lib/python3.6/site-packages/sqlalchemy/orm/unitofwork.py", line 422, in execute
rec.execute(self)
File "/opt/bitnami/airflow/venv/lib/python3.6/site-packages/sqlalchemy/orm/unitofwork.py", line 589, in execute
uow,
File "/opt/bitnami/airflow/venv/lib/python3.6/site-packages/sqlalchemy/orm/persistence.py", line 245, in save_obj
insert,
File "/opt/bitnami/airflow/venv/lib/python3.6/site-packages/sqlalchemy/orm/persistence.py", line 1083, in _emit_insert_statements
c = cached_connections[connection].execute(statement, multiparams)
File "/opt/bitnami/airflow/venv/lib/python3.6/site-packages/sqlalchemy/engine/base.py", line 1011, in execute
return meth(self, multiparams, params)
File "/opt/bitnami/airflow/venv/lib/python3.6/site-packages/sqlalchemy/sql/elements.py", line 298, in _execute_on_connection
return connection._execute_clauseelement(self, multiparams, params)
File "/opt/bitnami/airflow/venv/lib/python3.6/site-packages/sqlalchemy/engine/base.py", line 1130, in _execute_clauseelement
distilled_params,
File "/opt/bitnami/airflow/venv/lib/python3.6/site-packages/sqlalchemy/engine/base.py", line 1317, in _execute_context
e, statement, parameters, cursor, context
File "/opt/bitnami/airflow/venv/lib/python3.6/site-packages/sqlalchemy/engine/base.py", line 1511, in _handle_dbapi_exception
sqlalchemy_exception, with_traceback=exc_info[2], from_=e
File "/opt/bitnami/airflow/venv/lib/python3.6/site-packages/sqlalchemy/util/compat.py", line 182, in raise_
raise exception
File "/opt/bitnami/airflow/venv/lib/python3.6/site-packages/sqlalchemy/engine/base.py", line 1277, in _execute_context
cursor, statement, parameters, context
File "/opt/bitnami/airflow/venv/lib/python3.6/site-packages/sqlalchemy/engine/default.py", line 593, in do_execute
cursor.execute(statement, parameters)
sqlalchemy.exc.IntegrityError: (psycopg2.errors.UniqueViolation) duplicate key value violates unique constraint "dag_pkey"
DETAIL: Key (dag_id)=(persist_events_dag) already exists.
[SQL: INSERT INTO dag (dag_id, root_dag_id, is_paused, is_subdag, is_active, last_scheduler_run, last_pickled, last_expired, scheduler_lock, pickle_id, fileloc, owners, description, default_view, schedule_interval) VALUES (%(dag_id)s, %(root_dag_id)s, %(is_paused)s, %(is_subdag)s, %(is_active)s, %(last_scheduler_run)s, %(last_pickled)s, %(last_expired)s, %(scheduler_lock)s, %(pickle_id)s, %(fileloc)s, %(owners)s, %(description)s, %(default_view)s, %(schedule_interval)s)]
[parameters: {'dag_id': 'persist_events_dag', 'root_dag_id': None, 'is_paused': True, 'is_subdag': False, 'is_active': True, 'last_scheduler_run': datetime.datetime(2021, 1, 8, 10, 55, 4, 297819, tzinfo=<Timezone [UTC]>), 'last_pickled': None, 'last_expired': None, 'scheduler_lock': None, 'pickle_id': None, 'fileloc': '/opt/bitnami/airflow/dags/external/..2021_01_08_10_33_28.059622242/persist_screenviews.py', 'owners': 'Airflow', 'description': None, 'default_view': None, 'schedule_interval': '"0 * * * *"'}]
(Background on this error at: http://sqlalche.me/e/13/gkpj)
Process DagFileProcessor144689-Process:
Traceback (most recent call last):
File "/opt/bitnami/airflow/venv/lib/python3.6/site-packages/sqlalchemy/engine/base.py", line 1277, in _execute_context
cursor, statement, parameters, context
File "/opt/bitnami/airflow/venv/lib/python3.6/site-packages/sqlalchemy/engine/default.py", line 593, in do_execute
cursor.execute(statement, parameters)
psycopg2.errors.UniqueViolation: duplicate key value violates unique constraint "dag_pkey"
DETAIL: Key (dag_id)=(persist_events_dag) already exists.
The above exception was the direct cause of the following exception:
Traceback (most recent call last):
File "/opt/bitnami/python/lib/python3.6/multiprocessing/process.py", line 258, in _bootstrap
self.run()
File "/opt/bitnami/python/lib/python3.6/multiprocessing/process.py", line 93, in run
self._target(*self._args, **self._kwargs)
File "/opt/bitnami/airflow/venv/lib/python3.6/site-packages/airflow/jobs/scheduler_job.py", line 159, in _run_file_processor
pickle_dags)
File "/opt/bitnami/airflow/venv/lib/python3.6/site-packages/airflow/utils/db.py", line 74, in wrapper
return func(*args, **kwargs)
File "/opt/bitnami/airflow/venv/lib/python3.6/site-packages/airflow/jobs/scheduler_job.py", line 1609, in process_file
dag.sync_to_db()
File "/opt/bitnami/airflow/venv/lib/python3.6/site-packages/airflow/utils/db.py", line 74, in wrapper
return func(*args, **kwargs)
File "/opt/bitnami/airflow/venv/lib/python3.6/site-packages/airflow/models/dag.py", line 1535, in sync_to_db
orm_dag.tags = self.get_dagtags(session=session)
File "/opt/bitnami/airflow/venv/lib/python3.6/site-packages/airflow/utils/db.py", line 70, in wrapper
return func(*args, **kwargs)
File "/opt/bitnami/airflow/venv/lib/python3.6/site-packages/airflow/models/dag.py", line 1574, in get_dagtags
session.commit()
File "/opt/bitnami/airflow/venv/lib/python3.6/site-packages/sqlalchemy/orm/session.py", line 1042, in commit
self.transaction.commit()
File "/opt/bitnami/airflow/venv/lib/python3.6/site-packages/sqlalchemy/orm/session.py", line 504, in commit
self._prepare_impl()
File "/opt/bitnami/airflow/venv/lib/python3.6/site-packages/sqlalchemy/orm/session.py", line 483, in _prepare_impl
self.session.flush()
File "/opt/bitnami/airflow/venv/lib/python3.6/site-packages/sqlalchemy/orm/session.py", line 2536, in flush
self._flush(objects)
File "/opt/bitnami/airflow/venv/lib/python3.6/site-packages/sqlalchemy/orm/session.py", line 2678, in _flush
transaction.rollback(_capture_exception=True)
File "/opt/bitnami/airflow/venv/lib/python3.6/site-packages/sqlalchemy/util/langhelpers.py", line 70, in __exit__
with_traceback=exc_tb,
File "/opt/bitnami/airflow/venv/lib/python3.6/site-packages/sqlalchemy/util/compat.py", line 182, in raise_
raise exception
File "/opt/bitnami/airflow/venv/lib/python3.6/site-packages/sqlalchemy/orm/session.py", line 2638, in _flush
flush_context.execute()
File "/opt/bitnami/airflow/venv/lib/python3.6/site-packages/sqlalchemy/orm/unitofwork.py", line 422, in execute
rec.execute(self)
File "/opt/bitnami/airflow/venv/lib/python3.6/site-packages/sqlalchemy/orm/unitofwork.py", line 589, in execute
uow,
File "/opt/bitnami/airflow/venv/lib/python3.6/site-packages/sqlalchemy/orm/persistence.py", line 245, in save_obj
insert,
File "/opt/bitnami/airflow/venv/lib/python3.6/site-packages/sqlalchemy/orm/persistence.py", line 1083, in _emit_insert_statements
c = cached_connections[connection].execute(statement, multiparams)
File "/opt/bitnami/airflow/venv/lib/python3.6/site-packages/sqlalchemy/engine/base.py", line 1011, in execute
return meth(self, multiparams, params)
File "/opt/bitnami/airflow/venv/lib/python3.6/site-packages/sqlalchemy/sql/elements.py", line 298, in _execute_on_connection
return connection._execute_clauseelement(self, multiparams, params)
File "/opt/bitnami/airflow/venv/lib/python3.6/site-packages/sqlalchemy/engine/base.py", line 1130, in _execute_clauseelement
distilled_params,
File "/opt/bitnami/airflow/venv/lib/python3.6/site-packages/sqlalchemy/engine/base.py", line 1317, in _execute_context
e, statement, parameters, cursor, context
File "/opt/bitnami/airflow/venv/lib/python3.6/site-packages/sqlalchemy/engine/base.py", line 1511, in _handle_dbapi_exception
sqlalchemy_exception, with_traceback=exc_info[2], from_=e
File "/opt/bitnami/airflow/venv/lib/python3.6/site-packages/sqlalchemy/util/compat.py", line 182, in raise_
raise exception
File "/opt/bitnami/airflow/venv/lib/python3.6/site-packages/sqlalchemy/engine/base.py", line 1277, in _execute_context
cursor, statement, parameters, context
File "/opt/bitnami/airflow/venv/lib/python3.6/site-packages/sqlalchemy/engine/default.py", line 593, in do_execute
cursor.execute(statement, parameters)
sqlalchemy.exc.IntegrityError: (psycopg2.errors.UniqueViolation) duplicate key value violates unique constraint "dag_pkey"
DETAIL: Key (dag_id)=(persist_events_dag) already exists.
[SQL: INSERT INTO dag (dag_id, root_dag_id, is_paused, is_subdag, is_active, last_scheduler_run, last_pickled, last_expired, scheduler_lock, pickle_id, fileloc, owners, description, default_view, schedule_interval) VALUES (%(dag_id)s, %(root_dag_id)s, %(is_paused)s, %(is_subdag)s, %(is_active)s, %(last_scheduler_run)s, %(last_pickled)s, %(last_expired)s, %(scheduler_lock)s, %(pickle_id)s, %(fileloc)s, %(owners)s, %(description)s, %(default_view)s, %(schedule_interval)s)]
[parameters: {'dag_id': 'persist_events_dag', 'root_dag_id': None, 'is_paused': True, 'is_subdag': False, 'is_active': True, 'last_scheduler_run': datetime.datetime(2021, 1, 8, 10, 55, 33, 439514, tzinfo=<Timezone [UTC]>), 'last_pickled': None, 'last_expired': None, 'scheduler_lock': None, 'pickle_id': None, 'fileloc': '/opt/bitnami/airflow/dags/external/..2021_01_08_10_33_28.059622242/persist_screenviews.py', 'owners': 'Airflow', 'description': None, 'default_view': None, 'schedule_interval': '"0 * * * *"'}]
(Background on this error at: http://sqlalche.me/e/13/gkpj)
[2021-01-08 10:57:22,392] {scheduler_job.py:963} INFO - 1 tasks up for execution:
<TaskInstance: persist_events_dag.task_id 2021-01-08 09:00:00+00:00 [scheduled]>
[2021-01-08 10:57:22,402] {scheduler_job.py:997} INFO - Figuring out tasks to run in Pool(name=default_pool) with 128 open slots and 1 task instances ready to be queued
[2021-01-08 10:57:22,403] {scheduler_job.py:1025} INFO - DAG persist_events_dag has 0/16 running and queued tasks
[2021-01-08 10:57:22,407] {scheduler_job.py:1085} INFO - Setting the following tasks to queued state:
<TaskInstance: persist_events_dag.task_id 2021-01-08 09:00:00+00:00 [scheduled]>
[2021-01-08 10:57:22,420] {scheduler_job.py:1159} INFO - Setting the following 1 tasks to queued state:
<TaskInstance: persist_events_dag.task_id 2021-01-08 09:00:00+00:00 [queued]>
[2021-01-08 10:57:22,420] {scheduler_job.py:1195} INFO - Sending ('persist_events_dag', 'task_id', datetime.datetime(2021, 1, 8, 9, 0, tzinfo=<TimezoneInfo [UTC, GMT, +00:00:00, STD]>), 1) to executor with priority 1 and queue default
[2021-01-08 10:57:22,424] {base_executor.py:58} INFO - Adding to queue: ['airflow', 'run', 'persist_events_dag', 'task_id', '2021-01-08T09:00:00+00:00', '--local', '--pool', 'default_pool', '-sd', '/opt/bitnami/airflow/dags/external/persist_screenviews.py']
[2021-01-08 10:57:36,579] {scheduler_job.py:1334} INFO - Executor reports execution of persist_events_dag.task_id execution_date=2021-01-08 09:00:00+00:00 exited with status success for try_number 1
Comments:
1. Interesting that the dag_id in the code is "persist__events_dag", with two underscores, but in the log it is "persist_events_dag"
2. I would change a couple of things: schedule_interval='@hourly', start_date=datetime(2021, 1, 8), catchup=False, max_active_runs=1. In args: 'retries': 1
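Applied to the DAG from the question, the settings proposed in comment 2 might look like the sketch below. It assumes start_date moves from the default args to the DAG itself and that the remaining values (owner, retry_delay, tags, dag_id, task_id) stay as in the question. `dag_kwargs` and `build_dag` are hypothetical names, and the bash command is a placeholder. Airflow is imported inside the function only so the settings can be inspected without it installed. Whether these changes address the intermittent failure is not established in this thread.

```python
from datetime import datetime, timedelta

# Task-level defaults; 'retries' lowered to 1 as the comment suggests.
default_args = {
    'owner': 'Airflow',
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
}

# DAG-level settings proposed in the comment; dag_id and tags are from the question.
dag_kwargs = {
    'dag_id': 'persist__events_dag',
    'schedule_interval': '@hourly',
    'start_date': datetime(2021, 1, 8),
    'catchup': False,        # do not backfill intervals missed since start_date
    'max_active_runs': 1,    # at most one DAG run at a time
    'tags': ['SparkJobs'],
}


def build_dag():
    """Create the DAG with its single BashOperator task (Airflow 1.10 import paths)."""
    from airflow.models import DAG
    from airflow.operators.bash_operator import BashOperator

    dag = DAG(default_args=default_args, **dag_kwargs)
    BashOperator(
        task_id='curl_to_spark_api',
        bash_command='echo "curl command goes here"',  # placeholder command
        dag=dag,
    )
    return dag


# In a real DAG file, assign the result at module level so Airflow can discover it:
# dag = build_dag()
```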