#java #apache-kafka #kafka-consumer-api #confluent-control-center
Вопрос:
У меня есть следующий код для подключения к Kafka
Properties props = new Properties();
props.put("bootstrap.servers", "myconfluentkafkabroker:9092");
props.put("group.id","test");
props.put("enable.auto.commit","true");
props.put("auto.commit.interval.ms","1000");
props.put(ConsumerConfig.GROUP_ID_CONFIG, "my_CG");
props.put("group.instance.id", "my_instance_CG_id");
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
props.put("key.deserializer", Class.forName("org.apache.kafka.common.serialization.StringDeserializer"));
props.put("value.deserializer", Class.forName("org.apache.kafka.common.serialization.StringDeserializer"));
KafkaConsumer<String,String> consumer = new KafkaConsumer<String,String>(props);
consumer.subscribe(Arrays.asList("MyTopic"));
try {
while (true) {
ConsumerRecords<String, String> records = consumer.poll(100);
for (ConsumerRecord<String, String> record : records)
{
log.debug("topic = %s, partition = %d, offset = %d,"
customer = %s, country = %sn",
record.topic(), record.partition(), record.offset(),
record.key(), record.value());
int updatedCount = 1;
if (custCountryMap.countainsKey(record.value())) {
updatedCount = custCountryMap.get(record.value()) 1;
}
custCountryMap.put(record.value(), updatedCount)
JSONObject json = new JSONObject(custCountryMap);
System.out.println(json.toString(4));
}
}
} finally {
consumer.close();
}
Код не выдал никаких ошибок, но я все еще не вижу в списке потребителя
будет ли это проблемой?
props.put(«group.instance.id «, «my_instance_CG_id»);
Ответ №1:
Вы должны проверить информацию, которую вы видите, с помощью встроенных инструментов, которые предоставляет Kafka, например kafka-consumer-groups.sh
Вам также потребуется фактически опрашивать сообщения и фиксировать смещения, а не просто подписываться, прежде чем вы что-либо увидите.
В противном случае для этой конкретной панели мониторинга центра управления может потребоваться добавить перехватчики мониторинга в ваш клиент
Комментарии:
1. Да, я извлекаю, но он печатает 0 сообщений
2. Что ж, тогда используйте инструменты CLI, чтобы убедиться, что задержка равна нулю
3. как распечатать значения ConsumerConfig? что-то вроде 2021-04-01T00:44:32.367 05:30 [APP / PROC /WEB/0] [OUT] 2021-03-31 19:14:32.367 ИНФОРМАЦИЯ 38 — [http-nio-8080-exec-9] org.apache. kafka.clients.consumer. ConsumerConfig: значения ConsumerConfig: 2021-04-01T00:44:32.367 05:30 [APP/PROC/WEB/0] [OUT] разрешить.auto.create.topics = true 2021-04-01T00:44:32.367 05:30 [APP/PROC/WEB/0] [OUT]auto.commit.interval.ms = 5000 2021-04-01T00:44:32.367 05:30 [APP/ PROC/WEB/0] [ВЫХОД] авто.смещение.сброс = последняя версия 2021-04-01T00:44:32.367 05:30 [APP/PROC/WEB/0] [OUT] bootstrap.servers = [17xxx:9092]
4. Они должны автоматически печататься, если вы правильно настроили регистратор SLF4J
5. Насколько я знаю, нет. Удалите его. Это что-нибудь меняет?