Атрибуты сообщений в spring-интеграция-потоки aws приводят к зацикливанию сообщений SQS, где их лучше фильтровать?

#spring-integration #spring-cloud-aws

#spring-интеграция #spring-cloud-aws

Вопрос:

Итак, я создал пару коротких потоков, используя SQS в качестве резервного хранилища между сегментами обработки. Основной поток:

 RestController -> SQS Queue1 OCA
  SQS Queue1 MDCA -> Service-Adapter -> SQS Queue2 OCA
  SQS Queue2 MDCA -> Service Adapter
  

Однако я столкнулся с парой проблем…
«SQS Queue1 MDCA» считывает сообщения из очереди с определенными заголовками сообщений AWS, которые в конечном итоге попадают на исходящий адаптер, записывающий в Queue2. В этих заголовках сообщений указываются такие вещи, как имя AWS_queue, идентификатор сообщения aws и т. Д., Которые приводят к повторной доставке сообщения в очередь 1.

Как только я отфильтровал эти заголовки (я использовал простой пользовательский преобразователь, чтобы проверить это), атрибуты адаптера исходящего канала работают должным образом, и QueueMessagingTemplate доставляет сообщение в соответствующую очередь.

Итак, мой вопрос к экспертам spring-int и spring-aws: «Как вы думаете, где подходящее место для фильтрации любых уже существующих заголовков SQS, чтобы они не были обнаружены какой-либо последующей обработкой SQS?» Мне кажется, что вы хотели бы сделать это в SQSHandler, поскольку они могут быть актуальны в любых базовых вызовах spring-aws-messaging.

В качестве примечания, при создании сообщения из SQS ICA и преобразовании объекта в json в потоке это также привело к созданию 12 заголовков сообщений, что на 2 больше, чем разрешено SQS (что приводит к сбою доставки сообщений).

Пример сообщения с заголовками, слегка измененными для защиты невинных очередей… Как вы можете видеть, aws_queue, destination и т. Д. Все еще Находятся в сообщении после чтения из «wrench_workflow-mjg», так что при попытке доставки в следующую очередь эти заголовки перезаписывают конфигурацию в конфигурации xml spring-int и никогда не доставляются в следующую очередь. Сообщение просто продолжало циклически перемещаться по очереди SQS «wrench_workflow-mjg» (после устранения проблемы с более чем 10 атрибутами).

 GenericMessage [payload={"eventId":"event-1","eventStartDateTime":1476127827.201000000}, 
headers={aws_messageId=db9b6cc0-f133-4182-b79c-4d5d9717a3a9, ApproximateReceiveCount=1, 
SentTimestamp=1476127827803, id=0b662b72-f149-a970-5e63-64a1b28290fb, 
SenderId=AIDAJOYV7TECZCZCOK22C,
aws_receiptHandle=AQEBdaToWo9utbjBwADeSuChX/KrY3l8eoSBMZ0RMmbI8ewjJQw6tV74RwSYxWPKZBSzgJhCmfJ8AUX 
reOy2yGrmefULU7NS8nqYTdMW6BB4Ug2 mHIY 8Tze 2ws15FB5t96q3iJO8 tP5pl/xuo CiTDv 
L1rlYkVODD0Yv1zosGKx48IhGOXdL8nJ4Im8bujcUNLJy/vEYD8Qcfsi6NHOF3Qn0A4Xw Nva0wp86zfq,
aws_queue=wrench_workflow-mjg,
lookupDestination=wrench_workflow-mjg, 
ApproximateFirstReceiveTimestamp=1476127827886, timestamp=1476127836254}
  

Я, вероятно, могу собрать пример, если это необходимо, но я надеюсь, вы понимаете, к чему я клоню.

Ответ №1:

Вероятно, нам следует добавить средства сопоставления заголовков в адаптеры, чтобы включить выборочное сопоставление заголовков, как у нас для других технологий в Spring Integration.

Я открыл проблему с GitHub.

В то же время вы можете поместить <int:header-filter.../> (или HeaderFilter transformer) непосредственно перед исходящим адаптером, чтобы удалить ненужные заголовки.

Вы также можете добавить пользовательский обработчик запросов к исходящему адаптеру.

Комментарии:

1. просто с головы до ног, где пример, в котором реализовано приведенное выше сопоставление заголовков?

2. Непосредственно перед исходящим адаптером; если вы можете сказать мне, используете ли вы конфигурацию xml или @Bean, я могу отредактировать свой ответ с помощью config .

3. Спасибо, но не обязательно. Мы довольно заядлые пользователи spring-int и уже внедрили предложенный вами обходной путь. Если мы сможем запустить этот проект, и у меня будет немного времени, я тоже попытаюсь решить проблему с GitHub.

4. (О, и мы склонны использовать XML для определений потоков — по иронии судьбы, кажется, легче увидеть потоки, используя это, чем более длинную конфигурацию java. Хотя java DSL выглядит довольно хорошо для простых потоков.

5. Мой первый ответ касался вопроса о проблеме с github — я должен был спросить там. Еще раз спасибо!