#java #spring #apache-kafka #spring-kafka #zipkin
#java #весна #apache-kafka #spring-кафка #zipkin
Вопрос:
Я использую библиотеку Brave https://github.com/openzipkin/brave для отслеживания, и теперь я хотел бы использовать его также для потребителя Kafka. Я бы хотел избежать добавления Spring Sleuth и использовать только инструментарий Brave Kafka https://github.com/openzipkin/brave/tree/master/instrumentation/kafka-clients .
Для потребителя Kafka я использую @KafkaListener. Код выглядит следующим образом:
TestKafkaEndpoint.java
@Service
public class TestKafkaEndpoint {
@KafkaListener(topics = "myTestTopic", containerFactory = "testKafkaListenerContainerFactory")
public void procesMyRequest(@Payload final MyRequest request) {
// do some magic...
}
}
И класс конфигурации TestKafkaConfig.java
@Configuration
@EnableKafka
@ComponentScan
public class TestKafkaConfig {
@Bean
public ConsumerFactory<String, MyRequest> testConsumerFactory() {
final Map<String, Object> consumerProperties = new HashMap<>();
consumerProperties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka01-localhost:9092");
consumerProperties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
consumerProperties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
consumerProperties.put(ConsumerConfig.GROUP_ID_CONFIG, "TestGROUP");
return new DefaultKafkaConsumerFactory<>(consumerProperties, new StringDeserializer(), new JsonDeserializer<>(MyRequest.class));
}
@Bean
public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, MyRequest>> testKafkaListenerContainerFactory() {
final ConcurrentKafkaListenerContainerFactory<String, MyRequest> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(testConsumerFactory());
factory.getContainerProperties().setErrorHandler(new LoggingErrorHandler());
return factory;
}
Но я не знаю, как использовать KafkaConsumer при использовании фабрики Kafka или использовать KafkaTracing. У кого-нибудь есть какой-либо опыт в этом, и он работает?
Ответ №1:
Я не знаком с этим, но похоже TracingConsumer
, что это простая потребительская оболочка: https://github.com/openzipkin/brave/blob/363ceb4c922305ffb4a68ac47dc152e1d15da0fb/instrumentation/kafka-clients/src/main/java/brave/kafka/clients/TracingConsumer.java#L69-L79
Вы должны иметь возможность создать подкласс DefaultKafkaConsumerFactory
; переопределить createConsumer
методы, используемые контейнером-слушателем…
this.consumer =
KafkaMessageListenerContainer.this.consumerFactory.createConsumer(
this.consumerGroupId,
this.containerProperties.getClientId(),
KafkaMessageListenerContainer.this.clientIdSuffix,
consumerProperties);
… вызовите super.createConsumer(…) и оберните его в TracingConsumer
.
Если вы используете 2.5.3 или более позднюю версию, вы можете добавить a ConsumerPostProcessor
в DKCF.
Вот как это делает sleuth:
Комментарии:
1. Привет, Гэри, спасибо за твой великолепный комментарий. Я уже начал изучать подклассы DefaultKafkaConsumerFactory, что может быть правильным решением. Я также посмотрю на ConsumerPostProcessor, который звучит немного аккуратнее, и опубликую свой опыт, окончательное решение здесь 🙂
2. Помог подкласс DefaultKafkaConsumerFactory. Спасибо 🙂