#amazon-web-services #amazon-sqs #spring-cloud-aws #sqslistener
Вопрос:
Я пишу приложение для издателя/потребителя SQS с использованием Spring Cloud AWS 2.3.2
<dependency>
<groupId>io.awspring.cloud</groupId>
<artifactId>spring-cloud-aws-messaging</artifactId>
<version>2.3.2</version>
</dependency>
Я дошел до того, что могу успешно публиковать msg в своих SQS, но мой аннотированный метод @SqsListener не использует msg. Я посмотрел другие вопросы и ответы здесь, но, похоже, ни один из них не дал должного понимания для решения этой проблемы.
Я следую документам API здесь: https://docs.awspring.io/spring-cloud-aws/docs/current/reference/html/index.html#annotation-driven-listener-endpoints
Моя конфигурация определена следующим образом:
@Configuration
public class SqsMessagingConfig {
@Value("${cloud.aws.credentials.secret-key}")
private String secretKey;
@Value("${cloud.aws.credentials.access-key}")
private String accessKey;
private AWSCredentialsProvider awsCredentialsProvider() {
return new AWSStaticCredentialsProvider(new BasicAWSCredentials(accessKey,
secretKey));
}
@Bean
@Primary
public AmazonSQSAsync amazonSQSAsync() {
return AmazonSQSAsyncClientBuilder
.standard()
.withRegion("us-east-2")
.withCredentials(awsCredentialsProvider())
.build();
}
@Bean
public QueueMessagingTemplate queueMessagingTemplate(AmazonSQSAsync amazonSQSAsync) {
return new QueueMessagingTemplate(amazonSQSAsync);
}
@Bean
public SimpleMessageListenerContainerFactory simpleMessageListenerContainerFactory(AmazonSQSAsync amazonSQSAsync) {
SimpleMessageListenerContainerFactory factory = new SimpleMessageListenerContainerFactory();
factory.setAmazonSqs(amazonSQSAsync);
factory.setAutoStartup(true);
factory.setMaxNumberOfMessages(10);
return factory;
}
@Bean()
public QueueMessageHandlerFactory queueMessageHandlerFactory(final ObjectMapper mapper, final AmazonSQSAsync amazonSQSAsync) {
final QueueMessageHandlerFactory queueHandlerFactory = new QueueMessageHandlerFactory();
queueHandlerFactory.setAmazonSqs(amazonSQSAsync);
queueHandlerFactory.setArgumentResolvers(Collections.singletonList(new PayloadMethodArgumentResolver(jackson2MessageConverter(mapper))));
return queueHandlerFactory;
}
private MessageConverter jackson2MessageConverter(final ObjectMapper mapper) {
final MappingJackson2MessageConverter converter = new MappingJackson2MessageConverter();
converter.setObjectMapper(mapper);
return converter;
}
}
И тогда мой SqsService выглядит следующим образом:
@Service
public class SqsQueueService {
private static final Logger logger = LoggerFactory.getLogger(SqsQueueService.class);
private final QueueMessagingTemplate queueMessagingTemplate;
private final ObjectWriter objectWriter;
private final String QUEUE_NAME = "SCHEDULES";
public SqsQueueService(QueueMessagingTemplate queueMessagingTemplate, ObjectMapper mapper) {
this.queueMessagingTemplate = queueMessagingTemplate;
this.objectWriter = mapper.writer();
}
public void send(List<Schedule> schedules) {
List<String> originatingIds = schedules.stream().map(Schedule::getOriginatingId).collect(Collectors.toList());
try {
Message<String> message = MessageBuilder.withPayload(objectWriter.writeValueAsString(schedules))
.build();
this.queueMessagingTemplate.convertAndSend(QUEUE_NAME, message);
logger.info("Successfully sent {} schedule(s) to SQS, with originatingId={}", schedules.size(),
originatingIds);
} catch (Exception e) {
logger.error("Failed to send the following schedule(s) to SQS=" originatingIds, e);
}
}
// NO_REDRIVE ensures we do not re-queue messages forever. They will be sent to a DLQ if they exceed maxReceiveCount
@SqsListener(value = "SCHEDULES", deletionPolicy = SqsMessageDeletionPolicy.NO_REDRIVE)
private void receiveMessage(List<Schedule> schedules) String partnerId) {
List<String> originatingIds = schedules.stream().map(Schedule::getOriginatingId).collect(Collectors.toList());
logger.info("Received request from SQS for originatingId={}", originatingIds);
try {
someService.createSchedules(schedules);
} catch (Exception e) {
throw new RuntimeException("An issue occurred during ingest for originatingId=" originatingIds, e);
}
}
}
Я также попробовал зависимость с автоматической настройкой aws, но это добавило массу дополнительного шума, и я все еще не мог заставить ее потреблять из SQS. Надеюсь, кто-нибудь заметит, где я что-то путаю/упускаю. Документы, которые я рассматривал как ссылку непосредственно от разработчиков spring, указывают на то, что я поступаю правильно, но, очевидно, это не так.
После того, как я отправлю сообщение в очередь, я вижу, что оно ждет, чтобы его использовали, но ничего не происходит. Мы будем очень признательны за любую помощь.