Отправка задания Spark в Livy (в EMR) из Airflow (с использованием оператора Livy airflow)

#amazon-emr #airflow #livy

#amazon-emr #поток воздуха #livy

Вопрос:

Я пытаюсь запланировать задание в EMR с помощью оператора airflow livy. Вот пример кода, которому я следовал. Проблема здесь в том, что … нигде не указана строка подключения Livy (имя хоста и порт). Как мне указать имя хоста и порт сервера Livy для оператора?

Кроме того, у оператора есть параметр livy_conn_id , для которого в примере задано значение livy_conn_default . Это правильное значение ?… или я установил какое-то другое значение?

Ответ №1:

У вас должен быть livy_conn_default раздел «Подключения» на вкладке «Администратор» панели управления Airflow, если все в порядке, тогда да, вы можете использовать это. В противном случае вы можете изменить это или создать другой идентификатор подключения и использовать его в livy_conn_id

Комментарии:

1. Как livy_conn_default узнает, какой IP-адрес у вашего кластера EMR?

Ответ №2:

Есть 2 API, которые мы можем использовать для подключения Livy и Airflow:

  1. Использование LivyBatchOperator
  2. Использование LivyOperator

В следующем примере я рассмотрю API LivyOperator.

LivyOperator

Шаг 1: обновите конфигурацию livy:

Войдите в пользовательский интерфейс airflow -> перейдите на вкладку администратора -> Подключения -> Поиск livy. Нажмите на кнопку редактирования и обновите параметры хоста и порта.

Шаг 2: установите apache-airflow-providers-apache-livy

 pip install apache-airflow-providers-apache-livy
 

Шаг 3: Создайте файл данных в $AIRFLOW_HOME/dags каталоге.

vi $AIRFLOW_HOME/dags/livy_operator_sparkpi_dag.py

 from datetime import timedelta, datetime
from airflow import DAG
from airflow.operators.dummy import DummyOperator
from airflow.providers.apache.livy.operators.livy import LivyOperator

default_args = {
    'owner': 'RangaReddy',
    "retries": 3,
    "retry_delay": timedelta(minutes=5),
}

# Initiate DAG
livy_operator_sparkpi_dag = DAG(
    dag_id = "livy_operator_sparkpi_dag",
    default_args=default_args,
    schedule_interval='@once',
    start_date = datetime(2022, 3, 2),
    tags=['example', 'spark', 'livy']
)

# define livy task with LivyOperator
livy_sparkpi_submit_task = LivyOperator(
    file="/root/spark-3.2.1-bin-hadoop3.2/examples/jars/spark-examples_2.12-3.2.1.jar",
    class_name="org.apache.spark.examples.SparkPi",
    driver_memory="1g",
    driver_cores=1,
    executor_memory="1g",
    executor_cores=2,
    num_executors=1,
    name="LivyOperator SparkPi",
    task_id="livy_sparkpi_submit_task",
    dag=livy_operator_sparkpi_dag,
)

begin_task = DummyOperator(task_id="begin_task")
end_task = DummyOperator(task_id="end_task")

begin_task >> livy_sparkpi_submit_task >> end_task
 
 LIVY_HOST=192.168.0.1
curl http://${LIVY_HOST}:8998/batches/0/log | python3 -m json.tool
 

Вывод:

 "Pi is roughly 3.14144103141441"