Как мне создать только одно сообщение, используя Spring Cloud Stream без устаревшего @Output или отключить опрос?

#java #spring #spring-cloud #producer

Вопрос:

Я пытаюсь опубликовать только одно сообщение в Kafka, используя Spring Cloud без каких-либо устаревших классов / методов или аннотаций. Я также хотел бы иметь возможность легко изменять полезную нагрузку.

Итак, для полной ясности я пытаюсь не использовать устаревшую @Output аннотацию или любую KafkaTemplate другую .

Моя конфигурация:

 spring:
  cloud:
    stream:
      bindings:
        message-out-0:
          destination: ${spring.application.name}
          producer:
            key:
              serializer:
                type: string
                format: utf-8
                charset: utf-8
            value:
              serializer:
                type: string
                format: utf-8
                charset: utf-8
 

Мой код — то, что я пробовал до сих пор:

 @Component
@RequiredArgsConstructor
public class ApplicationAnnouncer implements CommandLineRunner {
    
    private final MessageService messageService;
    
    @Override
    public void run(String... args) throws Exception {
        messageService.value = "Application started...";
        messageService.message();
    }
}
 

Одна попытка:

 @Configuration
public class MessageService {
     public Object value;
     
     @Bean
     public Supplier<Message<?>> message () {
          return () -> MessageBuilder.withPayload(value).build();
     }
}
 

Еще одна попытка:

 @Configuration
public class MessageService {
     public Object value;
     
     @Bean
     public Supplier<Flux<?>> message () {
          return () -> Flux.fromStream(Stream.generate(() -> {
               try {
                    Thread.sleep(1000);
                    return value;
               } catch (Exception e) {
                    // ignore
               }
               return null;
          })).subscribeOn(Schedulers.elastic()).share();
     }
}
 

Вывод в console consumer для обеих попыток:

 Hello World!
Hello World!
Hello World!
Hello World! // ... Repeated every second
 

В документации указано:

Фреймворк предоставляет механизм опроса по умолчанию (отвечая на вопрос «Кто?»), который вызовет вызов поставщика, и по умолчанию он будет делать это каждую секунду (отвечая на вопрос «Как часто?»).

Но что, если я не хочу, чтобы он опрашивал каждую секунду?

Странно, как я предоставляю MessageService сообщение… Это конфигурация? Или это сервис?

Мне еще предстоит найти самый простой пример простой отправки ОДНОГО НАСТРАИВАЕМОГО СООБЩЕНИЯ в Kafka.

Ответ №1:

Вы можете подключиться к привязкам облачных потоков с помощью StreamBridge:

 @Component
@RequiredArgsConstructor
public class ApplicationAnnouncer implements CommandLineRunner {

    private final StreamBridge streamBridge;
    
    @Override
    public void run(String... args) throws Exception {
        streamBridge.send("message-out-0", "Application started...");
    }
}
 

Первая строка — это имя привязки, указанное в настройках приложения, полученное из компонента, предоставляющего функцию.

Вам даже не нужен фактический компонент, из которого получено имя привязки. в этом случае подойдет любое имя.


Вы можете найти несколько примеров здесь.

Комментарии:

1. Спасибо! Я написал ответ, потому что ответ, содержащий только ссылки, может устареть.