#java #apache-kafka
#java #apache-kafka
Вопрос:
Почему я не получаю сообщение об ошибке при запуске следующего кода производителя, а Kafka даже не запущен?
Я ожидал createTopics
бы, что метод выдаст исключение, но этого не происходит. Почему?
final Properties properties = new Properties();
properties.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
properties.put(AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG, "1000");
AdminClient adminClient = AdminClient.create(properties);
NewTopic newTopic = new NewTopic("events", 1, (short) 1);
adminClient.createTopics(Arrays.asList(newTopic));
adminClient.close();
Ответ №1:
Метод createTopics возвращает CreateTopicsResult
значение with KafkaFutures
в качестве значений. Поскольку в настоящее время вы не блокируете свой код для завершения этого действия (использования get
) и не перехватываете никаких исключений, ваш код будет работать нормально без какого-либо уведомления о том, что брокер недоступен.
Следующий код выдаст ExecutionException
сообщение, когда ваш брокер недоступен:
final Properties properties = new Properties();
properties.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
properties.put(AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG, "1000");
properties.put(AdminClientConfig.DEFAULT_API_TIMEOUT_MS_CONFIG, "5000");
AdminClient adminClient = AdminClient.create(properties);
NewTopic newTopic = new NewTopic("events-test", 1, (short) 1);
CreateTopicsResult topicResult = adminClient.createTopics(Arrays.asList(newTopic));
KafkaFuture<Void> resultFuture = topicResult.all();
try {
resultFuture.get();
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
}
adminClient.close();
Я протестировал его с помощью клиента Kafka 2.5.0, и вот исключение:
java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.TimeoutException: Call(callName=createTopics, deadlineMs=1601136227182) timed out at 1601136227183 after 1 attempt(s)
at org.apache.kafka.common.internals.KafkaFutureImpl.wrapAndThrow(KafkaFutureImpl.java:45)
at org.apache.kafka.common.internals.KafkaFutureImpl.access$000(KafkaFutureImpl.java:32)
at org.apache.kafka.common.internals.KafkaFutureImpl$SingleWaiter.await(KafkaFutureImpl.java:89)
at org.apache.kafka.common.internals.KafkaFutureImpl.get(KafkaFutureImpl.java:260)
at org.michael.big.data.kafka.java.BasicAdminClient.main(BasicAdminClient.java:27)
Caused by: org.apache.kafka.common.errors.TimeoutException: Call(callName=createTopics, deadlineMs=1601136227182) timed out at 1601136227183 after 1 attempt(s)
Caused by: org.apache.kafka.common.errors.TimeoutException: Timed out waiting for a node assignment.
Имейте в виду, что класс Admin
(суперкласс AdminClient
) аннотируется как развивающийся и может быть изменен в будущих выпусках.