Ошибка при попытке доступа к Кафке с помощью Quarkus в собственном режиме

#apache-kafka #kerberos #quarkus

Вопрос:

Я попробовал простой пример кода для проверки доступа к «kerberized» Кафке из Quarkus 2.2.2 с помощью smallrye-реактивного обмена сообщениями-кафка :

 package org.acme;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.eclipse.microprofile.reactive.messaging.Incoming;
import javax.enterprise.context.ApplicationScoped;

@ApplicationScoped
public class MyTopicConsumer {

    @Incoming("in") 
    public void consume(ConsumerRecord<String, String> record) {
        System.out.println("read from Kafka : "   record.value() ) ;
    }

}
 

Кафкас стоит за Kerberos, поэтому я использовал приложение.свойства, подобные этому :

 quarkus.ssl.native=true
quarkus.native.enable-all-security-services=true
mp.messaging.incoming.in.group.id=my-group
mp.messaging.incoming.in.auto.commit.interval.ms=1000
mp.messaging.incoming.in.security.protocol=SASL_SSL
mp.messaging.incoming.in.sasl.kerberos.service.name=kafka
mp.messaging.incoming.in.sasl.mechanism=GSSAPI
mp.messaging.incoming.in.sasl.jaas.config=com.sun.security.auth.module.Krb5LoginModule "required" doNotPrompt=true useKeyTab=true storeKey=true serviceName="kafka" keyTab="<keytab>" principal="<principal>" useTicketCache=false;
mp.messaging.incoming.in.ssl.truststore.location=<location>
mp.messaging.incoming.in.ssl.truststore.password=<password>
mp.messaging.incoming.in.connector=smallrye-kafka
mp.messaging.incoming.in.topic=<topic>
mp.messaging.incoming.in.auto.offset.reset=earliest
mp.messaging.incoming.in.enable.auto.commit=false
mp.messaging.incoming.in.bootstrap.servers=<list of servers>
 

Он хорошо работает в режиме jvm, но не работает в собственном режиме (graalvm-ce-java11-21.2.0) с этой ошибкой :

 ERROR [io.sma.rea.mes.provider] (main) SRMSG00230: Unable to create the publisher or subscriber during initialization: org.apache.kafka.common.KafkaException: Failed to construct kafka consumer
        at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:823)
        at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:665)
        at io.smallrye.reactive.messaging.kafka.impl.ReactiveKafkaConsumer.<init>(ReactiveKafkaConsumer.java:80)
        at io.smallrye.reactive.messaging.kafka.impl.KafkaSource.<init>(KafkaSource.java:85)
        at io.smallrye.reactive.messaging.kafka.KafkaConnector.getPublisherBuilder(KafkaConnector.java:182)
        at io.smallrye.reactive.messaging.kafka.KafkaConnector_ClientProxy.getPublisherBuilder(KafkaConnector_ClientProxy.zig:159)
        at io.smallrye.reactive.messaging.impl.ConfiguredChannelFactory.createPublisherBuilder(ConfiguredChannelFactory.java:190)
        at io.smallrye.reactive.messaging.impl.ConfiguredChannelFactory.register(ConfiguredChannelFactory.java:153)
        at io.smallrye.reactive.messaging.impl.ConfiguredChannelFactory.initialize(ConfiguredChannelFactory.java:125)
        at io.smallrye.reactive.messaging.impl.ConfiguredChannelFactory_ClientProxy.initialize(ConfiguredChannelFactory_ClientProxy.zig:189)
        at java.util.Iterator.forEachRemaining(Iterator.java:133)
        at java.util.Spliterators$IteratorSpliterator.forEachRemaining(Spliterators.java:1801)
        at java.util.stream.ReferencePipeline$Head.forEach(ReferencePipeline.java:658)
        at io.smallrye.reactive.messaging.extension.MediatorManager.start(MediatorManager.java:189)
        at io.smallrye.reactive.messaging.extension.MediatorManager_ClientProxy.start(MediatorManager_ClientProxy.zig:220)
        at io.quarkus.smallrye.reactivemessaging.runtime.SmallRyeReactiveMessagingLifecycle.onApplicationStart(SmallRyeReactiveMessagingLifecycle.java:41)
        at io.quarkus.smallrye.reactivemessaging.runtime.SmallRyeReactiveMessagingLifecycle_Observer_onApplicationStart_4e8937813d9e8faff65c3c07f88fa96615b70e70.notify(SmallRyeReactiveMessagingLifecycle_Observer_onApplicationStart_4e8937813d9e8faff65c3c07f88fa96615b70e70.zig:111)
        at io.quarkus.arc.impl.EventImpl$Notifier.notifyObservers(EventImpl.java:300)
        at io.quarkus.arc.impl.EventImpl$Notifier.notify(EventImpl.java:282)
        at io.quarkus.arc.impl.EventImpl.fire(EventImpl.java:70)
        at io.quarkus.arc.runtime.ArcRecorder.fireLifecycleEvent(ArcRecorder.java:128)
        at io.quarkus.arc.runtime.ArcRecorder.handleLifecycleEvents(ArcRecorder.java:97)
        at io.quarkus.deployment.steps.LifecycleEventsBuildStep$startupEvent1144526294.deploy_0(LifecycleEventsBuildStep$startupEvent1144526294.zig:87)
        at io.quarkus.deployment.steps.LifecycleEventsBuildStep$startupEvent1144526294.deploy(LifecycleEventsBuildStep$startupEvent1144526294.zig:40)
        at io.quarkus.runner.ApplicationImpl.doStart(ApplicationImpl.zig:623)
        at io.quarkus.runtime.Application.start(Application.java:101)
        at io.quarkus.runtime.ApplicationLifecycleManager.run(ApplicationLifecycleManager.java:101)
        at io.quarkus.runtime.Quarkus.run(Quarkus.java:66)
        at io.quarkus.runtime.Quarkus.run(Quarkus.java:42)
        at io.quarkus.runtime.Quarkus.run(Quarkus.java:119)
        at io.quarkus.runner.GeneratedMain.main(GeneratedMain.zig:29)
Caused by: org.apache.kafka.common.KafkaException: org.apache.kafka.common.KafkaException: Could not find a public no-argument constructor for org.apache.kafka.common.security.kerberos.KerberosLogin
        at org.apache.kafka.common.network.SaslChannelBuilder.configure(SaslChannelBuilder.java:184)
        at org.apache.kafka.common.network.ChannelBuilders.create(ChannelBuilders.java:192)
        at org.apache.kafka.common.network.ChannelBuilders.clientChannelBuilder(ChannelBuilders.java:81)
        at org.apache.kafka.clients.ClientUtils.createChannelBuilder(ClientUtils.java:105)
        at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:737)
        ... 30 more
 

Я попробовал несколько изменений, предложенных некоторыми постами, но безрезультатно.
Кто-нибудь может подсказать, как это исправить или обойти ?
Спасибо

Ответ №1:

Похоже, что код не содержит конструктора org.apache.кафка.общие.безопасность.kerberos.KerberosLogin. Вы пытались добавить класс, как описано в https://quarkus.io/guides/writing-native-applications-tips#registering-for-reflection

Возможно, вам нужно добавить строку в свой файл конфигурации

кваркус.собственные.дополнительные-сборки-аргументы=-H:Файлы конфигурации отражения=отражение-конфигурация.json

И добавьте класс org.apache.кафка.общие.безопасность.kerberos.KerberosLogin в reflection-config.json, как описано здесь https://quarkus.io/guides/writing-native-applications-tips#using-a-configuration-file