#spring-integration #spring-integration-aws
#весенняя интеграция #spring-интеграция-aws
Вопрос:
Я использую Gateway
и внутри шлюза я перебираю цикл, чтобы отправить сообщение в 2 очереди через sqs-outbound adapter
.
Я хочу добиться чего-то подобного:
jsonArray.forEach(data -> {
Future<AggregatedAckorErrorFrom2Queues> result = dataPublishGateway.publishToService(data);
futures.add(result);
});
futures.forEach(f -> {
System.out.println("Future Result -> " f.get())
});
Конфигурация шлюза и адаптера SQS
<!-- Gateway to Publish to SQS -->
<task:executor id="dataExecutor" pool-size="5"/>
<int:publish-subscribe-channel id="dataChannel" task-executor="dataExecutor" />
<int:gateway service-interface="com.integration.gateway.PublishGateway" id="dataPublishGateway">
<int:method name="publishToDataService" payload-expression="#args[0]" request-channel="dataChannel" />
</int:gateway>
<!-- Send to 2 Queues -->
<int:channel id="successChannel"/>
<int-aws:sqs-outbound-channel-adapter sqs="amazonSQS"
queue="queue-1"
channel="dataChannel"
success-channel="successChannel"
failure-channel="errorChannel"/>
<int-aws:sqs-outbound-channel-adapter sqs="amazonSQS"
queue="queue-2"
channel="dataChannel"
success-channel="successChannel"
failure-channel="errorChannel"/>
Я смотрю на apply-sequence
на dataChannel
и aggregator
, но aggregator
должен иметь возможность обрабатывать оба ack
и error
.
Вопрос: Как я могу вернуть агрегированный ответ (подтверждение ошибка) из 2 очередей обратно на шлюз?
Ответ №1:
Вы находитесь в правильном направлении с этими success-channel
и failure-channel
. Агрегатор может подписаться на это successChannel
и выполнить корреляцию и выпуск, используя стратегии по умолчанию. С failure-channel
вам, вероятно, нужен пользовательский канал ошибок, а не один глобальный errorChannel
. ErrorMessage
Отправленный на этот канал из SqsMessageHandler
имеет полезную нагрузку, подобную этой AwsRequestFailureException
, где она failedMessage
действительно содержит сообщение запроса с указанными деталями корреляции. Итак, вам нужно добавить некоторый шаг преобразования, чтобы извлечь эту информацию из исключения, прежде чем перейти к агрегатору.
Комментарии:
1. Как я могу вернуть результат шлюзу из этого вызова, скажем, поток 1 ->
Future<AggregatedAckorErrorFrom2Queues> result = dataPublishGateway.publishToService(data)
. ПосколькуdataPublishGateway
публикуется в 2 очередях SQS по каналу pub / sub (dataChannel
), как вызывающий поток (поток 1 выше) может получать результаты от обратных вызовов этих 2 очередей?2. Агрегатор сделает это автоматически на основе
replyChannel
заголовка одного из сообщений, которые вы собираетесь агрегировать. И ожидающий поток в шлюзе выполнит корреляцию для этого ответа.3. Поток не ожидает, поскольку подписчик на канал данных является подписчиком на основе исполнителя. Чего-нибудь мне не хватает?
4. Вы инициируете вызов из
dataPublishGateway
, поэтому действительно существует поведение запроса-ответа. Даже если этоFuture
результат, он все равно будет выполнен ответным сообщением из этогоreplyChannel
заголовка.5. Ну, нужно посмотреть, что вы там делаете в
<chain>