Асинхронная маршрутизация ответов AWS для успешного прохождения канала для sqs-исходящего канала-адаптера

#spring-integration #amazon-sqs #spring-integration-aws

#spring-интеграция #amazon-sqs #spring-интеграция-aws

Вопрос:

Я зарегистрировал, AsyncHandler а также добавил success-channel в исходящий поток SQS. У success-channel есть int:logging-channel-adapter конечная точка. Однако я не могу видеть никаких журналов с этого адаптера. AsyncHandler Может принимать обратные вызовы, но ничего на success-channel . В SqsMessageHandler я вижу, что мы устанавливаем выходной канал в obtainAsyncHandler методе, но я нигде не видел success-channel установленного. Я что-то упускаю? Я бы предпочел использовать каналы success и failure, а не обратный вызов AsyncHandler Impl, чтобы избежать использования специфичного для AWS кода в моих классах.

Также мой <int-aws:sqs-outbound-channel-adapter> находится внутри <int:chain> , у которого нет выходного канала, поскольку поток заканчивается при отправке сообщения.

РЕДАКТИРОВАТЬ — Добавлена конфигурация Это единственный способ, которым я могу заставить его регистрировать обратный вызов.

     <int:channel id="chainChannel" />
    <int:channel id="successChannel" />
    <bean class="ServiceTransformer" id="serviceTransformer" />
    <int:chain input-channel="serviceChannel" id="sendToServiceSqsChain" output-channel="chainChannel">
        <int:transformer ref="serviceTransformer" method="transform" />
        <int:header-filter header-names="config" />
        <int-aws:sqs-outbound-channel-adapter sqs="amazonSQS" queue="some-queue" async-handler="sqsPublishCallbackHandler" success-channel="successChannel"/>
    </int:chain>

    <int:logging-channel-adapter log-full-message="true" channel="chainChannel" />
  

Здесь я могу просто использовать один и тот же канал как в цепочке (исходящий канал), так и в sqs-outbound (success-channel)

Не удается заставить его работать, как показано ниже:

         <int:channel id="successChannel" />
        <bean class="ServiceTransformer" id="serviceTransformer" />
        <int:chain input-channel="serviceChannel" id="sendToServiceSqsChain" >
            <int:transformer ref="serviceTransformer" method="transform" />
            <int:header-filter header-names="config" />
            <int-aws:sqs-outbound-channel-adapter sqs="amazonSQS" queue="some-queue" async-handler="sqsPublishCallbackHandler" success-channel="successChannel"/>
        </int:chain>
    
        <int:logging-channel-adapter log-full-message="true" channel="successChannel" />
  

Ответ №1:

<int-aws:sqs-outbound-channel-adapter> Компонент является односторонним, поэтому нет outputChannel опции expose. Однако целевой класс является AbstractMessageProducingHandler . Чтобы избежать дублирования кода, мы повторно используем существующий outputChannel внутренний для этого AsyncHandler .

В анализаторе XML мы просто переназначаем одно на другое:

 IntegrationNamespaceUtils.setReferenceIfAttributeDefined(builder, element, "success-channel", "outputChannel");
  

Вероятно, вы ничего не видите в журналах, потому что вам нужно настроить конфигурацию ведения журнала соответственно для соответствующей категории и уровня.

Обновить

Согласно моему тестированию, определенно невозможно настроить такой компонент с XML DSL в <chain> . Это <int-aws:sqs-outbound-channel-adapter> должно быть представлено за пределами <chain> .

Рассмотрите возможность дополнительной настройки на Java DSL вместо этого: https://docs.spring.io/spring-integration/docs/5.3.2.RELEASE/reference/html/dsl.html#java-dsl .

Комментарии:

1. Я думаю, проблема в том, что мой <int-aws: sqs-outbound-channel-adapter> находится внутри <int: цепочки без выходного канала>. Следовательно, сообщение не перенаправляется на success-channel. Он (success-channel) работает только тогда, когда я добавляю выходной канал в цепочку. Как справиться с этим сценарием без создания выходного канала в <int: chain>?

2. Переместите его за пределы chain . Было бы здорово просмотреть вашу конфигурацию для подтверждения. В противном случае это все предположения

3. Я добавил конфигурацию с помощью РЕДАКТИРОВАНИЯ к основному вопросу, прошу вас просмотреть ее.

4. ОК. В любом случае. Как это работает, если оно настроено за пределами chain ?

5. Смотрите ОБНОВЛЕНИЕ в моем ответе.