не работает монтирование каталогов с помощью docker operator на airflow

#docker #airflow

#docker #воздушный поток

Вопрос:

Я пытаюсь использовать оператор docker для автоматизации выполнения некоторых скриптов с использованием airflow.

Версия Airflow: apache-airflow==1.10.12

Что я хочу сделать, так это «скопировать» все файлы моего проекта (с папками и файлами) в контейнер, используя этот код.

В ml-intermediate.py этом каталоге находится следующий файл ~/airflow/dags/ml-intermediate.py :

 """
Template to convert a Ploomber DAG to Airflow
"""
from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from airflow.utils.dates import days_ago

from ploomber.spec import DAGSpec
from soopervisor.script.ScriptConfig import ScriptConfig

script_cfg = ScriptConfig.from_path('/home/letyndr/airflow/dags/ml-intermediate')
# Replace the project root to reflect the new location - or maybe just
# write a soopervisor.yaml, then we can we rid of this line
script_cfg.paths.project = '/home/letyndr/airflow/dags/ml-intermediate'

# TODO: use lazy_import from script_cfg
dag_ploomber = DAGSpec('/home/letyndr/airflow/dags/ml-intermediate/pipeline.yaml',
                       lazy_import=True).to_dag()
dag_ploomber.name = "ML Intermediate"

default_args = {
    'start_date': days_ago(0),
}

dag_airflow = DAG(
    dag_ploomber.name.replace(' ', '-'),
    default_args=default_args,
    description='Ploomber dag',
    schedule_interval=None,
)

script_cfg.save_script()

from airflow.operators.docker_operator import DockerOperator
for task_name in dag_ploomber:
    DockerOperator(task_id=task_name,
        image="continuumio/miniconda3",
        api_version="auto",
        auto_remove=True,
        # command="sh /home/letyndr/airflow/dags/ml-intermediate/script.sh",
        command="sleep 600",
        docker_url="unix://var/run/docker.sock",
        volumes=[
            "/home/letyndr/airflow/dags/ml-intermediate:/home/letyndr/airflow/dags/ml-intermediate:rw",
            "/home/letyndr/airflow-data/ml-intermediate:/home/letyndr/airflow-data/ml-intermediate:rw"
        ],
        working_dir=script_cfg.paths.project,
        dag=dag_airflow,
        container_name=task_name,
    )



for task_name in dag_ploomber:
    task_ploomber = dag_ploomber[task_name]
    task_airflow = dag_airflow.get_task(task_name)

    for upstream in task_ploomber.upstream:
        task_airflow.set_upstream(dag_airflow.get_task(upstream))

dag = dag_airflow
 

Когда я выполняю этот DAG с использованием Airflow, я получаю сообщение об ошибке, что docker не находит /home/letyndr/airflow/dags/ml-intermediate/script.sh скрипт. Я изменил команду выполнения оператора docker sleep 600 , чтобы войти в контейнер и проверить файлы в контейнере с правильными путями.

Например, когда я нахожусь в контейнере, я могу перейти по этому пути /home/letyndr/airflow/dags/ml-intermediate/ , но я не вижу файлов, которые должны быть там.

Я попытался воспроизвести, как Airflow реализует Docker SDK для Python, проверяя эту часть пакета docker operator Airflow, в частности, ту, где он создает контейнер docker: создание контейнера docker

Это моя единственная репликация реализации docker:

 import docker

client = docker.APIClient()

# binds = {
#         "/home/letyndr/airflow/dags": {
#             "bind": "/home/letyndr/airflow/dags",
#             "mode": "rw"
#         },
#         "/home/letyndr/airflow-data/ml-intermediate": {
#             "bind": "/home/letyndr/airflow-data/ml-intermediate",
#             "mode": "rw"
#         }
#     }

binds = ["/home/letyndr/airflow/dags:/home/letyndr/airflow/dags:rw",
"/home/letyndr/airflow-data/ml-intermediate:/home/letyndr/airflow-data/ml-intermediate:rw"]

container = client.create_container(
    image="continuumio/miniconda3",
    command="sleep 600",
    volumes=["/home/letyndr/airflow/dags", "/home/letyndr/airflow-data/ml-intermediate"],
    host_config=client.create_host_config(binds=binds),
    working_dir="/home/letyndr/airflow/dags",
    name="simple_example",
)

client.start(container=container.get("Id"))

 

Я обнаружил, что монтирование томов работает, только если оно установлено host_config , и volumes проблема в том, что реализация в Airflow просто установлена host_config , но нет volumes . Я добавил параметр в метод create_container , он сработал.

Вы знаете, правильно ли я использую docker operator или это проблема?

Ответ №1:

Попробуйте использовать mounts аргумент вместо volumes . Именно так определяются объемы в документации / исходном коде Airflow.

Так что это должно выглядеть примерно так:

 DockerOperator(task_id=task_name,
        image="continuumio/miniconda3",
        api_version="auto",
        auto_remove=True,
        # command="sh /home/letyndr/airflow/dags/ml-intermediate/script.sh",
        command="sleep 600",
        docker_url="unix://var/run/docker.sock",
        mounts=[
            "/home/letyndr/airflow/dags/ml-intermediate:/home/letyndr/airflow/dags/ml-intermediate:rw",
            "/home/letyndr/airflow-data/ml-intermediate:/home/letyndr/airflow-data/ml-intermediate:rw"
        ],
        working_dir=script_cfg.paths.project,
        dag=dag_airflow,
        container_name=task_name,
    )
 

Это некоторые другие необязательные аргументы, которые могут быть полезны:

  1. host_tmp_dir: укажите местоположение временного каталога на хосте, к которому будет сопоставлено tmp_dir . Если не указано, по умолчанию используется стандартный системный временный каталог.
  2. tmp_dir: точка монтирования внутри контейнера во временный каталог, созданный на хосте оператором tmp_dir:

РЕДАКТИРОВАТЬ: после дополнительного просмотра я вижу, что каждый mount элемент должен быть типа Mount from docker.types . Аргумент volumes также был переименован в mount как часть списка изменений для Airflow 2.1. Вот пример из исходного кода Airflow.

Исправлено, код должен выглядеть примерно так

 from docker.types import Mount 

...
...

DockerOperator(task_id=task_name,
    image="continuumio/miniconda3",
    api_version="auto",
    auto_remove=True,
    # command="sh /home/letyndr/airflow/dags/ml-intermediate/script.sh",
    command="sleep 600",
    docker_url="unix://var/run/docker.sock",
    mounts=[
        Mount(
            source='/home/letyndr/airflow/dags/ml-intermediate', 
            target='/home/letyndr/airflow/dags/ml-intermediate:rw', 
            type='bind'
        )
    ],
    working_dir=script_cfg.paths.project,
    dag=dag_airflow,
    container_name=task_name,
)