#amazon-web-services #amazon-emr #airflow #message-passing
Вопрос:
В airflow я выполняю jar с помощью emrcreatejobflowоператора в кластере EMR. Это задание записывает некоторые данные в S3, и я хочу передать этот путь S3 на следующий шаг в dag воздушного потока, так как этот путь будет динамичным для каждого запуска.
Я думаю, что XCom не полезен в этом сценарии, так как моя работа основана на Java, и она просто выполняется в виде jar на EMR. Кроме того, я не хочу, чтобы мой Java-код был специфичным для воздушного потока, так как я также хочу использовать его отдельно.
Какие-либо решения для достижения этой цели?
Ответ №1:
Я думаю, что XCom — ваш единственный вариант. Когда вы запускаете EmrCreateJobFlowОператор, за ним должен следовать EmrJobFlowSensor. Вы можете переопределить датчик своим собственным классом, я полагаю, и получить путь s3, в котором записаны данные (я полагаю, вы можете вернуть его датчику в некоторых метаданных состояния), — тогда ваш пользовательский сенсор может передать данные в xcom, и следующий оператор с зависимостью от датчика) может прочитать этот путь s3 из xcom.
Комментарии:
1. Как и куда записывать данные из кода Java, т. е. путь S3? Кроме того, есть ли какой-либо способ получить идентификатор кластера EMR в моем Java-коде и на следующем шаге в DAG airflow? Может быть, я смогу использовать это в качестве префикса в пути S3.
2. Я не знаю подробностей о Java-коде. Вам нужно каким-то образом передать нужные данные в результатах задания (которые, я думаю, можно добавить в качестве метаданных), которые вы сможете увидеть по завершении задания.
Ответ №2:
Для пути S3 вы можете использовать xcom, чтобы передать его на следующий шаг, или либо сделать хэшированную/префиксированную версию фактического имени в качестве пути, чтобы впоследствии к нему можно было получить доступ.
Для повторного создания кластера При использовании EmrCreateJobFlowOperator для создания задания вы можете получить доступ output
к ключу возвращаемого объекта, чтобы получить идентификатор jobflow_id. Это также будет идентификатор кластера
job_flow_creator = EmrCreateJobFlowOperator(
task_id='create_job_flow',
)
job_sensor = EmrJobFlowSensor(
task_id='check_job_flow',
job_flow_id=job_flow_creator.output,
aws_conn_id='aws_default',
)