#python #airflow
#python #воздушный поток
Вопрос:
Я пытаюсь запустить Apache Airflow PythonVirtualenvOperator в одном из моих баз данных, но Airflow выдает следующую ошибку:
[2020-12-14 20:06:32,291] {python_operator.py:316} INFO - Executing cmd
['virtualenv', '/tmp/venvwtqb3rki', '--python=python3.8']
[2020-12-14 20:06:32,301] {taskinstance.py:1150} ERROR - [Errno 2] No such file or directory: 'virtualenv'
Traceback (most recent call last):
File "/opt/airflow/airflow_env/lib/python3.8/site-packages/airflow/models/taskinstance.py", line 984, in _run_raw_task
result = task_copy.execute(context=context)
File "/opt/airflow/airflow_env/lib/python3.8/site-packages/airflow/operators/python_operator.py", line 113, in execute
return_value = self.execute_callable()
File "/opt/airflow/airflow_env/lib/python3.8/site-packages/airflow/operators/python_operator.py", line 292, in execute_callable
self._execute_in_subprocess(self._generate_virtualenv_cmd(tmp_dir))
File "/opt/airflow/airflow_env/lib/python3.8/site-packages/airflow/operators/python_operator.py", line 317, in _execute_in_subprocess
output = subprocess.check_output(cmd,
File "/usr/lib/python3.8/subprocess.py", line 411, in check_output
return run(*popenargs, stdout=PIPE, timeout=timeout, check=True,
File "/usr/lib/python3.8/subprocess.py", line 489, in run
with Popen(*popenargs, **kwargs) as process:
File "/usr/lib/python3.8/subprocess.py", line 854, in __init__
self._execute_child(args, executable, preexec_fn, close_fds,
File "/usr/lib/python3.8/subprocess.py", line 1702, in _execute_child
raise child_exception_type(errno_num, err_msg, err_filename)
У меня есть Airflow и все мои базы данных, работающие от имени пользователя airflow. Я подумал, что, возможно, airflow не может найти команду virutalenv на своем пути во время настройки / выполнения задачи.
Вот код, который у меня есть в настоящее время для тестирования.
import logging
import datetime
from airflow import DAG
import airflow
from airflow.hooks.S3_hook import S3Hook
from airflow.contrib.hooks import aws_hook
from airflow.models import Variable
from airflow.operators.python_operator import PythonOperator, PythonVirtualenvOperator
from airflow.utils.dates import days_ago
import time
default_args = {
'owner':'airflow',
'depends_on_past': False,
'start_date': days_ago(2),
'retries': 0
}
dag = DAG (
dag_id = 'list_reqestor',
default_args = default_args,
catchup=False,
schedule_interval = None
)
def setup_driver(ti):
from libcloud.compute.types import Provider
from libcloud.compute.providers import get_driver
"""
Sets up Apache libcloud AWS ec2 node driver.
Args:
region: AWS region to perform credential check.
"""
try:
shopper_logger.info("Setting up node deployment driver.")
region = Variable.get('REGION')
cls = get_driver(Provider.EC2)
a_hook = aws_hook.AwsHook()
driver = cls(creds,region=region)
ti.xcom_push(XCOM_REQUESTOR_LIB_CLOUD_DRIVER, driver)
time.sleep(30)
setup_driver_task = PythonVirtualenvOperator(
task_id='setup_driver_task',
python_callable=setup_driver,
retries=0,
requirements=['apache-libcloud'],
python_version="3.8",
system_site_packages=False,
provide_context=True,
xcom_push=True,
dag=dag
)
setup_driver
Я не уверен, чего мне не хватает.
Ответ №1:
Скорее всего, это связано с отсутствием «virtualenv» в среде вашего воздушного потока.
Вы можете проверить (когда находитесь в той же среде, что и airflow) с virtualenv --version
помощью своего терминала.
Если он не находит «virtualenv», просто установите его с помощью:
pip install virtualenv
и это должно работать