Перезагрузите контекст SSL, используемый Spring Kafka во время выполнения

#java #spring-boot #apache-kafka #spring-kafka

#java #spring-boot #apache-kafka #spring-kafka

Вопрос:

Каков рекомендуемый способ заставить Spring Kafka перезагрузить SSL-контекст?

У меня есть требование вставлять новые сертификаты в хранилище доверия, которое использует мой производитель Kafka, без каких-либо простоев.

Однако я обнаружил, что после запуска приложения и создания Kafka producer создается экземпляр SSLContext и кэшируется. Есть способ перенастроить это, но единственный способ, который я нашел до сих пор, — уничтожить всех существующих производителей, вызвав метод destroy при DefaultKafkaProducerFactory (после обновления сертификата), который вызывает любые последующие вызовы KafkaTemplate.send для принудительного создания нового производителя, который, в свою очередь, перезагружает контекст SSL.

Я чувствую, что это похоже на использование кувалды для решения этой проблемы, и может быть более элегантное решение. Я также заметил, что если я вызываю destroy при отправке сообщений, я получаю приведенное ниже исключение, которое выглядит не очень позитивно, когда мы не можем позволить себе потерять какие-либо события.

 java.util.concurrent.CompletionException: org.apache.kafka.common.KafkaException: Producer closed while send in progress
    at java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:273)
    at java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:280)
    at java.util.concurrent.CompletableFuture$AsyncSupply.run$$$capture(CompletableFuture.java:1592)
    at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java)
    at java.util.concurrent.CompletableFuture$AsyncSupply.exec(CompletableFuture.java:1582)
    at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289)
    at java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1056)
    at java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1692)
    at java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:157)
Caused by: org.apache.kafka.common.KafkaException: Producer closed while send in progress
    at org.apache.kafka.clients.producer.KafkaProducer.doSend(KafkaProducer.java:826)
    at org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:803)
    at org.springframework.kafka.core.DefaultKafkaProducerFactory$CloseSafeProducer.send(DefaultKafkaProducerFactory.java:444)
    at org.springframework.kafka.core.KafkaTemplate.doSend(KafkaTemplate.java:372)
    at org.springframework.kafka.core.KafkaTemplate.send(KafkaTemplate.java:190)
    at org.springframework.kafka.core.KafkaOperations$send.call(Unknown Source)
    at com.example.event.publisher.kafka.KafkaEventPublisher.doPublish(KafkaEventPublisher.groovy:57)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.codehaus.groovy.reflection.CachedMethod.invoke(CachedMethod.java:98)
    at groovy.lang.MetaMethod.doMethodInvoke(MetaMethod.java:325)
    at org.codehaus.groovy.runtime.metaclass.ClosureMetaClass.invokeMethod(ClosureMetaClass.java:352)
    at groovy.lang.MetaClassImpl.invokeMethod(MetaClassImpl.java:1034)
    at org.codehaus.groovy.runtime.callsite.PogoMetaClassSite.callCurrent(PogoMetaClassSite.java:68)
    at org.codehaus.groovy.runtime.callsite.AbstractCallSite.callCurrent(AbstractCallSite.java:177)
    at com.example.event.publisher.kafka.KafkaEventPublisher$_publish_closure1.doCall(KafkaEventPublisher.groovy:47)
    at com.example.event.publisher.kafka.KafkaEventPublisher$_publish_closure1.doCall(KafkaEventPublisher.groovy)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.codehaus.groovy.reflection.CachedMethod.invoke(CachedMethod.java:98)
    at groovy.lang.MetaMethod.doMethodInvoke(MetaMethod.java:325)
    at org.codehaus.groovy.runtime.metaclass.ClosureMetaClass.invokeMethod(ClosureMetaClass.java:264)
    at groovy.lang.MetaClassImpl.invokeMethod(MetaClassImpl.java:1034)
    at groovy.lang.Closure.call(Closure.java:418)
    at org.codehaus.groovy.runtime.ConvertedClosure.invokeCustom(ConvertedClosure.java:54)
    at org.codehaus.groovy.runtime.ConversionHandler.invoke(ConversionHandler.java:124)
    at com.sun.proxy.$Proxy103.get(Unknown Source)
    at java.util.concurrent.CompletableFuture$AsyncSupply.run$$$capture(CompletableFuture.java:1590)
    ... 6 common frames omitted
Caused by: org.apache.kafka.common.KafkaException: Requested metadata update after close
    at org.apache.kafka.clients.Metadata.awaitUpdate(Metadata.java:200)
    at org.apache.kafka.clients.producer.KafkaProducer.waitOnMetadata(KafkaProducer.java:938)
    at org.apache.kafka.clients.producer.KafkaProducer.doSend(KafkaProducer.java:823)
    ... 37 common frames omitted
  

Комментарии:

1. у меня похожая ситуация. По этому поводу было несколько обсуждений в официальном разделе дефектов ( issues.apache.org/jira/browse/KAFKA-4701 ). У вас случайно нет какого-либо решения?

Ответ №1:

Похоже, что простой сброс конфигурации для определенных значений вызовет перестройку на заводе SSL engine. Они даже вызывают конфигурацию ключа файла, которая вызовет горячую перезагрузку.

Ссылка

Комментарии:

1. shouldBeRebuild является частью динамической конфигурации Kafka. Это функция на стороне брокера, и нет практического способа создать ее экземпляр на стороне клиента. ( docs.confluent.io/platform/current/kafka/dynamic-config.html )

Ответ №2:

Вместо Spring Kafka 2.6.5, Kafka 2.4.1 я использую KafkaProducerFactory.reset().

 @Autowired
private final KafkaTemplate<String, byte[]> kafkaTemplate;


private void reloadProducer() {
    kafkaTemplate.getProducerFactory().reset();
}
  

При следующем вызове send() Spring воссоздаст совершенно новый KafkaConsumer с новым сертификатом.