#spring-boot #spring-cloud-stream
#spring-boot #spring-cloud-stream
Вопрос:
Мое приложение spring boot использует aws kinesis binder, который выступает в качестве потребителя. Я сохраняю последний порядковый номер в базе данных, так что при перезапуске приложения я прочитаю это, и обработка начнется после этого порядкового номера, используя AFTER_SEQUENCE_NUMBER в качестве shardIteratorType . Я могу сделать это, вручную обновив config yml новым порядковым номером. Могу ли я каким-либо образом динамически установить этот порядковый номер. Спасибо.
Ответ №1:
Вы можете предоставить ConsumerEndpointCustomizer<KinesisMessageDrivenChannelAdapter>
компонент, перехватить его KinesisMessageDrivenChannelAdapter
для конкретного назначения и вызывать его resetCheckpointForShardToSequenceNumber()
всякий раз, когда вам нужно изменить порядковый номер во время выполнения.
Другой способ — обратиться к stop()
этому адаптеру канала и вызвать его setStreamInitialSequence()
, а затем start()
применить новый порядковый номер ко всем новым запущенным потребителям внутри.
Комментарии:
1. Не могли бы вы подробнее рассказать, как это сделать. Спасибо
2.Не уверен, что вас беспокоит. Вы предоставляете
ConsumerEndpointCustomizer<KinesisMessageDrivenChannelAdapter>
@Bean
. Заполните некоторыеAtomicReference<KinesisMessageDrivenChannelAdapter>
адаптером из этого обратного вызова. И используйте этоAtomicReference
из любого другого сервиса, когда вам нужно изменить состояние адаптера, используя API, который я предоставил в ответе.