#spring-integration #spring-kafka #spring-cloud-stream #spring-cloud-dataflow
#spring-интеграция #spring-kafka #весна-облако-поток #spring-cloud-поток данных
Вопрос:
я работаю над приложением потока данных Spring Cloud, ниже приведен фрагмент кода
@Bean
@InboundChannelAdapter(channel = TbeSource.PR1, poller = @Poller(fixedDelay = "2000"))
public MessageSource<Product> getProductSource(ProductBuilder dataAccess) {
return new MessageSource<Product>() {
@SneakyThrows
@Override
public Message<Product> receive() {
System.out.println("calling method");
return MessageBuilder.withPayload(dataAccess.getNext()).build();
}
};
}
В приведенном выше коде getNext()
метод получит данные из базы данных и вернет этот объект, поэтому, если данные будут полностью прочитаны, он вернет null
мы не можем вернуть значение null в этот MessageSource.
итак, есть ли какие-либо доступные опции для приостановки и возобновления этого исходного класса подключения, когда нам нужно?
Кто-нибудь сталкивался / преодолевал этот сценарий?
Ответ №1:
Прежде всего, вы просто можете иметь a Supplier<Product>
вместо этого MessageSource
, и ваш код будет таким же:
return () -> dataAccess.getNext();
null
Результат здесь действителен, и в этом случае сообщение не будет отправлено, и ошибки не будет, поскольку фреймворк null
правильно обрабатывает результат.
У вас все еще может быть незанятая функциональность, @InboundChannelAdapter
когда результат вызова метода null
. По этой причине вам нужно заглянуть в SimpleActiveIdleMessageSourceAdvice
. Дополнительную информацию см. в документах: https://docs.spring.io/spring-integration/docs/5.3.4.RELEASE/reference/html/core.html#simpleactiveidlereceivemessageadvice