#apache-kafka
#апач-кафка
Вопрос:
Я пытаюсь создать приложение потоков Кафки, и оно не работает с созданием темы списка изменений. Чего мне не хватает с точки зрения разрешений?
org.apache.kafka.streams.errors.StreamsException: Could not create topic cdcstream-109812-cdcrecords-changelog. at org.apache.kafka.streams.processor.internals.InternalTopicManager.getNumPartitions(InternalTopicManager.java:235) ~[kafka-streams-2.8.1.jar:na] at org.apache.kafka.streams.processor.internals.InternalTopicManager.validateTopics(InternalTopicManager.java:258) ~[kafka-streams-2.8.1.jar:na] at org.apache.kafka.streams.processor.internals.InternalTopicManager.makeReady(InternalTopicManager.java:115) ~[kafka-streams-2.8.1.jar:na] at org.apache.kafka.streams.processor.internals.StreamsPartitionAssignor.prepareChangelogTopics(StreamsPartitionAssignor.java:598) ~[kafka-streams-2.8.1.jar:na] at org.apache.kafka.streams.processor.internals.StreamsPartitionAssignor.assignTasksToClients(StreamsPartitionAssignor.java:625) ~[kafka-streams-2.8.1.jar:na] at org.apache.kafka.streams.processor.internals.StreamsPartitionAssignor.assign(StreamsPartitionAssignor.java:387) ~[kafka-streams-2.8.1.jar:na] at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.performAssignment(ConsumerCoordinator.java:589) ~[kafka-clients-2.8.1.jar:na] at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.onJoinLeader(AbstractCoordinator.java:693) ~[kafka-clients-2.8.1.jar:na] at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.access$1000(AbstractCoordinator.java:111) ~[kafka-clients-2.8.1.jar:na] at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$JoinGroupResponseHandler.handle(AbstractCoordinator.java:599) ~[kafka-clients-2.8.1.jar:na] at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$JoinGroupResponseHandler.handle(AbstractCoordinator.java:562) ~[kafka-clients-2.8.1.jar:na] at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:1182) ~[kafka-clients-2.8.1.jar:na] at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:1157) ~[kafka-clients-2.8.1.jar:na] at org.apache.kafka.clients.consumer.internals.RequestFuture$1.onSuccess(RequestFuture.java:206) ~[kafka-clients-2.8.1.jar:na] at org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:169) ~[kafka-clients-2.8.1.jar:na] at org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:129) ~[kafka-clients-2.8.1.jar:na] at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler.fireCompletion(ConsumerNetworkClient.java:602) ~[kafka-clients-2.8.1.jar:na] at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.firePendingCompletedRequests(ConsumerNetworkClient.java:412) ~[kafka-clients-2.8.1.jar:na] at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:297) ~[kafka-clients-2.8.1.jar:na] at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:236) ~[kafka-clients-2.8.1.jar:na] at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:215) ~[kafka-clients-2.8.1.jar:na] at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:426) ~[kafka-clients-2.8.1.jar:na] at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:365) ~[kafka-clients-2.8.1.jar:na] at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:508) ~[kafka-clients-2.8.1.jar:na] at org.apache.kafka.clients.consumer.KafkaConsumer.updateAssignmentMetadataIfNeeded(KafkaConsumer.java:1261) ~[kafka-clients-2.8.1.jar:na] at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1232) ~[kafka-clients-2.8.1.jar:na] at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1165) ~[kafka-clients-2.8.1.jar:na] at brave.kafka.clients.TracingConsumer.poll(TracingConsumer.java:89) ~[brave-instrumentation-kafka-clients-5.13.2.jar:na] at brave.kafka.clients.TracingConsumer.poll(TracingConsumer.java:83) ~[brave-instrumentation-kafka-clients-5.13.2.jar:na] at org.apache.kafka.streams.processor.internals.StreamThread.pollRequests(StreamThread.java:925) ~[kafka-streams-2.8.1.jar:na] at org.apache.kafka.streams.processor.internals.StreamThread.pollPhase(StreamThread.java:885) ~[kafka-streams-2.8.1.jar:na] at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:720) ~[kafka-streams-2.8.1.jar:na] at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:583) ~[kafka-streams-2.8.1.jar:na] at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:556) ~[kafka-streams-2.8.1.jar:na] Caused by: org.apache.kafka.common.errors.TopicAuthorizationException: Topic authorization failed.
Комментарии:
1. Используете ли вы функцию ACL для брокеров kafka? Если нет, проверьте
auto.create.topics.enable
настройки брокеров.