Почему не происходит сбой AdminClient при создании темы с не запущенным кластером Kafka?

#java #apache-kafka

#java #apache-kafka

Вопрос:

Почему я не получаю сообщение об ошибке при запуске следующего кода производителя, а Kafka даже не запущен?

Я ожидал createTopics бы, что метод выдаст исключение, но этого не происходит. Почему?

 final Properties properties = new Properties();
properties.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
properties.put(AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG, "1000");

AdminClient adminClient = AdminClient.create(properties);
NewTopic newTopic = new NewTopic("events", 1, (short) 1);
adminClient.createTopics(Arrays.asList(newTopic));
adminClient.close();
  

Ответ №1:

Метод createTopics возвращает CreateTopicsResult значение with KafkaFutures в качестве значений. Поскольку в настоящее время вы не блокируете свой код для завершения этого действия (использования get ) и не перехватываете никаких исключений, ваш код будет работать нормально без какого-либо уведомления о том, что брокер недоступен.

Следующий код выдаст ExecutionException сообщение, когда ваш брокер недоступен:

 final Properties properties = new Properties();
properties.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
properties.put(AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG, "1000");
properties.put(AdminClientConfig.DEFAULT_API_TIMEOUT_MS_CONFIG, "5000");

AdminClient adminClient = AdminClient.create(properties);
NewTopic newTopic = new NewTopic("events-test", 1, (short) 1);
CreateTopicsResult topicResult = adminClient.createTopics(Arrays.asList(newTopic));
KafkaFuture<Void> resultFuture = topicResult.all();
try {
    resultFuture.get();
} catch (InterruptedException e) {
    e.printStackTrace();
} catch (ExecutionException e) {
    e.printStackTrace();
}
adminClient.close();
  

Я протестировал его с помощью клиента Kafka 2.5.0, и вот исключение:

 java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.TimeoutException: Call(callName=createTopics, deadlineMs=1601136227182) timed out at 1601136227183 after 1 attempt(s)
    at org.apache.kafka.common.internals.KafkaFutureImpl.wrapAndThrow(KafkaFutureImpl.java:45)
    at org.apache.kafka.common.internals.KafkaFutureImpl.access$000(KafkaFutureImpl.java:32)
    at org.apache.kafka.common.internals.KafkaFutureImpl$SingleWaiter.await(KafkaFutureImpl.java:89)
    at org.apache.kafka.common.internals.KafkaFutureImpl.get(KafkaFutureImpl.java:260)
    at org.michael.big.data.kafka.java.BasicAdminClient.main(BasicAdminClient.java:27)
Caused by: org.apache.kafka.common.errors.TimeoutException: Call(callName=createTopics, deadlineMs=1601136227182) timed out at 1601136227183 after 1 attempt(s)
Caused by: org.apache.kafka.common.errors.TimeoutException: Timed out waiting for a node assignment.

  

Имейте в виду, что класс Admin (суперкласс AdminClient ) аннотируется как развивающийся и может быть изменен в будущих выпусках.