#spring-integration #spring-cloud-aws
#spring-интеграция #spring-cloud-aws
Вопрос:
Итак, я создал пару коротких потоков, используя SQS в качестве резервного хранилища между сегментами обработки. Основной поток:
RestController -> SQS Queue1 OCA
SQS Queue1 MDCA -> Service-Adapter -> SQS Queue2 OCA
SQS Queue2 MDCA -> Service Adapter
Однако я столкнулся с парой проблем…
«SQS Queue1 MDCA» считывает сообщения из очереди с определенными заголовками сообщений AWS, которые в конечном итоге попадают на исходящий адаптер, записывающий в Queue2. В этих заголовках сообщений указываются такие вещи, как имя AWS_queue, идентификатор сообщения aws и т. Д., Которые приводят к повторной доставке сообщения в очередь 1.
Как только я отфильтровал эти заголовки (я использовал простой пользовательский преобразователь, чтобы проверить это), атрибуты адаптера исходящего канала работают должным образом, и QueueMessagingTemplate доставляет сообщение в соответствующую очередь.
Итак, мой вопрос к экспертам spring-int и spring-aws: «Как вы думаете, где подходящее место для фильтрации любых уже существующих заголовков SQS, чтобы они не были обнаружены какой-либо последующей обработкой SQS?» Мне кажется, что вы хотели бы сделать это в SQSHandler, поскольку они могут быть актуальны в любых базовых вызовах spring-aws-messaging.
В качестве примечания, при создании сообщения из SQS ICA и преобразовании объекта в json в потоке это также привело к созданию 12 заголовков сообщений, что на 2 больше, чем разрешено SQS (что приводит к сбою доставки сообщений).
Пример сообщения с заголовками, слегка измененными для защиты невинных очередей… Как вы можете видеть, aws_queue, destination и т. Д. Все еще Находятся в сообщении после чтения из «wrench_workflow-mjg», так что при попытке доставки в следующую очередь эти заголовки перезаписывают конфигурацию в конфигурации xml spring-int и никогда не доставляются в следующую очередь. Сообщение просто продолжало циклически перемещаться по очереди SQS «wrench_workflow-mjg» (после устранения проблемы с более чем 10 атрибутами).
GenericMessage [payload={"eventId":"event-1","eventStartDateTime":1476127827.201000000},
headers={aws_messageId=db9b6cc0-f133-4182-b79c-4d5d9717a3a9, ApproximateReceiveCount=1,
SentTimestamp=1476127827803, id=0b662b72-f149-a970-5e63-64a1b28290fb,
SenderId=AIDAJOYV7TECZCZCOK22C,
aws_receiptHandle=AQEBdaToWo9utbjBwADeSuChX/KrY3l8eoSBMZ0RMmbI8ewjJQw6tV74RwSYxWPKZBSzgJhCmfJ8AUX
reOy2yGrmefULU7NS8nqYTdMW6BB4Ug2 mHIY 8Tze 2ws15FB5t96q3iJO8 tP5pl/xuo CiTDv
L1rlYkVODD0Yv1zosGKx48IhGOXdL8nJ4Im8bujcUNLJy/vEYD8Qcfsi6NHOF3Qn0A4Xw Nva0wp86zfq,
aws_queue=wrench_workflow-mjg,
lookupDestination=wrench_workflow-mjg,
ApproximateFirstReceiveTimestamp=1476127827886, timestamp=1476127836254}
Я, вероятно, могу собрать пример, если это необходимо, но я надеюсь, вы понимаете, к чему я клоню.
Ответ №1:
Вероятно, нам следует добавить средства сопоставления заголовков в адаптеры, чтобы включить выборочное сопоставление заголовков, как у нас для других технологий в Spring Integration.
Я открыл проблему с GitHub.
В то же время вы можете поместить <int:header-filter.../>
(или HeaderFilter
transformer) непосредственно перед исходящим адаптером, чтобы удалить ненужные заголовки.
Вы также можете добавить пользовательский обработчик запросов к исходящему адаптеру.
Комментарии:
1. просто с головы до ног, где пример, в котором реализовано приведенное выше сопоставление заголовков?
2. Непосредственно перед исходящим адаптером; если вы можете сказать мне, используете ли вы конфигурацию xml или @Bean, я могу отредактировать свой ответ с помощью config .
3. Спасибо, но не обязательно. Мы довольно заядлые пользователи spring-int и уже внедрили предложенный вами обходной путь. Если мы сможем запустить этот проект, и у меня будет немного времени, я тоже попытаюсь решить проблему с GitHub.
4. (О, и мы склонны использовать XML для определений потоков — по иронии судьбы, кажется, легче увидеть потоки, используя это, чем более длинную конфигурацию java. Хотя java DSL выглядит довольно хорошо для простых потоков.
5. Мой первый ответ касался вопроса о проблеме с github — я должен был спросить там. Еще раз спасибо!