#python #airflow #mwaa
Вопрос:
Мне нужно извлечь данные из xcom в переменную python, которая будет преобразована с помощью некоторого регулярного выражения и передана дальше. Однако я нигде не могу найти, как я могу считывать данные из xcom без использования какого-либо оператора (непосредственно в коде python). Я использую MWAA на AWS с airflow 2.0.2 и играю с приведенным ниже фрагментом.
s3Path = ""
def pull_from_xcom(**context):
global s3Path
msg = context['ti'].xcom_pull(task_ids='sqs', key='messages')
s3Path = msg['Messages'][0]['Body']
SQSRUN = SQSSensor(
task_id='sqs',
poke_interval=0,
timeout=10,
sqs_queue=SQS_URL,
aws_conn_id=AWS)
xcomGet = PythonOperator(
task_id='xcom_pull',
python_callable=pull_from_xcom,
provide_context=True,
depends_on_past=False)
# s3Path Transformations
para1 = re.findall(r"(para1=w )",s3Path)
para2 = re.findall(r"(para2=w )",s3Path)
sparkstep = #Constructing dict using para1 and para2 for spark job submission
#Calling sparkStep
sparkTransform = EmrAddStepsOperator(
task_id='S3PathTransform',
job_flow_id=Variable.get("EMR"),
aws_conn_id=AWS,
steps=sparkstep,
)
#Further tasks in dag
Это не работает, так как оператор python будет работать после запуска dag, пока я использую преобразованное значение s3Path в другой оператор перед запуском dag. Я попытался установить значение s3Path в качестве переменной и прочитать его, но это не работает, потому что эта переменная не создается при загрузке dag.
Я вижу, что ti.xcom_pull(key=messages, task_ids='sqs')
это можно использовать для извлечения данных из xcom, но откуда мне взять ti? Есть ли какой-либо способ заставить экземпляр задачи работать с xcom без использования какого-либо оператора.
В основном вопрос в том, как получить значение, которое SQSRUN отправляет в xcom. Я не могу найти никакой документации или онлайн-ссылок о том, как использовать значение, полученное SQSSensor. Был бы очень признателен за некоторую помощь.
Ответ №1:
Я вижу, что ti.xcom_pull(ключ=сообщения, идентификаторы задач=’sqs’) можно использовать для извлечения данных из xcom, но откуда мне их взять
ti
?
ti
передается в контексте выполнения. Ваш фрагмент демонстрирует, как это делается.
Есть ли какой-либо способ заставить экземпляр задачи работать с xcom без использования какого-либо оператора?
Да, вы можете получить xcom, аналогично запросив базу данных, как это делает Airflow.
from airflow.utils.session import provide_session
from airflow.models.xcom import XCom
@provide_session
def get_sqs_messages(session):
query = XCom.get_many(
key="messages",
dag_ids="dag-id",
task_ids="sqs",
session=session,
limit=1
)
# ensure the most recent value is retrieved.
query = query.order_by("execution_date desc")
xcom = query.with_entities(XCom.value).first()
if xcom:
return XCom.deserialize_value(xcom)
В своем фрагменте вы, похоже, устанавливали глобальный s3Path
в своем модуле dag и переопределяли его значение в операторе.
EmrAddStepsOperator
инициализируется, когда модуль анализируется до начального значения, связанного с s3Path
.
Существует лучший способ, учитывая, что ваша цель-получить steps
значение для инициализации EmrAddStepsOperator
из значения xcom,
steps
кварги, передаваемые EmrAddStepsOperator
конструктору, являются шаблоном. Это означает, что вы можете указать строку шаблона Jinja2 для ее значения, и это будет включено во время инициализации экземпляра задачи.
sparkstep
может быть объявлено как:
sparkstep = "{{sparkstep_from_messsages(ti.xcom_pull(task_ids='sqs', key='messages'))}}"
sparkTransform = EmrAddStepsOperator(
task_id='S3PathTransform',
job_flow_id=Variable.get("EMR"),
aws_conn_id=AWS,
steps=sparkstep,
)
Там значение, полученное из xcom, передается функции с именем sparkstep_from_messages
, определенным следующим образом.
def sparkstep_from_messages(messages):
# s3Path Transformations
para1 = re.findall(r"(para1=w )",s3Path)
para2 = re.findall(r"(para2=w )",s3Path)
sparkstep = #Constructing dict using para1 and para2 for spark job submission
return sparkstep
Вы должны предоставить эту функцию в качестве user_defined_macros в вашей инициализации DAG, чтобы она была доступна в контексте шаблона.
user_defined_macros = dict(
sparkstep_from_messages=sparkstep_from_messages
)
dag = DAG(dag_id="sample-dag", user_defined_macros=user_defined_macros)
Комментарии:
1. После sqssensor этот фрагмент должен быть выполнен PythonOperator? если это так, то как я получу возвращаемое значение из этой функции. afaik PythonOperator просто запускает функцию python и не получает возвращаемое значение, которое можно использовать в дальнейшем.
2. Да, это должно быть выполнено после сеанса. Вы можете получить возвращаемое значение, вызвав функцию like
messages = get_sqs_messages()
. Вы можете передать значение в вызываемом python в xcom. Не могли бы вы поделиться своей трансформацией S3 в своем вопросе? Я думаю, что на самом деле вам может не понадобиться PythonOperator, извлекающий значения xcom из sqs.3. Я отредактировал свой вопрос с кодом о том, как я буду запускать некоторые преобразования регулярных выражений в s3path и использовать все эти входные данные для отправки задания spark в EMR.