Планировщик воздушного потока 2.0 с локальным исполнителем и сбоем postgress

#postgresql #airflow-scheduler #airflow-2.x

Вопрос:

Я переключился на LocalExecutor с SequentialExecutor, потому что хотел, чтобы один из моих экземпляров dag работал параллельно. Когда я запускаю их либо параллельно, либо в одном экземпляре, я получаю ошибку postgress.

В моей dag у меня есть 3 задачи оператора bash: «test1_run», «test2_run» и «test3_run»

Я запускаю dag из CLI с помощью :

 airflow dags trigger 'parallel_test_dag' --conf '{"message":"test1"}'
and then,
airflow dags trigger 'parallel_test_dag' --conf '{"message":"test2"}'
from two different terminals
 

Я прикрепляю трассировку стека

 Traceback (most recent call last):
  File "/home/ubu20/.local/lib/python3.8/site-packages/sqlalchemy/engine/base.py", line 1276, in _execute_context
    self.dialect.do_execute(
  File "/home/ubu20/.local/lib/python3.8/site-packages/sqlalchemy/engine/default.py", line 608, in do_execute
    cursor.execute(statement, parameters)
psycopg2.OperationalError: SSL error: sslv3 alert bad record mac


The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "/home/ubu20/.local/bin/airflow", line 8, in <module>
    sys.exit(main())
  File "/home/ubu20/.local/lib/python3.8/site-packages/airflow/__main__.py", line 40, in main
    args.func(args)
  File "/home/ubu20/.local/lib/python3.8/site-packages/airflow/cli/cli_parser.py", line 48, in command
    return func(*args, **kwargs)
  File "/home/ubu20/.local/lib/python3.8/site-packages/airflow/utils/cli.py", line 91, in wrapper
    return f(*args, **kwargs)
  File "/home/ubu20/.local/lib/python3.8/site-packages/airflow/cli/commands/scheduler_command.py", line 64, in scheduler
    job.run()
  File "/home/ubu20/.local/lib/python3.8/site-packages/airflow/jobs/base_job.py", line 245, in run
    self._execute()
  File "/home/ubu20/.local/lib/python3.8/site-packages/airflow/jobs/scheduler_job.py", line 1299, in _execute
    self._run_scheduler_loop()
  File "/home/ubu20/.local/lib/python3.8/site-packages/airflow/jobs/scheduler_job.py", line 1396, in _run_scheduler_loop
    num_finished_events = self._process_executor_events(session=session)
  File "/home/ubu20/.local/lib/python3.8/site-packages/airflow/utils/session.py", line 67, in wrapper
    return func(*args, **kwargs)
  File "/home/ubu20/.local/lib/python3.8/site-packages/airflow/jobs/scheduler_job.py", line 1236, in _process_executor_events
    tis: List[TI] = session.query(TI).filter(filter_for_tis).options(selectinload('dag_model')).all()
  File "/home/ubu20/.local/lib/python3.8/site-packages/sqlalchemy/orm/query.py", line 3373, in all
    return list(self)
  File "/home/ubu20/.local/lib/python3.8/site-packages/sqlalchemy/orm/query.py", line 3535, in __iter__
    return self._execute_and_instances(context)
  File "/home/ubu20/.local/lib/python3.8/site-packages/sqlalchemy/orm/query.py", line 3560, in _execute_and_instances
    result = conn.execute(querycontext.statement, self._params)
  File "/home/ubu20/.local/lib/python3.8/site-packages/sqlalchemy/engine/base.py", line 1011, in execute
    return meth(self, multiparams, params)
  File "/home/ubu20/.local/lib/python3.8/site-packages/sqlalchemy/sql/elements.py", line 298, in _execute_on_connection
    return connection._execute_clauseelement(self, multiparams, params)
  File "/home/ubu20/.local/lib/python3.8/site-packages/sqlalchemy/engine/base.py", line 1124, in _execute_clauseelement
    ret = self._execute_context(
  File "/home/ubu20/.local/lib/python3.8/site-packages/sqlalchemy/engine/base.py", line 1316, in _execute_context
    self._handle_dbapi_exception(
  File "/home/ubu20/.local/lib/python3.8/site-packages/sqlalchemy/engine/base.py", line 1510, in _handle_dbapi_exception
    util.raise_(
  File "/home/ubu20/.local/lib/python3.8/site-packages/sqlalchemy/util/compat.py", line 182, in raise_
    raise exception
  File "/home/ubu20/.local/lib/python3.8/site-packages/sqlalchemy/engine/base.py", line 1276, in _execute_context
    self.dialect.do_execute(
  File "/home/ubu20/.local/lib/python3.8/site-packages/sqlalchemy/engine/default.py", line 608, in do_execute
    cursor.execute(statement, parameters)
sqlalchemy.exc.OperationalError: (psycopg2.OperationalError) SSL error: sslv3 alert bad record mac

[SQL: SELECT task_instance.try_number AS task_instance_try_number, task_instance.task_id AS task_instance_task_id, task_instance.dag_id AS task_instance_dag_id, task_instance.execution_date AS task_instance_execution_date, task_instance.start_date AS task_instance_start_date, task_instance.end_date AS task_instance_end_date, task_instance.duration AS task_instance_duration, task_instance.state AS task_instance_state, task_instance.max_tries AS task_instance_max_tries, task_instance.hostname AS task_instance_hostname, task_instance.unixname AS task_instance_unixname, task_instance.job_id AS task_instance_job_id, task_instance.pool AS task_instance_pool, task_instance.pool_slots AS task_instance_pool_slots, task_instance.queue AS task_instance_queue, task_instance.priority_weight AS task_instance_priority_weight, task_instance.operator AS task_instance_operator, task_instance.queued_dttm AS task_instance_queued_dttm, task_instance.queued_by_job_id AS task_instance_queued_by_job_id, task_instance.pid AS task_instance_pid, task_instance.executor_config AS task_instance_executor_config, task_instance.external_executor_id AS task_instance_external_executor_id 
FROM task_instance 
WHERE task_instance.dag_id = %(dag_id_1)s AND task_instance.execution_date = %(execution_date_1)s AND task_instance.task_id IN (%(task_id_1)s)]
[parameters: {'dag_id_1': 'parallel_test_dag', 'execution_date_1': datetime.datetime(2021, 9, 16, 17, 54, 38, tzinfo=Timezone('UTC')), 'task_id_1': 'test1_run'}]
(Background on this error at: http://sqlalche.me/e/13/e3q8)
 

Я получаю эту ошибку, когда одна задача завершается, и она пытается загрузить следующую задачу после успешного завершения предыдущей задачи

Есть идеи, как это исправить?