извлеките данные xcom за пределы любого оператора в потоке воздуха

#python #airflow #mwaa

Вопрос:

Мне нужно извлечь данные из xcom в переменную python, которая будет преобразована с помощью некоторого регулярного выражения и передана дальше. Однако я нигде не могу найти, как я могу считывать данные из xcom без использования какого-либо оператора (непосредственно в коде python). Я использую MWAA на AWS с airflow 2.0.2 и играю с приведенным ниже фрагментом.

 s3Path = ""
def pull_from_xcom(**context):
        global s3Path
        msg = context['ti'].xcom_pull(task_ids='sqs', key='messages')
        s3Path = msg['Messages'][0]['Body']

    SQSRUN = SQSSensor(
    task_id='sqs',
    poke_interval=0,
    timeout=10,
    sqs_queue=SQS_URL,
    aws_conn_id=AWS)

    xcomGet = PythonOperator(
    task_id='xcom_pull',
    python_callable=pull_from_xcom,
    provide_context=True,
    depends_on_past=False)

    # s3Path Transformations
    para1 = re.findall(r"(para1=w )",s3Path)
    para2 = re.findall(r"(para2=w )",s3Path)

    sparkstep = #Constructing dict using para1 and para2 for spark job submission

    #Calling sparkStep
    sparkTransform = EmrAddStepsOperator(
            task_id='S3PathTransform',
            job_flow_id=Variable.get("EMR"),
            aws_conn_id=AWS,
            steps=sparkstep,
        )
        #Further tasks in dag

 

Это не работает, так как оператор python будет работать после запуска dag, пока я использую преобразованное значение s3Path в другой оператор перед запуском dag. Я попытался установить значение s3Path в качестве переменной и прочитать его, но это не работает, потому что эта переменная не создается при загрузке dag.

Я вижу, что ti.xcom_pull(key=messages, task_ids='sqs') это можно использовать для извлечения данных из xcom, но откуда мне взять ti? Есть ли какой-либо способ заставить экземпляр задачи работать с xcom без использования какого-либо оператора.

В основном вопрос в том, как получить значение, которое SQSRUN отправляет в xcom. Я не могу найти никакой документации или онлайн-ссылок о том, как использовать значение, полученное SQSSensor. Был бы очень признателен за некоторую помощь.

Ответ №1:

Я вижу, что ti.xcom_pull(ключ=сообщения, идентификаторы задач=’sqs’) можно использовать для извлечения данных из xcom, но откуда мне их взять ti ?

ti передается в контексте выполнения. Ваш фрагмент демонстрирует, как это делается.

Есть ли какой-либо способ заставить экземпляр задачи работать с xcom без использования какого-либо оператора?

Да, вы можете получить xcom, аналогично запросив базу данных, как это делает Airflow.

 from airflow.utils.session import provide_session
from airflow.models.xcom import XCom

@provide_session
def get_sqs_messages(session):
    query = XCom.get_many(
        key="messages",
        dag_ids="dag-id",
        task_ids="sqs",
        session=session,
        limit=1
    )
    # ensure the most recent value is retrieved.
    query = query.order_by("execution_date desc")
    xcom = query.with_entities(XCom.value).first()

    if xcom:
       return XCom.deserialize_value(xcom)
 

В своем фрагменте вы, похоже, устанавливали глобальный s3Path в своем модуле dag и переопределяли его значение в операторе.
EmrAddStepsOperator инициализируется, когда модуль анализируется до начального значения, связанного с s3Path .

Существует лучший способ, учитывая, что ваша цель-получить steps значение для инициализации EmrAddStepsOperator из значения xcom,

steps кварги, передаваемые EmrAddStepsOperator конструктору, являются шаблоном. Это означает, что вы можете указать строку шаблона Jinja2 для ее значения, и это будет включено во время инициализации экземпляра задачи.

sparkstep может быть объявлено как:

 sparkstep = "{{sparkstep_from_messsages(ti.xcom_pull(task_ids='sqs', key='messages'))}}"

sparkTransform = EmrAddStepsOperator(
            task_id='S3PathTransform',
            job_flow_id=Variable.get("EMR"),
            aws_conn_id=AWS,
            steps=sparkstep,
        )
 

Там значение, полученное из xcom, передается функции с именем sparkstep_from_messages , определенным следующим образом.

 def sparkstep_from_messages(messages):
    # s3Path Transformations
    para1 = re.findall(r"(para1=w )",s3Path)
    para2 = re.findall(r"(para2=w )",s3Path)

    sparkstep = #Constructing dict using para1 and para2 for spark job submission
    return sparkstep
 

Вы должны предоставить эту функцию в качестве user_defined_macros в вашей инициализации DAG, чтобы она была доступна в контексте шаблона.

 user_defined_macros = dict(
    sparkstep_from_messages=sparkstep_from_messages
)

dag = DAG(dag_id="sample-dag", user_defined_macros=user_defined_macros)
 

Комментарии:

1. После sqssensor этот фрагмент должен быть выполнен PythonOperator? если это так, то как я получу возвращаемое значение из этой функции. afaik PythonOperator просто запускает функцию python и не получает возвращаемое значение, которое можно использовать в дальнейшем.

2. Да, это должно быть выполнено после сеанса. Вы можете получить возвращаемое значение, вызвав функцию like messages = get_sqs_messages() . Вы можете передать значение в вызываемом python в xcom. Не могли бы вы поделиться своей трансформацией S3 в своем вопросе? Я думаю, что на самом деле вам может не понадобиться PythonOperator, извлекающий значения xcom из sqs.

3. Я отредактировал свой вопрос с кодом о том, как я буду запускать некоторые преобразования регулярных выражений в s3path и использовать все эти входные данные для отправки задания spark в EMR.