Агрегированное будущее / ответ для подтверждений SQS / ошибка в нескольких очередях с помощью SQS интеграции Spring

#spring-integration #spring-integration-aws

#весенняя интеграция #spring-интеграция-aws

Вопрос:

Я использую Gateway и внутри шлюза я перебираю цикл, чтобы отправить сообщение в 2 очереди через sqs-outbound adapter .

Я хочу добиться чего-то подобного:

 jsonArray.forEach(data -> {
    Future<AggregatedAckorErrorFrom2Queues> result = dataPublishGateway.publishToService(data);
    futures.add(result);
});
futures.forEach(f -> {                      
    System.out.println("Future Result -> "  f.get())
});
  

Конфигурация шлюза и адаптера SQS

 <!-- Gateway to Publish to SQS -->
<task:executor id="dataExecutor" pool-size="5"/>
<int:publish-subscribe-channel id="dataChannel" task-executor="dataExecutor" />
<int:gateway service-interface="com.integration.gateway.PublishGateway" id="dataPublishGateway">
    <int:method name="publishToDataService" payload-expression="#args[0]" request-channel="dataChannel" />
</int:gateway>

<!-- Send to 2 Queues -->

    <int:channel id="successChannel"/>
    <int-aws:sqs-outbound-channel-adapter sqs="amazonSQS"
                                          queue="queue-1"
                                          channel="dataChannel"
                                          success-channel="successChannel"
                                          failure-channel="errorChannel"/>
    <int-aws:sqs-outbound-channel-adapter sqs="amazonSQS"
                                          queue="queue-2"
                                          channel="dataChannel"
                                          success-channel="successChannel"
                                          failure-channel="errorChannel"/>
  

Я смотрю на apply-sequence на dataChannel и aggregator , но aggregator должен иметь возможность обрабатывать оба ack и error .

Вопрос: Как я могу вернуть агрегированный ответ (подтверждение ошибка) из 2 очередей обратно на шлюз?

Ответ №1:

Вы находитесь в правильном направлении с этими success-channel и failure-channel . Агрегатор может подписаться на это successChannel и выполнить корреляцию и выпуск, используя стратегии по умолчанию. С failure-channel вам, вероятно, нужен пользовательский канал ошибок, а не один глобальный errorChannel . ErrorMessage Отправленный на этот канал из SqsMessageHandler имеет полезную нагрузку, подобную этой AwsRequestFailureException , где она failedMessage действительно содержит сообщение запроса с указанными деталями корреляции. Итак, вам нужно добавить некоторый шаг преобразования, чтобы извлечь эту информацию из исключения, прежде чем перейти к агрегатору.

Комментарии:

1. Как я могу вернуть результат шлюзу из этого вызова, скажем, поток 1 -> Future<AggregatedAckorErrorFrom2Queues> result = dataPublishGateway.publishToService(data) . Поскольку dataPublishGateway публикуется в 2 очередях SQS по каналу pub / sub ( dataChannel ), как вызывающий поток (поток 1 выше) может получать результаты от обратных вызовов этих 2 очередей?

2. Агрегатор сделает это автоматически на основе replyChannel заголовка одного из сообщений, которые вы собираетесь агрегировать. И ожидающий поток в шлюзе выполнит корреляцию для этого ответа.

3. Поток не ожидает, поскольку подписчик на канал данных является подписчиком на основе исполнителя. Чего-нибудь мне не хватает?

4. Вы инициируете вызов из dataPublishGateway , поэтому действительно существует поведение запроса-ответа. Даже если это Future результат, он все равно будет выполнен ответным сообщением из этого replyChannel заголовка.

5. Ну, нужно посмотреть, что вы там делаете в <chain>