Использовать транзакции PublishKafka_2_0

#apache-kafka #apache-nifi

#apache-kafka #apache-nifi

Вопрос:

Я использую следующую конфигурацию процессора публикации Kafka:

введите описание изображения здесь Отредактированная конфигурация kafka и zookeeper:

 zookeeper.properties

authProvider.sasl=org.apache.zookeeper.server.auth.SASLAuthenticationProvider
requireClientAuthScheme=sasl
jaasLoginRenew=3600000
 
 zookeeper_jaas.conf

Server {
org.apache.zookeeper.server.auth.DigestLoginModule required
   user_super="zookeeper"
   user_admin="admin-secret";
};
 
 server.properties

security.inter.broker.protocol=SASL_PLAINTEXT
sasl.mechanism.inter.broker.protocol=SCRAM-SHA-256
sasl.enabled.mechanisms=SCRAM-SHA-256
authorizer.class.name=kafka.security.auth.SimpleAclAuthorizer
allow.everyone.if.no.acl.found=false
auto.create.topics.enable=false
listeners=SASL_PLAINTEXT://172.23.199.20:9092
advertised.listeners=SASL_PLAINTEXT://172.23.199.20:9092
zookeeper.set.acl=true
super.users=User:admin
 
 kafka_server_jaas.conf

KafkaServer {
org.apache.kafka.common.security.scram.ScramLoginModule required
username="admin"
password="admin-secret"
user_admin="admin-secret";
};
Client {
org.apache.zookeeper.server.auth.DigestLoginModule required
username="admin"
password="admin-secret";
};
 

Аутентификация работает нормально.

Включение авторизации

Добавить администратора:

 ./bin/kafka-configs.sh --zookeeper localhost:2181 --alter --add-config 'SCRAM-SHA-256=[password=admin-secret]' --entity-type users --entity-name admin
 

Добавить пользователя:

 ./bin/kafka-configs.sh --zookeeper localhost:2181 --alter --add-config 'SCRAM-SHA-256=[iterations=8192,password=123456]' --entity-type users --entity-name pkalita
 

Добавить разрешение:

 ./bin/kafka-acls.sh --authorizer kafka.security.auth.SimpleAclAuthorizer --authorizer-properties zookeeper.connect=localhost:2181 --add --allow-principal User:admin --producer --topic test
 
 ./bin/kafka-acls.sh --authorizer kafka.security.auth.SimpleAclAuthorizer --authorizer-properties zookeeper.connect=localhost:2181 --add --allow-principal User:pkalita --producer --topic test
 

После этих действий процессор PublishKafka отлично работает с основным администратором, но выдает исключение, если выбрать пользователя pkalita:

 org.apache.kafka.common.errors.TransactionalIdAuthorizationException: Transactional Id authorization failed
 

введите описание изображения здесь
Процессор работает, только если установлено значение Использовать транзакции — false

Что я делаю не так?

upd: я попытался отправить сообщение с помощью spring Kafka producer с пользователем pkalita — сообщение было опубликовано в теме успешно

Ответ №1:

Вам необходимо указать идентификатор транзакции при настройке списков управления доступом.

Из документации:

Принципал, используемый производителями транзакций, должен быть авторизован для операций описания и записи в сконфигурированном transactional.id .

 bin/kafka-acls --bootstrap-server localhost:9092 --command-config adminclient-configs.conf 
 --add --allow-principal User:Alice 
 --producer --topic test-topic --transactional-id test-txn
 

Вы также можете использовать --transactional-id * , чтобы разрешить любой идентификатор транзакции.

Комментарии:

1. спасибо, я вижу эту команду, но я не нашел, что должно быть в файле adminclient-configs.conf?

2. этот файл может содержать конфигурации для клиента администратора (например, конфигурацию SASL или SSL). Вам, вероятно, это не нужно, просто добавьте --transactional-id конфигурацию при настройке ACL для пользователя pkalita