Воздушный поток PythonVirtualenvOperator, нет такого файла или каталога: ‘virtualenv’

#python #airflow

#python #воздушный поток

Вопрос:

Я пытаюсь запустить Apache Airflow PythonVirtualenvOperator в одном из моих баз данных, но Airflow выдает следующую ошибку:

 [2020-12-14 20:06:32,291] {python_operator.py:316} INFO - Executing cmd
['virtualenv', '/tmp/venvwtqb3rki', '--python=python3.8']
[2020-12-14 20:06:32,301] {taskinstance.py:1150} ERROR - [Errno 2] No such file or directory: 'virtualenv'
Traceback (most recent call last):
  File "/opt/airflow/airflow_env/lib/python3.8/site-packages/airflow/models/taskinstance.py", line 984, in _run_raw_task
    result = task_copy.execute(context=context)
  File "/opt/airflow/airflow_env/lib/python3.8/site-packages/airflow/operators/python_operator.py", line 113, in execute
    return_value = self.execute_callable()
  File "/opt/airflow/airflow_env/lib/python3.8/site-packages/airflow/operators/python_operator.py", line 292, in execute_callable
    self._execute_in_subprocess(self._generate_virtualenv_cmd(tmp_dir))
  File "/opt/airflow/airflow_env/lib/python3.8/site-packages/airflow/operators/python_operator.py", line 317, in _execute_in_subprocess
    output = subprocess.check_output(cmd,
  File "/usr/lib/python3.8/subprocess.py", line 411, in check_output
    return run(*popenargs, stdout=PIPE, timeout=timeout, check=True,
  File "/usr/lib/python3.8/subprocess.py", line 489, in run
    with Popen(*popenargs, **kwargs) as process:
  File "/usr/lib/python3.8/subprocess.py", line 854, in __init__
    self._execute_child(args, executable, preexec_fn, close_fds,
  File "/usr/lib/python3.8/subprocess.py", line 1702, in _execute_child
    raise child_exception_type(errno_num, err_msg, err_filename)
 

У меня есть Airflow и все мои базы данных, работающие от имени пользователя airflow. Я подумал, что, возможно, airflow не может найти команду virutalenv на своем пути во время настройки / выполнения задачи.

Вот код, который у меня есть в настоящее время для тестирования.

 import logging
import datetime
from airflow import DAG
import airflow
from airflow.hooks.S3_hook import S3Hook
from airflow.contrib.hooks import aws_hook
from airflow.models import Variable
from airflow.operators.python_operator import PythonOperator, PythonVirtualenvOperator
from airflow.utils.dates import days_ago
import time

default_args = {
    'owner':'airflow',
    'depends_on_past': False,
    'start_date': days_ago(2),
    'retries': 0
}

dag = DAG (
    dag_id = 'list_reqestor',
    default_args = default_args,
    catchup=False,
    schedule_interval = None
    
)


def setup_driver(ti):
    from libcloud.compute.types import Provider
    from libcloud.compute.providers import get_driver
    """
    Sets up Apache libcloud AWS ec2 node driver.

    Args:
        region: AWS region to perform credential check.
    """
    try:
        shopper_logger.info("Setting up node deployment driver.")
        region = Variable.get('REGION')
        cls = get_driver(Provider.EC2)
        a_hook = aws_hook.AwsHook()
        driver = cls(creds,region=region)
        ti.xcom_push(XCOM_REQUESTOR_LIB_CLOUD_DRIVER, driver)   
        time.sleep(30)     

setup_driver_task = PythonVirtualenvOperator(
    task_id='setup_driver_task',
    python_callable=setup_driver,
    retries=0,
    requirements=['apache-libcloud'],
    python_version="3.8",
    system_site_packages=False,
    provide_context=True,
    xcom_push=True,
    dag=dag
)

setup_driver
 

Я не уверен, чего мне не хватает.

Ответ №1:

Скорее всего, это связано с отсутствием «virtualenv» в среде вашего воздушного потока.

Вы можете проверить (когда находитесь в той же среде, что и airflow) с virtualenv --version помощью своего терминала.

Если он не находит «virtualenv», просто установите его с помощью:

 pip install virtualenv
 

и это должно работать