Аннотированный метод SpringCloud AWS — SQSListener не получает сообщений

#amazon-web-services #amazon-sqs #spring-cloud-aws #sqslistener

Вопрос:

Я пишу приложение для издателя/потребителя SQS с использованием Spring Cloud AWS 2.3.2

 <dependency>
      <groupId>io.awspring.cloud</groupId>
      <artifactId>spring-cloud-aws-messaging</artifactId>
      <version>2.3.2</version>
</dependency>
 

Я дошел до того, что могу успешно публиковать msg в своих SQS, но мой аннотированный метод @SqsListener не использует msg. Я посмотрел другие вопросы и ответы здесь, но, похоже, ни один из них не дал должного понимания для решения этой проблемы.

Я следую документам API здесь: https://docs.awspring.io/spring-cloud-aws/docs/current/reference/html/index.html#annotation-driven-listener-endpoints

Моя конфигурация определена следующим образом:

 @Configuration
public class SqsMessagingConfig {

    @Value("${cloud.aws.credentials.secret-key}")
    private String secretKey;
    @Value("${cloud.aws.credentials.access-key}")
    private String accessKey;

    private AWSCredentialsProvider awsCredentialsProvider() {
        return new AWSStaticCredentialsProvider(new BasicAWSCredentials(accessKey,
                secretKey));
    }

    @Bean
    @Primary
    public AmazonSQSAsync amazonSQSAsync() {
        return AmazonSQSAsyncClientBuilder
                .standard()
                .withRegion("us-east-2")
                .withCredentials(awsCredentialsProvider())
                .build();
    }

    @Bean
    public QueueMessagingTemplate queueMessagingTemplate(AmazonSQSAsync amazonSQSAsync) {
        return new QueueMessagingTemplate(amazonSQSAsync);
    }

    @Bean
    public SimpleMessageListenerContainerFactory simpleMessageListenerContainerFactory(AmazonSQSAsync amazonSQSAsync) {
        SimpleMessageListenerContainerFactory factory = new SimpleMessageListenerContainerFactory();
        factory.setAmazonSqs(amazonSQSAsync);
        factory.setAutoStartup(true);
        factory.setMaxNumberOfMessages(10);

        return factory;
    }

 
    @Bean()
    public QueueMessageHandlerFactory queueMessageHandlerFactory(final ObjectMapper mapper, final AmazonSQSAsync amazonSQSAsync) {
        final QueueMessageHandlerFactory queueHandlerFactory = new QueueMessageHandlerFactory();
        queueHandlerFactory.setAmazonSqs(amazonSQSAsync);
        queueHandlerFactory.setArgumentResolvers(Collections.singletonList(new PayloadMethodArgumentResolver(jackson2MessageConverter(mapper))));
        return queueHandlerFactory;
    }

    private MessageConverter jackson2MessageConverter(final ObjectMapper mapper) {
        final MappingJackson2MessageConverter converter = new MappingJackson2MessageConverter();
        converter.setObjectMapper(mapper);
        return converter;
    }
}
 

И тогда мой SqsService выглядит следующим образом:

 @Service
public class SqsQueueService {
    private static final Logger logger = LoggerFactory.getLogger(SqsQueueService.class);
    private final QueueMessagingTemplate queueMessagingTemplate;
    private final ObjectWriter objectWriter;
    private final String QUEUE_NAME = "SCHEDULES";

    public SqsQueueService(QueueMessagingTemplate queueMessagingTemplate, ObjectMapper mapper) {
        this.queueMessagingTemplate = queueMessagingTemplate;
        this.objectWriter = mapper.writer();
    }

    public void send(List<Schedule> schedules) {
        List<String> originatingIds = schedules.stream().map(Schedule::getOriginatingId).collect(Collectors.toList());
        try {
            Message<String> message = MessageBuilder.withPayload(objectWriter.writeValueAsString(schedules))
                    .build();

            this.queueMessagingTemplate.convertAndSend(QUEUE_NAME, message);
            logger.info("Successfully sent {} schedule(s) to SQS, with originatingId={}", schedules.size(),
                    originatingIds);
        } catch (Exception e) {
            logger.error("Failed to send the following schedule(s) to SQS="   originatingIds, e);
        }

    }

    // NO_REDRIVE ensures we do not re-queue messages forever. They will be sent to a DLQ if they exceed maxReceiveCount
    @SqsListener(value = "SCHEDULES", deletionPolicy = SqsMessageDeletionPolicy.NO_REDRIVE)
    private void receiveMessage(List<Schedule> schedules) String partnerId) {
        List<String> originatingIds = schedules.stream().map(Schedule::getOriginatingId).collect(Collectors.toList());
        logger.info("Received request from SQS for originatingId={}", originatingIds);
        try {
            someService.createSchedules(schedules);
        } catch (Exception e) {
            throw new RuntimeException("An issue occurred during ingest for originatingId="   originatingIds, e);
        }
    }

}
 

Я также попробовал зависимость с автоматической настройкой aws, но это добавило массу дополнительного шума, и я все еще не мог заставить ее потреблять из SQS. Надеюсь, кто-нибудь заметит, где я что-то путаю/упускаю. Документы, которые я рассматривал как ссылку непосредственно от разработчиков spring, указывают на то, что я поступаю правильно, но, очевидно, это не так.

После того, как я отправлю сообщение в очередь, я вижу, что оно ждет, чтобы его использовали, но ничего не происходит. Мы будем очень признательны за любую помощь.