#java #spring #spring-cloud #producer
Вопрос:
Я пытаюсь опубликовать только одно сообщение в Kafka, используя Spring Cloud без каких-либо устаревших классов / методов или аннотаций. Я также хотел бы иметь возможность легко изменять полезную нагрузку.
Итак, для полной ясности я пытаюсь не использовать устаревшую @Output
аннотацию или любую KafkaTemplate
другую .
Моя конфигурация:
spring:
cloud:
stream:
bindings:
message-out-0:
destination: ${spring.application.name}
producer:
key:
serializer:
type: string
format: utf-8
charset: utf-8
value:
serializer:
type: string
format: utf-8
charset: utf-8
Мой код — то, что я пробовал до сих пор:
@Component
@RequiredArgsConstructor
public class ApplicationAnnouncer implements CommandLineRunner {
private final MessageService messageService;
@Override
public void run(String... args) throws Exception {
messageService.value = "Application started...";
messageService.message();
}
}
Одна попытка:
@Configuration
public class MessageService {
public Object value;
@Bean
public Supplier<Message<?>> message () {
return () -> MessageBuilder.withPayload(value).build();
}
}
Еще одна попытка:
@Configuration
public class MessageService {
public Object value;
@Bean
public Supplier<Flux<?>> message () {
return () -> Flux.fromStream(Stream.generate(() -> {
try {
Thread.sleep(1000);
return value;
} catch (Exception e) {
// ignore
}
return null;
})).subscribeOn(Schedulers.elastic()).share();
}
}
Вывод в console consumer для обеих попыток:
Hello World!
Hello World!
Hello World!
Hello World! // ... Repeated every second
В документации указано:
Фреймворк предоставляет механизм опроса по умолчанию (отвечая на вопрос «Кто?»), который вызовет вызов поставщика, и по умолчанию он будет делать это каждую секунду (отвечая на вопрос «Как часто?»).
Но что, если я не хочу, чтобы он опрашивал каждую секунду?
Странно, как я предоставляю MessageService сообщение… Это конфигурация? Или это сервис?
Мне еще предстоит найти самый простой пример простой отправки ОДНОГО НАСТРАИВАЕМОГО СООБЩЕНИЯ в Kafka.
Ответ №1:
Вы можете подключиться к привязкам облачных потоков с помощью StreamBridge:
@Component
@RequiredArgsConstructor
public class ApplicationAnnouncer implements CommandLineRunner {
private final StreamBridge streamBridge;
@Override
public void run(String... args) throws Exception {
streamBridge.send("message-out-0", "Application started...");
}
}
Первая строка — это имя привязки, указанное в настройках приложения, полученное из компонента, предоставляющего функцию.
Вам даже не нужен фактический компонент, из которого получено имя привязки. в этом случае подойдет любое имя.
Вы можете найти несколько примеров здесь.
Комментарии:
1. Спасибо! Я написал ответ, потому что ответ, содержащий только ссылки, может устареть.