#spring #spring-boot #kafka-consumer-api #apache-kafka-streams #spring-kafka
#весна #пружинный ботинок #kafka-consumer-api #apache-kafka-streams #spring-kafka
Вопрос:
Мне нужно распечатать / записать / сохранить раздел kafka и номер смещения, в котором обрабатывается мое сообщение. Как я могу этого добиться? Я использую StreamBridge для отправки сообщения от производителя, а также использую функциональный подход spring kafka streams
Public delegateToSupplier(String id, Abc obj) {
Message<Abc> message = MessageBuilder.withPayload(obj).seHeaders(KafkaHeaders.MESSAGE_KEY, id.getBytes()).build();
streamBridge.send("out-topic", message);
}
Ответ №1:
Метаданные записи доступны (асинхронно) через канал метаданных:
@SpringBootApplication
public class So66436499Application {
public static void main(String[] args) {
SpringApplication.run(So66436499Application.class, args);
}
@Autowired
StreamBridge bridge;
@Bean
public ApplicationRunner runner() {
return args -> {
this.bridge.send("myBinding", "test");
Thread.sleep(5000);
};
}
@ServiceActivator(inputChannel = "meta")
void meta(Message<?> sent) {
System.out.println("Sent: " sent.getHeaders().get(KafkaHeaders.RECORD_METADATA, RecordMetadata.class));
}
}
spring.cloud.stream.bindings.myBinding.destination=foo
spring.cloud.stream.kafka.bindings.myBinding.producer.record-metadata-channel=meta
Sent: foo-0@5
Комментарии:
1. Могу ли я получить доступ к заголовкам сообщений в consumer?? То есть я использую функциональный подход spring boot для потоков kafka. docs.spring.io/spring-cloud-stream-binder-kafka/docs/3.1.0 /. … Здесь, в функции<Kstream<String,?>,KStream<String,?>process() {}, есть ли способ, с помощью которого я могу получить доступ к заголовкам, которые были отправлены сообщением, пожалуйста??
2. Когда это не связано, всегда задавайте новый вопрос. Задавая вопросы в комментариях к другим ответам, вы не помогаете другим искать вопросы и ответы. Вы можете получить доступ к заголовкам в пользовательской
Processor
илиTransformer
подключенной к вашейKStream
топологии. Если вам нужна помощь в этом, задайте новый вопрос, показывающий ваш текущий компонент-потребитель.3. @GaryRussell Этот ответ почти работает для меня — я вижу, что код, аннотированный с
@ServiceActivator
помощью (или@StreamListener
), выполняется внутри потока ввода-вывода производителя kafka — однако в журналах я вижу, что запущен дополнительный потребитель Kafka с анонимной группойanonymous....: partitions assigned: [meta-0]
, где канал ‘meta’ определен только в этом ServiceActivator (или StreamListener) иspring.cloud.stream.kafka.bindings.my-output.producer.record-metadata-channel=meta
— как следует Я слушаю канал для записи-метаданных-канала, не запуская надлежащего потребителя kafka4. Не задавайте новых вопросов в комментариях. Задайте новый вопрос с указанием вашего кода и конфигурации. Что-то не так с вашей конфигурацией, если вы получаете дополнительную привязку. Кроме того,
@StreamListener
он давно устарел и скоро будет удален; вам необходимо переключиться на функциональную модель.5. Проблема заключалась в том, что я определил мета-канал с помощью интерфейса с
@Input
, но это просто необходимо@ServiceActivator
(кстати, я не смог найти примеры использования record-metadata-channel в документации HTML для проекта cloud-stream). Мое приложение мигрировало из cloud-stream 2.x и, таким образом, использует StreamListener, а также из быстрого просмотра я еще не понял, как перейти из StreamListener, пакетной обработки, управления интеграцией с системами обмена сообщениями (сканирование всех компонентов, реализующих функциональный интерфейс, кажется слишком сильным предположением, что все они интегрированы с системами обмена сообщениями), однако Flux имеет большиепотенциал.