Настройте shardIteratorType на AFTER_SEQUENCE_NUMBER динамически для кинетического связующего потока spring cloud

#spring-boot #spring-cloud-stream

#spring-boot #spring-cloud-stream

Вопрос:

Мое приложение spring boot использует aws kinesis binder, который выступает в качестве потребителя. Я сохраняю последний порядковый номер в базе данных, так что при перезапуске приложения я прочитаю это, и обработка начнется после этого порядкового номера, используя AFTER_SEQUENCE_NUMBER в качестве shardIteratorType . Я могу сделать это, вручную обновив config yml новым порядковым номером. Могу ли я каким-либо образом динамически установить этот порядковый номер. Спасибо.

Ответ №1:

Вы можете предоставить ConsumerEndpointCustomizer<KinesisMessageDrivenChannelAdapter> компонент, перехватить его KinesisMessageDrivenChannelAdapter для конкретного назначения и вызывать его resetCheckpointForShardToSequenceNumber() всякий раз, когда вам нужно изменить порядковый номер во время выполнения.

Другой способ — обратиться к stop() этому адаптеру канала и вызвать его setStreamInitialSequence() , а затем start() применить новый порядковый номер ко всем новым запущенным потребителям внутри.

Комментарии:

1. Не могли бы вы подробнее рассказать, как это сделать. Спасибо

2.Не уверен, что вас беспокоит. Вы предоставляете ConsumerEndpointCustomizer<KinesisMessageDrivenChannelAdapter> @Bean . Заполните некоторые AtomicReference<KinesisMessageDrivenChannelAdapter> адаптером из этого обратного вызова. И используйте это AtomicReference из любого другого сервиса, когда вам нужно изменить состояние адаптера, используя API, который я предоставил в ответе.