Смелая трассировка для потребителя Kafka с использованием @KafkaListener

#java #spring #apache-kafka #spring-kafka #zipkin

#java #весна #apache-kafka #spring-кафка #zipkin

Вопрос:

Я использую библиотеку Brave https://github.com/openzipkin/brave для отслеживания, и теперь я хотел бы использовать его также для потребителя Kafka. Я бы хотел избежать добавления Spring Sleuth и использовать только инструментарий Brave Kafka https://github.com/openzipkin/brave/tree/master/instrumentation/kafka-clients .

Для потребителя Kafka я использую @KafkaListener. Код выглядит следующим образом:

TestKafkaEndpoint.java

 @Service
public class TestKafkaEndpoint {

    @KafkaListener(topics = "myTestTopic", containerFactory = "testKafkaListenerContainerFactory")
    public void procesMyRequest(@Payload final MyRequest request) {
       // do some magic...
    }
}
  

И класс конфигурации TestKafkaConfig.java

 
@Configuration
@EnableKafka
@ComponentScan
public class TestKafkaConfig {

    @Bean
    public ConsumerFactory<String, MyRequest> testConsumerFactory() {
        final Map<String, Object> consumerProperties = new HashMap<>();
        consumerProperties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka01-localhost:9092");
        consumerProperties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        consumerProperties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
        consumerProperties.put(ConsumerConfig.GROUP_ID_CONFIG, "TestGROUP");
        return new DefaultKafkaConsumerFactory<>(consumerProperties, new StringDeserializer(), new JsonDeserializer<>(MyRequest.class));
    }

    @Bean
    public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, MyRequest>> testKafkaListenerContainerFactory() {
        final ConcurrentKafkaListenerContainerFactory<String, MyRequest> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(testConsumerFactory());
        factory.getContainerProperties().setErrorHandler(new LoggingErrorHandler());
        return factory;
    }

  

Но я не знаю, как использовать KafkaConsumer при использовании фабрики Kafka или использовать KafkaTracing. У кого-нибудь есть какой-либо опыт в этом, и он работает?

Ответ №1:

Я не знаком с этим, но похоже TracingConsumer , что это простая потребительская оболочка: https://github.com/openzipkin/brave/blob/363ceb4c922305ffb4a68ac47dc152e1d15da0fb/instrumentation/kafka-clients/src/main/java/brave/kafka/clients/TracingConsumer.java#L69-L79

Вы должны иметь возможность создать подкласс DefaultKafkaConsumerFactory ; переопределить createConsumer методы, используемые контейнером-слушателем…

 this.consumer =
        KafkaMessageListenerContainer.this.consumerFactory.createConsumer(
                this.consumerGroupId,
                this.containerProperties.getClientId(),
                KafkaMessageListenerContainer.this.clientIdSuffix,
                consumerProperties);
  

… вызовите super.createConsumer(…) и оберните его в TracingConsumer .

Если вы используете 2.5.3 или более позднюю версию, вы можете добавить a ConsumerPostProcessor в DKCF.

Вот как это делает sleuth:

https://github.com/spring-cloud/spring-cloud-sleuth/blob/6e306e594d20361483fd19739e0f5f8e82354bf5/spring-cloud-sleuth-brave/src/main/java/org/springframework/cloud/sleuth/brave/instrument/messaging/TraceMessagingAutoConfiguration.java#L263-L285

Комментарии:

1. Привет, Гэри, спасибо за твой великолепный комментарий. Я уже начал изучать подклассы DefaultKafkaConsumerFactory, что может быть правильным решением. Я также посмотрю на ConsumerPostProcessor, который звучит немного аккуратнее, и опубликую свой опыт, окончательное решение здесь 🙂

2. Помог подкласс DefaultKafkaConsumerFactory. Спасибо 🙂