#amazon-emr #airflow #livy
#amazon-emr #поток воздуха #livy
Вопрос:
Я пытаюсь запланировать задание в EMR с помощью оператора airflow livy. Вот пример кода, которому я следовал. Проблема здесь в том, что … нигде не указана строка подключения Livy (имя хоста и порт). Как мне указать имя хоста и порт сервера Livy для оператора?
Кроме того, у оператора есть параметр livy_conn_id
, для которого в примере задано значение livy_conn_default
. Это правильное значение ?… или я установил какое-то другое значение?
Ответ №1:
У вас должен быть livy_conn_default
раздел «Подключения» на вкладке «Администратор» панели управления Airflow, если все в порядке, тогда да, вы можете использовать это. В противном случае вы можете изменить это или создать другой идентификатор подключения и использовать его в livy_conn_id
Комментарии:
1. Как livy_conn_default узнает, какой IP-адрес у вашего кластера EMR?
Ответ №2:
Есть 2 API, которые мы можем использовать для подключения Livy и Airflow:
- Использование LivyBatchOperator
- Использование LivyOperator
В следующем примере я рассмотрю API LivyOperator.
LivyOperator
Шаг 1: обновите конфигурацию livy:
Войдите в пользовательский интерфейс airflow -> перейдите на вкладку администратора -> Подключения -> Поиск livy. Нажмите на кнопку редактирования и обновите параметры хоста и порта.
Шаг 2: установите apache-airflow-providers-apache-livy
pip install apache-airflow-providers-apache-livy
Шаг 3: Создайте файл данных в $AIRFLOW_HOME/dags
каталоге.
vi $AIRFLOW_HOME/dags/livy_operator_sparkpi_dag.py
from datetime import timedelta, datetime
from airflow import DAG
from airflow.operators.dummy import DummyOperator
from airflow.providers.apache.livy.operators.livy import LivyOperator
default_args = {
'owner': 'RangaReddy',
"retries": 3,
"retry_delay": timedelta(minutes=5),
}
# Initiate DAG
livy_operator_sparkpi_dag = DAG(
dag_id = "livy_operator_sparkpi_dag",
default_args=default_args,
schedule_interval='@once',
start_date = datetime(2022, 3, 2),
tags=['example', 'spark', 'livy']
)
# define livy task with LivyOperator
livy_sparkpi_submit_task = LivyOperator(
file="/root/spark-3.2.1-bin-hadoop3.2/examples/jars/spark-examples_2.12-3.2.1.jar",
class_name="org.apache.spark.examples.SparkPi",
driver_memory="1g",
driver_cores=1,
executor_memory="1g",
executor_cores=2,
num_executors=1,
name="LivyOperator SparkPi",
task_id="livy_sparkpi_submit_task",
dag=livy_operator_sparkpi_dag,
)
begin_task = DummyOperator(task_id="begin_task")
end_task = DummyOperator(task_id="end_task")
begin_task >> livy_sparkpi_submit_task >> end_task
LIVY_HOST=192.168.0.1
curl http://${LIVY_HOST}:8998/batches/0/log | python3 -m json.tool
Вывод:
"Pi is roughly 3.14144103141441"