"Random" failures of hourly tasks

#airflow #airflow-scheduler

Question:

I have an Apache Airflow DAG that runs hourly but fails "randomly": it succeeds only about half the time. The DAG consists of exactly one task, which uses a BashOperator to run a single curl command. The problem occurs with both scheduled and manual triggers.

Airflow version: 1.10.13

Executor: Celery

Env: Kubernetes with the Bitnami Airflow Helm chart (1 airflow-web, 1 airflow-scheduler, 1 airflow-worker; I can give more information about this if it helps)

DB: PSQL

Redis: a Redis instance is provisioned for the Airflow setup, but it contains no keys.

DAGs: DAGs are defined in a K8s ConfigMap and are periodically refreshed by Airflow
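
A Kubernetes ConfigMap volume is laid out as a hidden timestamped directory (here `..2021_01_08_10_33_28.059622242`), a `..data` symlink pointing to it, and one symlink per key pointing into `..data`. The logs in this post refer to `persist_screenviews.py` through all three of these paths. The stdlib sketch below lists every `.py` path under a folder that resolves to the same real file. It assumes that Airflow discovers DAG files by walking the folder with symlinks followed, and the folder built by the hypothetical helper `build_example_mount` is a reconstruction of the mount, not data taken from the cluster. Pointing `duplicate_dag_files` at the real DAG folder (for example `/opt/bitnami/airflow/dags`) would show the actual layout.

```python
import os
import tempfile
from collections import defaultdict


def duplicate_dag_files(dag_folder):
    """Group every .py path under dag_folder by the real file it resolves to.

    os.walk(followlinks=True) also descends into symlinked directories such
    as a ConfigMap's `..data`, so one file can show up under several paths.
    Only files reachable through more than one path are returned.
    """
    seen = defaultdict(list)
    for root, _dirs, files in os.walk(dag_folder, followlinks=True):
        for name in files:
            if name.endswith(".py"):
                path = os.path.join(root, name)
                seen[os.path.realpath(path)].append(path)
    return {real: sorted(paths) for real, paths in seen.items() if len(paths) > 1}


def build_example_mount(base):
    """Hypothetical recreation of the ConfigMap volume layout seen in the logs."""
    external = os.path.join(base, "external")
    stamped = os.path.join(external, "..2021_01_08_10_33_28.059622242")
    os.makedirs(stamped)
    with open(os.path.join(stamped, "persist_screenviews.py"), "w") as f:
        f.write("# DAG definition would live here\n")
    # ..data -> timestamped dir; each key -> ..data/<key> (relative symlinks)
    os.symlink("..2021_01_08_10_33_28.059622242", os.path.join(external, "..data"))
    os.symlink("..data/persist_screenviews.py",
               os.path.join(external, "persist_screenviews.py"))
    return base


duplicates = duplicate_dag_files(build_example_mount(tempfile.mkdtemp()))
for real_path, paths in duplicates.items():
    print(real_path)
    for p in paths:
        print("   ", p)
```

In the reconstructed layout the single DAG file is reported under three paths, matching the three `-sd`/`fileloc` forms in the logs.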

What I tried:

  • Reset the database and set everything up from scratch twice
  • Changed parameters such as start_date, retries, etc.
  • Monitored the behavior for 2 days

Code

  from datetime import datetime, timedelta

  from airflow.models import DAG
  from airflow.operators.bash_operator import BashOperator

  args = {
      'owner': 'Airflow',
      'start_date': datetime(2021, 1, 8, 8, 30, 0),
      'retries': 3,
      'retry_delay': timedelta(minutes=5),
  }

  dag = DAG(
      dag_id='persist__events_dag',
      default_args=args,
      schedule_interval='0 * * * *',
      tags=['SparkJobs'],
  )

  run_job = BashOperator(
      task_id='curl_to_spark_api',
      # The real curl command is omitted; a bash comment keeps the placeholder valid.
      bash_command="""
      # Here comes a curl command
      """,
      dag=dag,
  )
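
The posted code declares `dag_id='persist__events_dag'` (two underscores), while every log line refers to `persist_events_dag`. To confirm which dag_id the file actually mounted from the ConfigMap declares, a minimal sketch using only the standard library `ast` module (Python 3.8+ for `ast.Constant`) can parse a DAG file without importing Airflow. `declared_dag_ids` is a hypothetical helper; it only sees string literals, so dag_ids built at runtime are missed.

```python
import ast


def declared_dag_ids(source):
    """Return the dag_id string literals declared in a DAG file's source.

    Static check only: looks at DAG(...) calls (positional first argument)
    and at any dag_id="..." keyword argument; nothing is executed.
    """
    ids = []
    for node in ast.walk(ast.parse(source)):
        if not isinstance(node, ast.Call):
            continue
        func = node.func
        # Handles both DAG(...) and models.DAG(...)
        name = func.id if isinstance(func, ast.Name) else getattr(func, "attr", None)
        candidates = [kw.value for kw in node.keywords if kw.arg == "dag_id"]
        if name == "DAG" and node.args:
            candidates.append(node.args[0])
        for value in candidates:
            if isinstance(value, ast.Constant) and isinstance(value.value, str):
                ids.append(value.value)
    return ids


posted_code = '''
dag = DAG(
    dag_id='persist__events_dag',
    schedule_interval='0 * * * *',
)
'''
ids = declared_dag_ids(posted_code)
print(ids)
print("persist_events_dag" in ids)
```

Running it against the deployed `persist_screenviews.py` instead of the posted snippet would show whether the two actually differ.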
 

Logs

From the airflow-worker, on failed runs:

[2021-01-08 10:36:13,313: INFO/MainProcess] Received task: airflow.executors.celery_executor.execute_command[c6e3fdfb-5474-47aa-b333-be8c69b23ebe]
[2021-01-08 10:36:13,314: INFO/ForkPoolWorker-15] Executing command in Celery: ['airflow', 'run', 'persist_events_dag', 'task_id', '2021-01-08T09:00:00+00:00', '--local', '--pool', 'default_pool', '-sd', '/opt/bitnami/airflow/dags/external/..2021_01_08_10_33_28.059622242/persist_screenviews.py']
[2021-01-08 10:36:19,279] {__init__.py:50} INFO - Using executor CeleryExecutor
[2021-01-08 10:36:19,356] {dagbag.py:417} INFO - Filling up the DagBag from /opt/bitnami/airflow/dags/external/..2021_01_08_10_33_28.059622242/persist_screenviews.py
/opt/bitnami/airflow/venv/lib/python3.6/site-packages/airflow/models/dag.py:1342: PendingDeprecationWarning: The requested task could not be added to the DAG because a task with task_id create_tag_template_field_result is already in the DAG. Starting in Airflow 2.0, trying to overwrite a task will raise an exception.
  category=PendingDeprecationWarning)
Traceback (most recent call last):
  File "/opt/bitnami/airflow/venv/bin/airflow", line 37, in <module>
    args.func(args)
  File "/opt/bitnami/airflow/venv/lib/python3.6/site-packages/airflow/utils/cli.py", line 76, in wrapper
    return f(*args, **kwargs)
  File "/opt/bitnami/airflow/venv/lib/python3.6/site-packages/airflow/bin/cli.py", line 538, in run
    dag = get_dag(args)
  File "/opt/bitnami/airflow/venv/lib/python3.6/site-packages/airflow/bin/cli.py", line 164, in get_dag
    'parse.'.format(args.dag_id))
airflow.exceptions.AirflowException: dag_id could not be found: persist_events_dag. Either the dag did not exist or it failed to parse.
[2021-01-08 10:36:20,373: ERROR/ForkPoolWorker-15] execute_command encountered a CalledProcessError
Traceback (most recent call last):
  File "/opt/bitnami/airflow/venv/lib/python3.6/site-packages/airflow/executors/celery_executor.py", line 78, in execute_command
    close_fds=True, env=env)
  File "/opt/bitnami/python/lib/python3.6/subprocess.py", line 311, in check_call
    raise CalledProcessError(retcode, cmd)
subprocess.CalledProcessError: Command '['airflow', 'run', 'persist_events_dag', 'task_id', '2021-01-08T09:00:00+00:00', '--local', '--pool', 'default_pool', '-sd', '/opt/bitnami/airflow/dags/external/..2021_01_08_10_33_28.059622242/persist_screenviews.py']' returned non-zero exit status 1.
[2021-01-08 10:36:20,373: ERROR/ForkPoolWorker-15] None
[2021-01-08 10:36:20,526: ERROR/ForkPoolWorker-15] Task airflow.executors.celery_executor.execute_command[c6e3fdfb-5474-47aa-b333-be8c69b23ebe] raised unexpected: AirflowException('Celery command failed',)
Traceback (most recent call last):
  File "/opt/bitnami/airflow/venv/lib/python3.6/site-packages/airflow/executors/celery_executor.py", line 78, in execute_command
    close_fds=True, env=env)
  File "/opt/bitnami/python/lib/python3.6/subprocess.py", line 311, in check_call
    raise CalledProcessError(retcode, cmd)
subprocess.CalledProcessError: Command '['airflow', 'run', 'persist_events_dag', 'task_id', '2021-01-08T09:00:00+00:00', '--local', '--pool', 'default_pool', '-sd', '/opt/bitnami/airflow/dags/external/..2021_01_08_10_33_28.059622242/persist_screenviews.py']' returned non-zero exit status 1.

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/opt/bitnami/airflow/venv/lib/python3.6/site-packages/celery/app/trace.py", line 412, in trace_task
    R = retval = fun(*args, **kwargs)
  File "/opt/bitnami/airflow/venv/lib/python3.6/site-packages/celery/app/trace.py", line 704, in __protected_call__
    return self.run(*args, **kwargs)
  File "/opt/bitnami/airflow/venv/lib/python3.6/site-packages/airflow/executors/celery_executor.py", line 83, in execute_command
    raise AirflowException('Celery command failed')
airflow.exceptions.AirflowException: Celery command failed
[2021-01-08 10:36:43,230: INFO/MainProcess] Received task: airflow.executors.celery_executor.execute_command[8ba7956f-6b96-48f8-b112-3a4d7baa8bf7]
[2021-01-08 10:36:43,231: INFO/ForkPoolWorker-15] Executing command in Celery: ['airflow', 'run', 'persist_events_dag', 'task_id', '2021-01-08T10:36:40.516529+00:00', '--local', '--pool', 'default_pool', '-sd', '/opt/bitnami/airflow/dags/external/..2021_01_08_10_33_28.059622242/persist_screenviews.py']
[2021-01-08 10:36:49,157] {__init__.py:50} INFO - Using executor CeleryExecutor
[2021-01-08 10:36:49,158] {dagbag.py:417} INFO - Filling up the DagBag from /opt/bitnami/airflow/dags/external/..2021_01_08_10_33_28.059622242/persist_screenviews.py
/opt/bitnami/airflow/venv/lib/python3.6/site-packages/airflow/models/dag.py:1342: PendingDeprecationWarning: The requested task could not be added to the DAG because a task with task_id create_tag_template_field_result is already in the DAG. Starting in Airflow 2.0, trying to overwrite a task will raise an exception.
  category=PendingDeprecationWarning)
Traceback (most recent call last):
  File "/opt/bitnami/airflow/venv/bin/airflow", line 37, in <module>
    args.func(args)
  File "/opt/bitnami/airflow/venv/lib/python3.6/site-packages/airflow/utils/cli.py", line 76, in wrapper
    return f(*args, **kwargs)
  File "/opt/bitnami/airflow/venv/lib/python3.6/site-packages/airflow/bin/cli.py", line 538, in run
    dag = get_dag(args)
  File "/opt/bitnami/airflow/venv/lib/python3.6/site-packages/airflow/bin/cli.py", line 164, in get_dag
    'parse.'.format(args.dag_id))
airflow.exceptions.AirflowException: dag_id could not be found: persist_events_dag. Either the dag did not exist or it failed to parse.
[2021-01-08 10:36:50,560: ERROR/ForkPoolWorker-15] execute_command encountered a CalledProcessError
Traceback (most recent call last):
  File "/opt/bitnami/airflow/venv/lib/python3.6/site-packages/airflow/executors/celery_executor.py", line 78, in execute_command
    close_fds=True, env=env)
  File "/opt/bitnami/python/lib/python3.6/subprocess.py", line 311, in check_call
    raise CalledProcessError(retcode, cmd)
subprocess.CalledProcessError: Command '['airflow', 'run', 'persist_events_dag', 'task_id', '2021-01-08T10:36:40.516529+00:00', '--local', '--pool', 'default_pool', '-sd', '/opt/bitnami/airflow/dags/external/..2021_01_08_10_33_28.059622242/persist_screenviews.py']' returned non-zero exit status 1.
[2021-01-08 10:36:50,561: ERROR/ForkPoolWorker-15] None
[2021-01-08 10:36:50,649: ERROR/ForkPoolWorker-15] Task airflow.executors.celery_executor.execute_command[8ba7956f-6b96-48f8-b112-3a4d7baa8bf7] raised unexpected: AirflowException('Celery command failed',)
Traceback (most recent call last):
  File "/opt/bitnami/airflow/venv/lib/python3.6/site-packages/airflow/executors/celery_executor.py", line 78, in execute_command
    close_fds=True, env=env)
  File "/opt/bitnami/python/lib/python3.6/subprocess.py", line 311, in check_call
    raise CalledProcessError(retcode, cmd)
subprocess.CalledProcessError: Command '['airflow', 'run', 'persist_events_dag', 'task_id', '2021-01-08T10:36:40.516529+00:00', '--local', '--pool', 'default_pool', '-sd', '/opt/bitnami/airflow/dags/external/..2021_01_08_10_33_28.059622242/persist_screenviews.py']' returned non-zero exit status 1.

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/opt/bitnami/airflow/venv/lib/python3.6/site-packages/celery/app/trace.py", line 412, in trace_task
    R = retval = fun(*args, **kwargs)
  File "/opt/bitnami/airflow/venv/lib/python3.6/site-packages/celery/app/trace.py", line 704, in __protected_call__
    return self.run(*args, **kwargs)
  File "/opt/bitnami/airflow/venv/lib/python3.6/site-packages/airflow/executors/celery_executor.py", line 83, in execute_command
    raise AirflowException('Celery command failed')
airflow.exceptions.AirflowException: Celery command failed
[2021-01-08 10:37:23,414: INFO/MainProcess] Received task: airflow.executors.celery_executor.execute_command[10478bca-c21d-4cf5-b366-5ca1f66c0fe1]
[2021-01-08 10:37:23,415: INFO/ForkPoolWorker-15] Executing command in Celery: ['airflow', 'run', 'persist_events_dag', 'task_id', '2021-01-08T09:00:00+00:00', '--local', '--pool', 'default_pool', '-sd', '/opt/bitnami/airflow/dags/external/..data/persist_screenviews.py']
[2021-01-08 10:37:29,457] {__init__.py:50} INFO - Using executor CeleryExecutor
[2021-01-08 10:37:29,458] {dagbag.py:417} INFO - Filling up the DagBag from /opt/bitnami/airflow/dags/external/..data/persist_screenviews.py
Running %s on host %s <TaskInstance: persist_events_dag.task_id 2021-01-08T09:00:00+00:00 [queued]> airflow-worker-0.airflow-worker-headless.default.svc.cluster.local
[2021-01-08 10:37:36,294: INFO/ForkPoolWorker-15] Task airflow.executors.celery_executor.execute_command[10478bca-c21d-4cf5-b366-5ca1f66c0fe1] succeeded in 12.8788606680464s: None
 

Even when a run succeeds, I still get the following error in the airflow-scheduler logs:

 Process DagFileProcessor144631-Process:
Traceback (most recent call last):
  File "/opt/bitnami/airflow/venv/lib/python3.6/site-packages/sqlalchemy/engine/base.py", line 1277, in _execute_context
    cursor, statement, parameters, context
  File "/opt/bitnami/airflow/venv/lib/python3.6/site-packages/sqlalchemy/engine/default.py", line 593, in do_execute
    cursor.execute(statement, parameters)
psycopg2.errors.UniqueViolation: duplicate key value violates unique constraint "dag_pkey"
DETAIL:  Key (dag_id)=(persist_events_dag) already exists.


The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "/opt/bitnami/python/lib/python3.6/multiprocessing/process.py", line 258, in _bootstrap
    self.run()
  File "/opt/bitnami/python/lib/python3.6/multiprocessing/process.py", line 93, in run
    self._target(*self._args, **self._kwargs)
  File "/opt/bitnami/airflow/venv/lib/python3.6/site-packages/airflow/jobs/scheduler_job.py", line 159, in _run_file_processor
    pickle_dags)
  File "/opt/bitnami/airflow/venv/lib/python3.6/site-packages/airflow/utils/db.py", line 74, in wrapper
    return func(*args, **kwargs)
  File "/opt/bitnami/airflow/venv/lib/python3.6/site-packages/airflow/jobs/scheduler_job.py", line 1609, in process_file
    dag.sync_to_db()
  File "/opt/bitnami/airflow/venv/lib/python3.6/site-packages/airflow/utils/db.py", line 74, in wrapper
    return func(*args, **kwargs)
  File "/opt/bitnami/airflow/venv/lib/python3.6/site-packages/airflow/models/dag.py", line 1535, in sync_to_db
    orm_dag.tags = self.get_dagtags(session=session)
  File "/opt/bitnami/airflow/venv/lib/python3.6/site-packages/airflow/utils/db.py", line 70, in wrapper
    return func(*args, **kwargs)
  File "/opt/bitnami/airflow/venv/lib/python3.6/site-packages/airflow/models/dag.py", line 1574, in get_dagtags
    session.commit()
  File "/opt/bitnami/airflow/venv/lib/python3.6/site-packages/sqlalchemy/orm/session.py", line 1042, in commit
    self.transaction.commit()
  File "/opt/bitnami/airflow/venv/lib/python3.6/site-packages/sqlalchemy/orm/session.py", line 504, in commit
    self._prepare_impl()
  File "/opt/bitnami/airflow/venv/lib/python3.6/site-packages/sqlalchemy/orm/session.py", line 483, in _prepare_impl
    self.session.flush()
  File "/opt/bitnami/airflow/venv/lib/python3.6/site-packages/sqlalchemy/orm/session.py", line 2536, in flush
    self._flush(objects)
  File "/opt/bitnami/airflow/venv/lib/python3.6/site-packages/sqlalchemy/orm/session.py", line 2678, in _flush
    transaction.rollback(_capture_exception=True)
  File "/opt/bitnami/airflow/venv/lib/python3.6/site-packages/sqlalchemy/util/langhelpers.py", line 70, in __exit__
    with_traceback=exc_tb,
  File "/opt/bitnami/airflow/venv/lib/python3.6/site-packages/sqlalchemy/util/compat.py", line 182, in raise_
    raise exception
  File "/opt/bitnami/airflow/venv/lib/python3.6/site-packages/sqlalchemy/orm/session.py", line 2638, in _flush
    flush_context.execute()
  File "/opt/bitnami/airflow/venv/lib/python3.6/site-packages/sqlalchemy/orm/unitofwork.py", line 422, in execute
    rec.execute(self)
  File "/opt/bitnami/airflow/venv/lib/python3.6/site-packages/sqlalchemy/orm/unitofwork.py", line 589, in execute
    uow,
  File "/opt/bitnami/airflow/venv/lib/python3.6/site-packages/sqlalchemy/orm/persistence.py", line 245, in save_obj
    insert,
  File "/opt/bitnami/airflow/venv/lib/python3.6/site-packages/sqlalchemy/orm/persistence.py", line 1083, in _emit_insert_statements
    c = cached_connections[connection].execute(statement, multiparams)
  File "/opt/bitnami/airflow/venv/lib/python3.6/site-packages/sqlalchemy/engine/base.py", line 1011, in execute
    return meth(self, multiparams, params)
  File "/opt/bitnami/airflow/venv/lib/python3.6/site-packages/sqlalchemy/sql/elements.py", line 298, in _execute_on_connection
    return connection._execute_clauseelement(self, multiparams, params)
  File "/opt/bitnami/airflow/venv/lib/python3.6/site-packages/sqlalchemy/engine/base.py", line 1130, in _execute_clauseelement
    distilled_params,
  File "/opt/bitnami/airflow/venv/lib/python3.6/site-packages/sqlalchemy/engine/base.py", line 1317, in _execute_context
    e, statement, parameters, cursor, context
  File "/opt/bitnami/airflow/venv/lib/python3.6/site-packages/sqlalchemy/engine/base.py", line 1511, in _handle_dbapi_exception
    sqlalchemy_exception, with_traceback=exc_info[2], from_=e
  File "/opt/bitnami/airflow/venv/lib/python3.6/site-packages/sqlalchemy/util/compat.py", line 182, in raise_
    raise exception
  File "/opt/bitnami/airflow/venv/lib/python3.6/site-packages/sqlalchemy/engine/base.py", line 1277, in _execute_context
    cursor, statement, parameters, context
  File "/opt/bitnami/airflow/venv/lib/python3.6/site-packages/sqlalchemy/engine/default.py", line 593, in do_execute
    cursor.execute(statement, parameters)
sqlalchemy.exc.IntegrityError: (psycopg2.errors.UniqueViolation) duplicate key value violates unique constraint "dag_pkey"
DETAIL:  Key (dag_id)=(persist_events_dag) already exists.

[SQL: INSERT INTO dag (dag_id, root_dag_id, is_paused, is_subdag, is_active, last_scheduler_run, last_pickled, last_expired, scheduler_lock, pickle_id, fileloc, owners, description, default_view, schedule_interval) VALUES (%(dag_id)s, %(root_dag_id)s, %(is_paused)s, %(is_subdag)s, %(is_active)s, %(last_scheduler_run)s, %(last_pickled)s, %(last_expired)s, %(scheduler_lock)s, %(pickle_id)s, %(fileloc)s, %(owners)s, %(description)s, %(default_view)s, %(schedule_interval)s)]
[parameters: {'dag_id': 'persist_events_dag', 'root_dag_id': None, 'is_paused': True, 'is_subdag': False, 'is_active': True, 'last_scheduler_run': datetime.datetime(2021, 1, 8, 10, 55, 4, 297819, tzinfo=<Timezone [UTC]>), 'last_pickled': None, 'last_expired': None, 'scheduler_lock': None, 'pickle_id': None, 'fileloc': '/opt/bitnami/airflow/dags/external/..2021_01_08_10_33_28.059622242/persist_screenviews.py', 'owners': 'Airflow', 'description': None, 'default_view': None, 'schedule_interval': '"0 * * * *"'}]
(Background on this error at: http://sqlalche.me/e/13/gkpj)
Process DagFileProcessor144689-Process:
Traceback (most recent call last):
  File "/opt/bitnami/airflow/venv/lib/python3.6/site-packages/sqlalchemy/engine/base.py", line 1277, in _execute_context
    cursor, statement, parameters, context
  File "/opt/bitnami/airflow/venv/lib/python3.6/site-packages/sqlalchemy/engine/default.py", line 593, in do_execute
    cursor.execute(statement, parameters)
psycopg2.errors.UniqueViolation: duplicate key value violates unique constraint "dag_pkey"
DETAIL:  Key (dag_id)=(persist_events_dag) already exists.


The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "/opt/bitnami/python/lib/python3.6/multiprocessing/process.py", line 258, in _bootstrap
    self.run()
  File "/opt/bitnami/python/lib/python3.6/multiprocessing/process.py", line 93, in run
    self._target(*self._args, **self._kwargs)
  File "/opt/bitnami/airflow/venv/lib/python3.6/site-packages/airflow/jobs/scheduler_job.py", line 159, in _run_file_processor
    pickle_dags)
  File "/opt/bitnami/airflow/venv/lib/python3.6/site-packages/airflow/utils/db.py", line 74, in wrapper
    return func(*args, **kwargs)
  File "/opt/bitnami/airflow/venv/lib/python3.6/site-packages/airflow/jobs/scheduler_job.py", line 1609, in process_file
    dag.sync_to_db()
  File "/opt/bitnami/airflow/venv/lib/python3.6/site-packages/airflow/utils/db.py", line 74, in wrapper
    return func(*args, **kwargs)
  File "/opt/bitnami/airflow/venv/lib/python3.6/site-packages/airflow/models/dag.py", line 1535, in sync_to_db
    orm_dag.tags = self.get_dagtags(session=session)
  File "/opt/bitnami/airflow/venv/lib/python3.6/site-packages/airflow/utils/db.py", line 70, in wrapper
    return func(*args, **kwargs)
  File "/opt/bitnami/airflow/venv/lib/python3.6/site-packages/airflow/models/dag.py", line 1574, in get_dagtags
    session.commit()
  File "/opt/bitnami/airflow/venv/lib/python3.6/site-packages/sqlalchemy/orm/session.py", line 1042, in commit
    self.transaction.commit()
  File "/opt/bitnami/airflow/venv/lib/python3.6/site-packages/sqlalchemy/orm/session.py", line 504, in commit
    self._prepare_impl()
  File "/opt/bitnami/airflow/venv/lib/python3.6/site-packages/sqlalchemy/orm/session.py", line 483, in _prepare_impl
    self.session.flush()
  File "/opt/bitnami/airflow/venv/lib/python3.6/site-packages/sqlalchemy/orm/session.py", line 2536, in flush
    self._flush(objects)
  File "/opt/bitnami/airflow/venv/lib/python3.6/site-packages/sqlalchemy/orm/session.py", line 2678, in _flush
    transaction.rollback(_capture_exception=True)
  File "/opt/bitnami/airflow/venv/lib/python3.6/site-packages/sqlalchemy/util/langhelpers.py", line 70, in __exit__
    with_traceback=exc_tb,
  File "/opt/bitnami/airflow/venv/lib/python3.6/site-packages/sqlalchemy/util/compat.py", line 182, in raise_
    raise exception
  File "/opt/bitnami/airflow/venv/lib/python3.6/site-packages/sqlalchemy/orm/session.py", line 2638, in _flush
    flush_context.execute()
  File "/opt/bitnami/airflow/venv/lib/python3.6/site-packages/sqlalchemy/orm/unitofwork.py", line 422, in execute
    rec.execute(self)
  File "/opt/bitnami/airflow/venv/lib/python3.6/site-packages/sqlalchemy/orm/unitofwork.py", line 589, in execute
    uow,
  File "/opt/bitnami/airflow/venv/lib/python3.6/site-packages/sqlalchemy/orm/persistence.py", line 245, in save_obj
    insert,
  File "/opt/bitnami/airflow/venv/lib/python3.6/site-packages/sqlalchemy/orm/persistence.py", line 1083, in _emit_insert_statements
    c = cached_connections[connection].execute(statement, multiparams)
  File "/opt/bitnami/airflow/venv/lib/python3.6/site-packages/sqlalchemy/engine/base.py", line 1011, in execute
    return meth(self, multiparams, params)
  File "/opt/bitnami/airflow/venv/lib/python3.6/site-packages/sqlalchemy/sql/elements.py", line 298, in _execute_on_connection
    return connection._execute_clauseelement(self, multiparams, params)
  File "/opt/bitnami/airflow/venv/lib/python3.6/site-packages/sqlalchemy/engine/base.py", line 1130, in _execute_clauseelement
    distilled_params,
  File "/opt/bitnami/airflow/venv/lib/python3.6/site-packages/sqlalchemy/engine/base.py", line 1317, in _execute_context
    e, statement, parameters, cursor, context
  File "/opt/bitnami/airflow/venv/lib/python3.6/site-packages/sqlalchemy/engine/base.py", line 1511, in _handle_dbapi_exception
    sqlalchemy_exception, with_traceback=exc_info[2], from_=e
  File "/opt/bitnami/airflow/venv/lib/python3.6/site-packages/sqlalchemy/util/compat.py", line 182, in raise_
    raise exception
  File "/opt/bitnami/airflow/venv/lib/python3.6/site-packages/sqlalchemy/engine/base.py", line 1277, in _execute_context
    cursor, statement, parameters, context
  File "/opt/bitnami/airflow/venv/lib/python3.6/site-packages/sqlalchemy/engine/default.py", line 593, in do_execute
    cursor.execute(statement, parameters)
sqlalchemy.exc.IntegrityError: (psycopg2.errors.UniqueViolation) duplicate key value violates unique constraint "dag_pkey"
DETAIL:  Key (dag_id)=(persist_events_dag) already exists.

[SQL: INSERT INTO dag (dag_id, root_dag_id, is_paused, is_subdag, is_active, last_scheduler_run, last_pickled, last_expired, scheduler_lock, pickle_id, fileloc, owners, description, default_view, schedule_interval) VALUES (%(dag_id)s, %(root_dag_id)s, %(is_paused)s, %(is_subdag)s, %(is_active)s, %(last_scheduler_run)s, %(last_pickled)s, %(last_expired)s, %(scheduler_lock)s, %(pickle_id)s, %(fileloc)s, %(owners)s, %(description)s, %(default_view)s, %(schedule_interval)s)]
[parameters: {'dag_id': 'persist_events_dag', 'root_dag_id': None, 'is_paused': True, 'is_subdag': False, 'is_active': True, 'last_scheduler_run': datetime.datetime(2021, 1, 8, 10, 55, 33, 439514, tzinfo=<Timezone [UTC]>), 'last_pickled': None, 'last_expired': None, 'scheduler_lock': None, 'pickle_id': None, 'fileloc': '/opt/bitnami/airflow/dags/external/..2021_01_08_10_33_28.059622242/persist_screenviews.py', 'owners': 'Airflow', 'description': None, 'default_view': None, 'schedule_interval': '"0 * * * *"'}]
(Background on this error at: http://sqlalche.me/e/13/gkpj)
[2021-01-08 10:57:22,392] {scheduler_job.py:963} INFO - 1 tasks up for execution:
    <TaskInstance: persist_events_dag.task_id 2021-01-08 09:00:00+00:00 [scheduled]>
[2021-01-08 10:57:22,402] {scheduler_job.py:997} INFO - Figuring out tasks to run in Pool(name=default_pool) with 128 open slots and 1 task instances ready to be queued
[2021-01-08 10:57:22,403] {scheduler_job.py:1025} INFO - DAG persist_events_dag has 0/16 running and queued tasks
[2021-01-08 10:57:22,407] {scheduler_job.py:1085} INFO - Setting the following tasks to queued state:
    <TaskInstance: persist_events_dag.task_id 2021-01-08 09:00:00+00:00 [scheduled]>
[2021-01-08 10:57:22,420] {scheduler_job.py:1159} INFO - Setting the following 1 tasks to queued state:
    <TaskInstance: persist_events_dag.task_id 2021-01-08 09:00:00+00:00 [queued]>
[2021-01-08 10:57:22,420] {scheduler_job.py:1195} INFO - Sending ('persist_events_dag', 'task_id', datetime.datetime(2021, 1, 8, 9, 0, tzinfo=<TimezoneInfo [UTC, GMT, +00:00:00, STD]>), 1) to executor with priority 1 and queue default
[2021-01-08 10:57:22,424] {base_executor.py:58} INFO - Adding to queue: ['airflow', 'run', 'persist_events_dag', 'task_id', '2021-01-08T09:00:00+00:00', '--local', '--pool', 'default_pool', '-sd', '/opt/bitnami/airflow/dags/external/persist_screenviews.py']
[2021-01-08 10:57:36,579] {scheduler_job.py:1334} INFO - Executor reports execution of persist_events_dag.task_id execution_date=2021-01-08 09:00:00+00:00 exited with status success for try_number 1
 

Comments:

1. Interestingly, the dag_id in the code is "persist__events_dag" (with two underscores), but in the logs it is "persist_events_dag".

2. I would change a couple of things: schedule_interval='@hourly', start_date=datetime(2021, 1, 8), catchup=False, max_active_runs=1. In args: 'retries': 1
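
Comment 2's start_date and catchup suggestions mainly change which execution dates the scheduler creates. The sketch below is a simplified stdlib model of Airflow 1.10's behavior for an hourly cron schedule, not Airflow's own code. It assumes that a run labelled T covers the hour from T to T + 1h and becomes due only once that hour has ended, that a start_date which is not on the hour is rounded up to the next cron tick, and that catchup=False keeps only the most recent complete interval. `first_execution_date` and `due_execution_dates` are hypothetical helper names.

```python
from datetime import datetime, timedelta

HOUR = timedelta(hours=1)  # models schedule_interval='0 * * * *' / '@hourly'


def first_execution_date(start_date):
    """Round start_date up to the next full hour (the next cron tick)."""
    floored = start_date.replace(minute=0, second=0, microsecond=0)
    return floored if floored == start_date else floored + HOUR


def due_execution_dates(start_date, now, catchup=True):
    """Execution dates whose hourly interval has fully ended by `now`.

    A run labelled T covers [T, T + 1h) and only becomes due at T + 1h.
    With catchup=False only the most recent complete interval is kept.
    """
    dates = []
    t = first_execution_date(start_date)
    while t + HOUR <= now:
        dates.append(t)
        t += HOUR
    return dates if catchup else dates[-1:]


now = datetime(2021, 1, 8, 10, 36)  # around the first failed run in the worker log
print(due_execution_dates(datetime(2021, 1, 8, 8, 30), now))
print(len(due_execution_dates(datetime(2021, 1, 8), now)))
print(due_execution_dates(datetime(2021, 1, 8), now, catchup=False))
```

With the original start_date of 08:30 the model yields the single 09:00 execution date that the logs show being run around 10:36. With the suggested start_date of midnight it yields ten pending intervals, which catchup=False collapses to the same 09:00 run; retries and max_active_runs are outside this model.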