Как приостановить отправку данных классом источника потока данных Spring cloud в kafka?

#spring-integration #spring-kafka #spring-cloud-stream #spring-cloud-dataflow

#spring-интеграция #spring-kafka #весна-облако-поток #spring-cloud-поток данных

Вопрос:

я работаю над приложением потока данных Spring Cloud, ниже приведен фрагмент кода

     @Bean
    @InboundChannelAdapter(channel = TbeSource.PR1, poller = @Poller(fixedDelay = "2000"))
    public MessageSource<Product> getProductSource(ProductBuilder dataAccess) {

        return new MessageSource<Product>() {
            @SneakyThrows
            @Override
            public Message<Product> receive() {
                System.out.println("calling method");
                return MessageBuilder.withPayload(dataAccess.getNext()).build();
            }
        };
    }
 

В приведенном выше коде getNext() метод получит данные из базы данных и вернет этот объект, поэтому, если данные будут полностью прочитаны, он вернет null

мы не можем вернуть значение null в этот MessageSource.

итак, есть ли какие-либо доступные опции для приостановки и возобновления этого исходного класса подключения, когда нам нужно?

Кто-нибудь сталкивался / преодолевал этот сценарий?

Ответ №1:

Прежде всего, вы просто можете иметь a Supplier<Product> вместо этого MessageSource , и ваш код будет таким же:

 return () -> dataAccess.getNext();
 

null Результат здесь действителен, и в этом случае сообщение не будет отправлено, и ошибки не будет, поскольку фреймворк null правильно обрабатывает результат.

У вас все еще может быть незанятая функциональность, @InboundChannelAdapter когда результат вызова метода null . По этой причине вам нужно заглянуть в SimpleActiveIdleMessageSourceAdvice . Дополнительную информацию см. в документах: https://docs.spring.io/spring-integration/docs/5.3.4.RELEASE/reference/html/core.html#simpleactiveidlereceivemessageadvice