Получить номер раздела и смещения, в котором обрабатывается сообщение kafka с использованием StreamBridge

#spring #spring-boot #kafka-consumer-api #apache-kafka-streams #spring-kafka

#весна #пружинный ботинок #kafka-consumer-api #apache-kafka-streams #spring-kafka

Вопрос:

Мне нужно распечатать / записать / сохранить раздел kafka и номер смещения, в котором обрабатывается мое сообщение. Как я могу этого добиться? Я использую StreamBridge для отправки сообщения от производителя, а также использую функциональный подход spring kafka streams

 Public delegateToSupplier(String id, Abc obj) {
Message<Abc> message = MessageBuilder.withPayload(obj).seHeaders(KafkaHeaders.MESSAGE_KEY, id.getBytes()).build();
streamBridge.send("out-topic", message);
}
 

Ответ №1:

Метаданные записи доступны (асинхронно) через канал метаданных:

 @SpringBootApplication
public class So66436499Application {

    public static void main(String[] args) {
        SpringApplication.run(So66436499Application.class, args);
    }

    @Autowired
    StreamBridge bridge;

    @Bean
    public ApplicationRunner runner() {
        return args -> {
            this.bridge.send("myBinding", "test");
            Thread.sleep(5000);
        };
    }

    @ServiceActivator(inputChannel = "meta")
    void meta(Message<?> sent) {
        System.out.println("Sent: "   sent.getHeaders().get(KafkaHeaders.RECORD_METADATA, RecordMetadata.class));
    }

}
 
 spring.cloud.stream.bindings.myBinding.destination=foo
spring.cloud.stream.kafka.bindings.myBinding.producer.record-metadata-channel=meta
 
 Sent: foo-0@5
 

Комментарии:

1. Могу ли я получить доступ к заголовкам сообщений в consumer?? То есть я использую функциональный подход spring boot для потоков kafka. docs.spring.io/spring-cloud-stream-binder-kafka/docs/3.1.0 /. … Здесь, в функции<Kstream<String,?>,KStream<String,?>process() {}, есть ли способ, с помощью которого я могу получить доступ к заголовкам, которые были отправлены сообщением, пожалуйста??

2. Когда это не связано, всегда задавайте новый вопрос. Задавая вопросы в комментариях к другим ответам, вы не помогаете другим искать вопросы и ответы. Вы можете получить доступ к заголовкам в пользовательской Processor или Transformer подключенной к вашей KStream топологии. Если вам нужна помощь в этом, задайте новый вопрос, показывающий ваш текущий компонент-потребитель.

3. @GaryRussell Этот ответ почти работает для меня — я вижу, что код, аннотированный с @ServiceActivator помощью (или @StreamListener ), выполняется внутри потока ввода-вывода производителя kafka — однако в журналах я вижу, что запущен дополнительный потребитель Kafka с анонимной группой anonymous....: partitions assigned: [meta-0] , где канал ‘meta’ определен только в этом ServiceActivator (или StreamListener) и spring.cloud.stream.kafka.bindings.my-output.producer.record-metadata-channel=meta — как следует Я слушаю канал для записи-метаданных-канала, не запуская надлежащего потребителя kafka

4. Не задавайте новых вопросов в комментариях. Задайте новый вопрос с указанием вашего кода и конфигурации. Что-то не так с вашей конфигурацией, если вы получаете дополнительную привязку. Кроме того, @StreamListener он давно устарел и скоро будет удален; вам необходимо переключиться на функциональную модель.

5. Проблема заключалась в том, что я определил мета-канал с помощью интерфейса с @Input , но это просто необходимо @ServiceActivator (кстати, я не смог найти примеры использования record-metadata-channel в документации HTML для проекта cloud-stream). Мое приложение мигрировало из cloud-stream 2.x и, таким образом, использует StreamListener, а также из быстрого просмотра я еще не понял, как перейти из StreamListener, пакетной обработки, управления интеграцией с системами обмена сообщениями (сканирование всех компонентов, реализующих функциональный интерфейс, кажется слишком сильным предположением, что все они интегрированы с системами обмена сообщениями), однако Flux имеет большиепотенциал.