#apache-kafka #ksqldb #kafka-topic
#apache-кафка #ksqldb #кафка-тема
Вопрос:
Я работаю с ksql довольно давно. Кластер кафки, состоящий из 3 узлов. Я также использую udf, и все выглядит хорошо, пока я не остановлю серверы и не запущу их снова. При запуске сервера я вижу следующее в журналах:
[2019-04-03 11:29:54,381] ERROR Exception encountered running command: A Kafka topic with the name 'czxcorp-structured-data-enriched' already exists, with different partition/replica configuration than required. KSQL expects 4 partitions (topic has 9), and 1 replication factor (topic has 1).. Retrying in 5000 ms (io.confluent.ksql.util.RetryUtil:80)
[2019-04-03 11:29:54,381] ERROR Stack trace: io.confluent.ksql.exception.KafkaTopicExistsException: A Kafka topic with the name 'czxcorp-structured-data-enriched' already exists, with different partition/replica configuration than required. KSQL expects 4 partitions (topic has 9), and 1 replication factor (topic has 1).
at io.confluent.ksql.services.TopicValidationUtil.validateTopicProperties(TopicValidationUtil.java:51)
at io.confluent.ksql.services.TopicValidationUtil.validateTopicProperties(TopicValidationUtil.java:35)
at io.confluent.ksql.services.KafkaTopicClientImpl.validateTopicProperties(KafkaTopicClientImpl.java:292)
at io.confluent.ksql.services.KafkaTopicClientImpl.createTopic(KafkaTopicClientImpl.java:76)
at io.confluent.ksql.planner.plan.KsqlStructuredDataOutputNode.createSinkTopic(KsqlStructuredDataOutputNode.java:244)
at io.confluent.ksql.planner.plan.KsqlStructuredDataOutputNode.buildStream(KsqlStructuredDataOutputNode.java:146)
at io.confluent.ksql.physical.PhysicalPlanBuilder.buildPhysicalPlan(PhysicalPlanBuilder.java:106)
at io.confluent.ksql.QueryEngine.buildPhysicalPlan(QueryEngine.java:113)
at io.confluent.ksql.KsqlEngine$EngineExecutor.execute(KsqlEngine.java:625)
at io.confluent.ksql.KsqlEngine$EngineExecutor.access$800(KsqlEngine.java:577)
at io.confluent.ksql.KsqlEngine.execute(KsqlEngine.java:247)
at io.confluent.ksql.rest.server.computation.StatementExecutor.startQuery(StatementExecutor.java:277)
at io.confluent.ksql.rest.server.computation.StatementExecutor.executeStatement(StatementExecutor.java:191)
at io.confluent.ksql.rest.server.computation.StatementExecutor.handleStatementWithTerminatedQueries(StatementExecutor.java:167)
at io.confluent.ksql.rest.server.computation.StatementExecutor.handleRestore(StatementExecutor.java:101)
at io.confluent.ksql.rest.server.computation.CommandRunner.lambda$null$0(CommandRunner.java:139)
at io.confluent.ksql.util.RetryUtil.retryWithBackoff(RetryUtil.java:63)
at io.confluent.ksql.util.RetryUtil.retryWithBackoff(RetryUtil.java:36)
at io.confluent.ksql.rest.server.computation.CommandRunner.lambda$processPriorCommands$1(CommandRunner.java:135)
at java.util.ArrayList.forEach(ArrayList.java:1257)
at io.confluent.ksql.rest.server.computation.CommandRunner.processPriorCommands(CommandRunner.java:134)
at io.confluent.ksql.rest.server.KsqlRestApplication.buildApplication(KsqlRestApplication.java:414)
at io.confluent.ksql.rest.server.KsqlServerMain.createExecutable(KsqlServerMain.java:80)
at io.confluent.ksql.rest.server.KsqlServerMain.main(KsqlServerMain.java:42)
(io.confluent.ksql.util.RetryUtil:84)
Хотя я остановил / завершил все запросы, в журнале печатаются все команды, которые я выполнял с самого начала для моего тестирования до данных, включая create, select, drop
. Я извлек .jar
(UDF) из папки / ext, и сервер запустился, хотя функция печати журнала udf (которую я использую) недоступна.
Это мой ksql-server.properties:
bootstrap.servers=hostname:9092
service.id=cyan_ksql
commit.interval.ms=5000
cache.max.bytes.buffering=20000000
num.stream.threads=10
fail.on.deserialization.error=false
listeners=http://localhost:8088
ksql.extension.dir=/opt/ksql-master/ext/
Схожу с ума от ошибки. Я удаляю тему и каким-то образом ее воссоздаю. Кто-нибудь, пожалуйста, помогите.
Ответ №1:
Проверьте ошибку:
A Kafka topic with the name 'czxcorp-structured-data-enriched' already exists, with different partition/replica configuration than required.
KSQL expects 4 partitions (topic has 9), and 1 replication factor (topic has 1)
Если вы удалили тему, то либо
- на самом деле он не был удален
- он был удален, и что-то другое воссоздало его с девятью разделами, и ваш ошибочный запрос KSQL не указал переопределение (
WITH (PARTITIONS=9
) для четырех разделов по умолчанию - другая команда KSQL создает ее раньше той, которая выдает ошибку, и в вашем запросе KSQL с ошибкой не указано переопределение (
WITH (PARTITIONS=9
) для четырех значений по умолчанию
Если вы хотите изменить свое состояние и начать с нуля, просто измените свое ksql.service.id
, что заставит KSQL использовать новую командную тему (которая воспроизводится при перезапуске процесса)
Комментарии:
1. Красивые. Он работает с изменением ksql.service.id . Спасибо, Робин. Я обновил вопрос, не могли бы вы предложить исправления в ksql-server.properties? Спасибо
2. Вы можете изменить
service.id
в файле свойств — это то, что вы имеете в виду? Не уверен, о чем вы спрашиваете3. Ну, у меня есть изменение service.id и сервер ksql запустился заново. Никаких проблем вообще. Я спрашиваю о каких-либо изменениях / дополнительных параметрах, которые требуются, кроме однажды упомянутых в исходном вопросе, для повышения производительности?
4. Я бы рекомендовал начать новый вопрос и подробно описать проблемы с производительностью, с которыми вы сталкиваетесь и которые вы хотите решить. Спасибо.