#apache-kafka #aws-msk #aws-databricks #aws-iam-authenticator
Вопрос:
У меня MSK работает на AWS, и я хотел бы использовать информацию с помощью аутентификации AWS_MSK_IAM.
Мой MSK настроен правильно, и я могу использовать информацию с помощью интерфейса командной строки Kafka со следующей командой:
../bin/kafka-console-consumer.sh --bootstrap-server b-1.kafka.*********.***********.amazonaws.com:9098 --consumer.config client_auth.properties --topic TopicTest --from-beginning
Мой client_auth.properties содержит следующую информацию:
# Sets up TLS for encryption and SASL for authN.
security.protocol = SASL_SSL
# Identifies the SASL mechanism to use.
sasl.mechanism = AWS_MSK_IAM
# Binds SASL client implementation.
sasl.jaas.config = software.amazon.msk.auth.iam.IAMLoginModule required;
# Encapsulates constructing a SigV4 signature based on extracted credentials.
# The SASL client bound by "sasl.jaas.config" invokes this class.
sasl.client.callback.handler.class = software.amazon.msk.auth.iam.IAMClientCallbackHandler
Когда я пытаюсь использовать данные из своего кластера Databricks с помощью spark, я получаю следующую ошибку:
Caused by: kafkashaded.org.apache.kafka.common.KafkaException: java.lang.ClassCastException: software.amazon.msk.auth.iam.IAMClientCallbackHandler cannot be cast to kafkashaded.org.apache.kafka.common.security.auth.AuthenticateCallbackHandler
Вот моя конфигурация кластера:
Библиотеки, которые я использую в кластере:
И код, который я запускаю в базах данных:
raw = (
spark
.readStream
.format('kafka')
.option('kafka.bootstrap.servers', 'b-.kafka.*********.***********.amazonaws.com:9098')
.option('subscribe', 'TopicTest')
.option('startingOffsets', 'earliest')
.option('kafka.sasl.mechanism', 'AWS_MSK_IAM')
.option('kafka.security.protocol', 'SASL_SSL')
.option('kafka.sasl.jaas.config', 'software.amazon.msk.auth.iam.IAMLoginModule required;')
.option('kafka.sasl.client.callback.handler.class', 'software.amazon.msk.auth.iam.IAMClientCallbackHandler')
.load()
)
Комментарии:
1. Я постоянно общался с поддержкой Databricks по этому вопросу. По сути, Databricks затеняет
org.apache.kafka.common
значение «доkafkashaded.org.apache.kafka.common
«, что означает, что это не сработает из коробки. Мне пришлось двигаться дальше и использовать SASL/SCRAM, но теоретически вы могли бы сами отключить аутентификатор iam. Я не уверен на 100%, как это сделать.2. Привет @AndrewGelnar, спасибо за ответ, вы имеете в виду, что на данный момент это вообще не будет работать для AWS IAM? Или вы смогли запустить AWS IAM, но с помощью SASL/SCRAM вместо SASL_SSL?
3. SASL/SCRAM-это другой механизм, который не интегрирован с IAM. MSK использует имена пользователей/пароли, настроенные в AWS secretsmanager docs.aws.amazon.com/msk/latest/developerguide/msk-password.html
4. Я сталкиваюсь с аналогичной проблемой, вы нашли решение для этого?