#docker #airflow
#docker #воздушный поток
Вопрос:
Я пытаюсь использовать оператор docker для автоматизации выполнения некоторых скриптов с использованием airflow.
Версия Airflow: apache-airflow==1.10.12
Что я хочу сделать, так это «скопировать» все файлы моего проекта (с папками и файлами) в контейнер, используя этот код.
В ml-intermediate.py
этом каталоге находится следующий файл ~/airflow/dags/ml-intermediate.py
:
"""
Template to convert a Ploomber DAG to Airflow
"""
from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from airflow.utils.dates import days_ago
from ploomber.spec import DAGSpec
from soopervisor.script.ScriptConfig import ScriptConfig
script_cfg = ScriptConfig.from_path('/home/letyndr/airflow/dags/ml-intermediate')
# Replace the project root to reflect the new location - or maybe just
# write a soopervisor.yaml, then we can we rid of this line
script_cfg.paths.project = '/home/letyndr/airflow/dags/ml-intermediate'
# TODO: use lazy_import from script_cfg
dag_ploomber = DAGSpec('/home/letyndr/airflow/dags/ml-intermediate/pipeline.yaml',
lazy_import=True).to_dag()
dag_ploomber.name = "ML Intermediate"
default_args = {
'start_date': days_ago(0),
}
dag_airflow = DAG(
dag_ploomber.name.replace(' ', '-'),
default_args=default_args,
description='Ploomber dag',
schedule_interval=None,
)
script_cfg.save_script()
from airflow.operators.docker_operator import DockerOperator
for task_name in dag_ploomber:
DockerOperator(task_id=task_name,
image="continuumio/miniconda3",
api_version="auto",
auto_remove=True,
# command="sh /home/letyndr/airflow/dags/ml-intermediate/script.sh",
command="sleep 600",
docker_url="unix://var/run/docker.sock",
volumes=[
"/home/letyndr/airflow/dags/ml-intermediate:/home/letyndr/airflow/dags/ml-intermediate:rw",
"/home/letyndr/airflow-data/ml-intermediate:/home/letyndr/airflow-data/ml-intermediate:rw"
],
working_dir=script_cfg.paths.project,
dag=dag_airflow,
container_name=task_name,
)
for task_name in dag_ploomber:
task_ploomber = dag_ploomber[task_name]
task_airflow = dag_airflow.get_task(task_name)
for upstream in task_ploomber.upstream:
task_airflow.set_upstream(dag_airflow.get_task(upstream))
dag = dag_airflow
Когда я выполняю этот DAG с использованием Airflow, я получаю сообщение об ошибке, что docker не находит /home/letyndr/airflow/dags/ml-intermediate/script.sh
скрипт. Я изменил команду выполнения оператора docker sleep 600
, чтобы войти в контейнер и проверить файлы в контейнере с правильными путями.
Например, когда я нахожусь в контейнере, я могу перейти по этому пути /home/letyndr/airflow/dags/ml-intermediate/
, но я не вижу файлов, которые должны быть там.
Я попытался воспроизвести, как Airflow реализует Docker SDK для Python, проверяя эту часть пакета docker operator Airflow, в частности, ту, где он создает контейнер docker: создание контейнера docker
Это моя единственная репликация реализации docker:
import docker
client = docker.APIClient()
# binds = {
# "/home/letyndr/airflow/dags": {
# "bind": "/home/letyndr/airflow/dags",
# "mode": "rw"
# },
# "/home/letyndr/airflow-data/ml-intermediate": {
# "bind": "/home/letyndr/airflow-data/ml-intermediate",
# "mode": "rw"
# }
# }
binds = ["/home/letyndr/airflow/dags:/home/letyndr/airflow/dags:rw",
"/home/letyndr/airflow-data/ml-intermediate:/home/letyndr/airflow-data/ml-intermediate:rw"]
container = client.create_container(
image="continuumio/miniconda3",
command="sleep 600",
volumes=["/home/letyndr/airflow/dags", "/home/letyndr/airflow-data/ml-intermediate"],
host_config=client.create_host_config(binds=binds),
working_dir="/home/letyndr/airflow/dags",
name="simple_example",
)
client.start(container=container.get("Id"))
Я обнаружил, что монтирование томов работает, только если оно установлено host_config
, и volumes
проблема в том, что реализация в Airflow просто установлена host_config
, но нет volumes
. Я добавил параметр в метод create_container
, он сработал.
Вы знаете, правильно ли я использую docker operator или это проблема?
Ответ №1:
Попробуйте использовать mounts
аргумент вместо volumes
. Именно так определяются объемы в документации / исходном коде Airflow.
Так что это должно выглядеть примерно так:
DockerOperator(task_id=task_name,
image="continuumio/miniconda3",
api_version="auto",
auto_remove=True,
# command="sh /home/letyndr/airflow/dags/ml-intermediate/script.sh",
command="sleep 600",
docker_url="unix://var/run/docker.sock",
mounts=[
"/home/letyndr/airflow/dags/ml-intermediate:/home/letyndr/airflow/dags/ml-intermediate:rw",
"/home/letyndr/airflow-data/ml-intermediate:/home/letyndr/airflow-data/ml-intermediate:rw"
],
working_dir=script_cfg.paths.project,
dag=dag_airflow,
container_name=task_name,
)
Это некоторые другие необязательные аргументы, которые могут быть полезны:
- host_tmp_dir: укажите местоположение временного каталога на хосте, к которому будет сопоставлено
tmp_dir
. Если не указано, по умолчанию используется стандартный системный временный каталог. - tmp_dir: точка монтирования внутри контейнера во временный каталог, созданный на хосте оператором tmp_dir:
РЕДАКТИРОВАТЬ: после дополнительного просмотра я вижу, что каждый mount
элемент должен быть типа Mount
from docker.types
. Аргумент volumes
также был переименован в mount
как часть списка изменений для Airflow 2.1. Вот пример из исходного кода Airflow.
Исправлено, код должен выглядеть примерно так
from docker.types import Mount
...
...
DockerOperator(task_id=task_name,
image="continuumio/miniconda3",
api_version="auto",
auto_remove=True,
# command="sh /home/letyndr/airflow/dags/ml-intermediate/script.sh",
command="sleep 600",
docker_url="unix://var/run/docker.sock",
mounts=[
Mount(
source='/home/letyndr/airflow/dags/ml-intermediate',
target='/home/letyndr/airflow/dags/ml-intermediate:rw',
type='bind'
)
],
working_dir=script_cfg.paths.project,
dag=dag_airflow,
container_name=task_name,
)