Потребитель Кафки, использующий ошибку AWS_MSK_IAM ClassCastException

#apache-kafka #aws-msk #aws-databricks #aws-iam-authenticator

Вопрос:

У меня MSK работает на AWS, и я хотел бы использовать информацию с помощью аутентификации AWS_MSK_IAM.

Мой MSK настроен правильно, и я могу использовать информацию с помощью интерфейса командной строки Kafka со следующей командой:

 ../bin/kafka-console-consumer.sh --bootstrap-server b-1.kafka.*********.***********.amazonaws.com:9098 --consumer.config client_auth.properties --topic TopicTest --from-beginning
 

Мой client_auth.properties содержит следующую информацию:

 # Sets up TLS for encryption and SASL for authN.
security.protocol = SASL_SSL

# Identifies the SASL mechanism to use.
sasl.mechanism = AWS_MSK_IAM

# Binds SASL client implementation.
sasl.jaas.config = software.amazon.msk.auth.iam.IAMLoginModule required;

# Encapsulates constructing a SigV4 signature based on extracted credentials.
# The SASL client bound by "sasl.jaas.config" invokes this class.
sasl.client.callback.handler.class = software.amazon.msk.auth.iam.IAMClientCallbackHandler
 

Когда я пытаюсь использовать данные из своего кластера Databricks с помощью spark, я получаю следующую ошибку:

 Caused by: kafkashaded.org.apache.kafka.common.KafkaException: java.lang.ClassCastException: software.amazon.msk.auth.iam.IAMClientCallbackHandler cannot be cast to kafkashaded.org.apache.kafka.common.security.auth.AuthenticateCallbackHandler
 

Вот моя конфигурация кластера:
введите описание изображения здесь

Библиотеки, которые я использую в кластере:

введите описание изображения здесь

И код, который я запускаю в базах данных:

 raw = (
    spark
        .readStream
        .format('kafka')
        .option('kafka.bootstrap.servers', 'b-.kafka.*********.***********.amazonaws.com:9098')
        .option('subscribe', 'TopicTest') 
        .option('startingOffsets', 'earliest')
        .option('kafka.sasl.mechanism', 'AWS_MSK_IAM')
        .option('kafka.security.protocol', 'SASL_SSL')
        .option('kafka.sasl.jaas.config', 'software.amazon.msk.auth.iam.IAMLoginModule required;')
        .option('kafka.sasl.client.callback.handler.class', 'software.amazon.msk.auth.iam.IAMClientCallbackHandler')
        .load()
)
 

Комментарии:

1. Я постоянно общался с поддержкой Databricks по этому вопросу. По сути, Databricks затеняет org.apache.kafka.common значение «до kafkashaded.org.apache.kafka.common «, что означает, что это не сработает из коробки. Мне пришлось двигаться дальше и использовать SASL/SCRAM, но теоретически вы могли бы сами отключить аутентификатор iam. Я не уверен на 100%, как это сделать.

2. Привет @AndrewGelnar, спасибо за ответ, вы имеете в виду, что на данный момент это вообще не будет работать для AWS IAM? Или вы смогли запустить AWS IAM, но с помощью SASL/SCRAM вместо SASL_SSL?

3. SASL/SCRAM-это другой механизм, который не интегрирован с IAM. MSK использует имена пользователей/пароли, настроенные в AWS secretsmanager docs.aws.amazon.com/msk/latest/developerguide/msk-password.html

4. Я сталкиваюсь с аналогичной проблемой, вы нашли решение для этого?