Airflow scheduler raises AttributeError: Can't pickle local object 'SchedulerJob._execute.<locals>.processor_factory'

#python-3.x #airflow #workflow

Question:

This is my first post on Stack Overflow and my first time using Airflow. Today I was setting up my Airflow installation and ran into this problem.

(airflow-tutorial) alex@MacBook-Pro airflow-tutorial % airflow scheduler


[Airflow ASCII-art startup banner]

[2021-10-13 20:45:18,369] {__init__.py:51} INFO - Using executor SequentialExecutor
[2021-10-13 20:45:18,373] {scheduler_job.py:1346} INFO - Starting the scheduler
[2021-10-13 20:45:18,373] {scheduler_job.py:1354} INFO - Running execute loop for -1 seconds
[2021-10-13 20:45:18,373] {scheduler_job.py:1355} INFO - Processing each file at most -1 times
[2021-10-13 20:45:18,373] {scheduler_job.py:1358} INFO - Searching for files in /Users/alex/Desktop/airflow-tutorial/dags
[2021-10-13 20:45:18,376] {scheduler_job.py:1360} INFO - There are 24 files in /Users/alex/Desktop/airflow-tutorial/dags
[2021-10-13 20:45:18,376] {scheduler_job.py:1411} INFO - Resetting orphaned tasks for active dag runs
[2021-10-13 20:45:18,388] {scheduler_job.py:1384} ERROR - Exception when executing execute_helper

Traceback (most recent call last):
  File "/opt/anaconda3/envs/airflow-tutorial/lib/python3.8/site-packages/airflow/jobs/scheduler_job.py", line 1382, in _execute
    self._execute_helper()
  File "/opt/anaconda3/envs/airflow-tutorial/lib/python3.8/site-packages/airflow/jobs/scheduler_job.py", line 1415, in _execute_helper
    self.processor_agent.start()
  File "/opt/anaconda3/envs/airflow-tutorial/lib/python3.8/site-packages/airflow/utils/dag_processing.py", line 554, in start
    self._process.start()
  File "/opt/anaconda3/envs/airflow-tutorial/lib/python3.8/multiprocessing/process.py", line 121, in start
    self._popen = self._Popen(self)
  File "/opt/anaconda3/envs/airflow-tutorial/lib/python3.8/multiprocessing/context.py", line 224, in _Popen
    return _default_context.get_context().Process._Popen(process_obj)
  File "/opt/anaconda3/envs/airflow-tutorial/lib/python3.8/multiprocessing/context.py", line 284, in _Popen
    return Popen(process_obj)
  File "/opt/anaconda3/envs/airflow-tutorial/lib/python3.8/multiprocessing/popen_spawn_posix.py", line 32, in __init__
    super().__init__(process_obj)
  File "/opt/anaconda3/envs/airflow-tutorial/lib/python3.8/multiprocessing/popen_fork.py", line 19, in __init__
    self._launch(process_obj)
  File "/opt/anaconda3/envs/airflow-tutorial/lib/python3.8/multiprocessing/popen_spawn_posix.py", line 47, in _launch
    reduction.dump(process_obj, fp)
  File "/opt/anaconda3/envs/airflow-tutorial/lib/python3.8/multiprocessing/reduction.py", line 60, in dump
    ForkingPickler(file, protocol).dump(obj)
AttributeError: Can't pickle local object 'SchedulerJob._execute.<locals>.processor_factory'
[2021-10-13 20:45:18,395] {helpers.py:325} INFO - Sending Signals.SIGTERM to GPID None
Traceback (most recent call last):
  File "/opt/anaconda3/envs/airflow-tutorial/bin/airflow", line 37, in <module>
    args.func(args)
  File "/opt/anaconda3/envs/airflow-tutorial/lib/python3.8/site-packages/airflow/utils/cli.py", line 75, in wrapper
    return f(*args, **kwargs)
  File "/opt/anaconda3/envs/airflow-tutorial/lib/python3.8/site-packages/airflow/bin/cli.py", line 1040, in scheduler
    job.run()
  File "/opt/anaconda3/envs/airflow-tutorial/lib/python3.8/site-packages/airflow/jobs/base_job.py", line 221, in run
    self._execute()
  File "/opt/anaconda3/envs/airflow-tutorial/lib/python3.8/site-packages/airflow/jobs/scheduler_job.py", line 1386, in _execute
    self.processor_agent.end()
  File "/opt/anaconda3/envs/airflow-tutorial/lib/python3.8/site-packages/airflow/utils/dag_processing.py", line 707, in end
    reap_process_group(self._process.pid, log=self.log)
  File "/opt/anaconda3/envs/airflow-tutorial/lib/python3.8/site-packages/airflow/utils/helpers.py", line 327, in reap_process_group
    signal_procs(sig)
  File "/opt/anaconda3/envs/airflow-tutorial/lib/python3.8/site-packages/airflow/utils/helpers.py", line 296, in signal_procs
    os.killpg(pgid, sig)
TypeError: an integer is required (got type NoneType)
 
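The likely cause (an inference from the traceback, not stated in the post): starting with Python 3.8, the default `multiprocessing` start method on macOS changed from `fork` to `spawn`. The `spawn` method must pickle the `Process` object, including its `target`, and Airflow 1.10.x defines `processor_factory` as a nested function inside `SchedulerJob._execute`, and local functions cannot be pickled. A minimal sketch that reproduces the same error outside Airflow (forcing the `spawn` context so it behaves the same on any OS):

```python
import multiprocessing


def run_scheduler_like_airflow():
    """Mimics Airflow 1.10.x, which defines processor_factory as a
    local (nested) function and hands it to a child process."""

    def processor_factory():  # local object: not importable, so not picklable
        pass

    # "spawn" (the macOS default since Python 3.8) pickles the Process
    # object, including its target, before launching the child.
    ctx = multiprocessing.get_context("spawn")
    proc = ctx.Process(target=processor_factory)
    proc.start()  # raises AttributeError: Can't pickle local object ...


if __name__ == "__main__":
    try:
        run_scheduler_like_airflow()
    except AttributeError as exc:
        print(exc)
```

If this is indeed the cause, the workarounds commonly reported for this incompatibility are upgrading to a newer Airflow release (later versions reworked the scheduler's process handling), running under Python 3.7 where `fork` is still the macOS default, or forcing the `fork` start method early in startup; verify against the Airflow version you are running.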

Comments:

1. Same error here, and on macOS too… Python 3.8.6, apache-airflow==1.10.9