#postgresql #airflow-scheduler #airflow-2.x
Вопрос:
Я переключился на LocalExecutor с SequentialExecutor, потому что хотел, чтобы один из моих экземпляров dag работал параллельно. Когда я запускаю их либо параллельно, либо в одном экземпляре, я получаю ошибку postgress.
В моей dag у меня есть 3 задачи оператора bash: «test1_run», «test2_run» и «test3_run»
Я запускаю dag из CLI с помощью :
airflow dags trigger 'parallel_test_dag' --conf '{"message":"test1"}'
and then,
airflow dags trigger 'parallel_test_dag' --conf '{"message":"test2"}'
from two different terminals
Я прикрепляю трассировку стека
Traceback (most recent call last):
File "/home/ubu20/.local/lib/python3.8/site-packages/sqlalchemy/engine/base.py", line 1276, in _execute_context
self.dialect.do_execute(
File "/home/ubu20/.local/lib/python3.8/site-packages/sqlalchemy/engine/default.py", line 608, in do_execute
cursor.execute(statement, parameters)
psycopg2.OperationalError: SSL error: sslv3 alert bad record mac
The above exception was the direct cause of the following exception:
Traceback (most recent call last):
File "/home/ubu20/.local/bin/airflow", line 8, in <module>
sys.exit(main())
File "/home/ubu20/.local/lib/python3.8/site-packages/airflow/__main__.py", line 40, in main
args.func(args)
File "/home/ubu20/.local/lib/python3.8/site-packages/airflow/cli/cli_parser.py", line 48, in command
return func(*args, **kwargs)
File "/home/ubu20/.local/lib/python3.8/site-packages/airflow/utils/cli.py", line 91, in wrapper
return f(*args, **kwargs)
File "/home/ubu20/.local/lib/python3.8/site-packages/airflow/cli/commands/scheduler_command.py", line 64, in scheduler
job.run()
File "/home/ubu20/.local/lib/python3.8/site-packages/airflow/jobs/base_job.py", line 245, in run
self._execute()
File "/home/ubu20/.local/lib/python3.8/site-packages/airflow/jobs/scheduler_job.py", line 1299, in _execute
self._run_scheduler_loop()
File "/home/ubu20/.local/lib/python3.8/site-packages/airflow/jobs/scheduler_job.py", line 1396, in _run_scheduler_loop
num_finished_events = self._process_executor_events(session=session)
File "/home/ubu20/.local/lib/python3.8/site-packages/airflow/utils/session.py", line 67, in wrapper
return func(*args, **kwargs)
File "/home/ubu20/.local/lib/python3.8/site-packages/airflow/jobs/scheduler_job.py", line 1236, in _process_executor_events
tis: List[TI] = session.query(TI).filter(filter_for_tis).options(selectinload('dag_model')).all()
File "/home/ubu20/.local/lib/python3.8/site-packages/sqlalchemy/orm/query.py", line 3373, in all
return list(self)
File "/home/ubu20/.local/lib/python3.8/site-packages/sqlalchemy/orm/query.py", line 3535, in __iter__
return self._execute_and_instances(context)
File "/home/ubu20/.local/lib/python3.8/site-packages/sqlalchemy/orm/query.py", line 3560, in _execute_and_instances
result = conn.execute(querycontext.statement, self._params)
File "/home/ubu20/.local/lib/python3.8/site-packages/sqlalchemy/engine/base.py", line 1011, in execute
return meth(self, multiparams, params)
File "/home/ubu20/.local/lib/python3.8/site-packages/sqlalchemy/sql/elements.py", line 298, in _execute_on_connection
return connection._execute_clauseelement(self, multiparams, params)
File "/home/ubu20/.local/lib/python3.8/site-packages/sqlalchemy/engine/base.py", line 1124, in _execute_clauseelement
ret = self._execute_context(
File "/home/ubu20/.local/lib/python3.8/site-packages/sqlalchemy/engine/base.py", line 1316, in _execute_context
self._handle_dbapi_exception(
File "/home/ubu20/.local/lib/python3.8/site-packages/sqlalchemy/engine/base.py", line 1510, in _handle_dbapi_exception
util.raise_(
File "/home/ubu20/.local/lib/python3.8/site-packages/sqlalchemy/util/compat.py", line 182, in raise_
raise exception
File "/home/ubu20/.local/lib/python3.8/site-packages/sqlalchemy/engine/base.py", line 1276, in _execute_context
self.dialect.do_execute(
File "/home/ubu20/.local/lib/python3.8/site-packages/sqlalchemy/engine/default.py", line 608, in do_execute
cursor.execute(statement, parameters)
sqlalchemy.exc.OperationalError: (psycopg2.OperationalError) SSL error: sslv3 alert bad record mac
[SQL: SELECT task_instance.try_number AS task_instance_try_number, task_instance.task_id AS task_instance_task_id, task_instance.dag_id AS task_instance_dag_id, task_instance.execution_date AS task_instance_execution_date, task_instance.start_date AS task_instance_start_date, task_instance.end_date AS task_instance_end_date, task_instance.duration AS task_instance_duration, task_instance.state AS task_instance_state, task_instance.max_tries AS task_instance_max_tries, task_instance.hostname AS task_instance_hostname, task_instance.unixname AS task_instance_unixname, task_instance.job_id AS task_instance_job_id, task_instance.pool AS task_instance_pool, task_instance.pool_slots AS task_instance_pool_slots, task_instance.queue AS task_instance_queue, task_instance.priority_weight AS task_instance_priority_weight, task_instance.operator AS task_instance_operator, task_instance.queued_dttm AS task_instance_queued_dttm, task_instance.queued_by_job_id AS task_instance_queued_by_job_id, task_instance.pid AS task_instance_pid, task_instance.executor_config AS task_instance_executor_config, task_instance.external_executor_id AS task_instance_external_executor_id
FROM task_instance
WHERE task_instance.dag_id = %(dag_id_1)s AND task_instance.execution_date = %(execution_date_1)s AND task_instance.task_id IN (%(task_id_1)s)]
[parameters: {'dag_id_1': 'parallel_test_dag', 'execution_date_1': datetime.datetime(2021, 9, 16, 17, 54, 38, tzinfo=Timezone('UTC')), 'task_id_1': 'test1_run'}]
(Background on this error at: http://sqlalche.me/e/13/e3q8)
Я получаю эту ошибку, когда одна задача завершается, и она пытается загрузить следующую задачу после успешного завершения предыдущей задачи
Есть идеи, как это исправить?