Объединенный центр управления — потребитель отсутствует в списке

#java #apache-kafka #kafka-consumer-api #confluent-control-center

Вопрос:

У меня есть следующий код для подключения к Kafka

 Properties props = new Properties();
props.put("bootstrap.servers", "myconfluentkafkabroker:9092");
props.put("group.id","test");
props.put("enable.auto.commit","true");
props.put("auto.commit.interval.ms","1000");
props.put(ConsumerConfig.GROUP_ID_CONFIG, "my_CG");
props.put("group.instance.id", "my_instance_CG_id");
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
props.put("key.deserializer", Class.forName("org.apache.kafka.common.serialization.StringDeserializer"));
props.put("value.deserializer", Class.forName("org.apache.kafka.common.serialization.StringDeserializer"));

KafkaConsumer<String,String> consumer = new KafkaConsumer<String,String>(props);
consumer.subscribe(Arrays.asList("MyTopic"));
try {
    while (true) { 
        ConsumerRecords<String, String> records = consumer.poll(100);
        for (ConsumerRecord<String, String> record : records)
        {
            log.debug("topic = %s, partition = %d, offset = %d,"
                customer = %s, country = %sn",
                record.topic(), record.partition(), record.offset(),
                record.key(), record.value());

            int updatedCount = 1;
            if (custCountryMap.countainsKey(record.value())) {
                updatedCount = custCountryMap.get(record.value())   1;
            }
            custCountryMap.put(record.value(), updatedCount)

            JSONObject json = new JSONObject(custCountryMap);
            System.out.println(json.toString(4));
        }
    }
} finally {
    consumer.close();
}
 

Код не выдал никаких ошибок, но я все еще не вижу в списке потребителя

введите описание изображения здесь

будет ли это проблемой?

props.put(«group.instance.id «, «my_instance_CG_id»);

Ответ №1:

Вы должны проверить информацию, которую вы видите, с помощью встроенных инструментов, которые предоставляет Kafka, например kafka-consumer-groups.sh

Вам также потребуется фактически опрашивать сообщения и фиксировать смещения, а не просто подписываться, прежде чем вы что-либо увидите.

В противном случае для этой конкретной панели мониторинга центра управления может потребоваться добавить перехватчики мониторинга в ваш клиент

Комментарии:

1. Да, я извлекаю, но он печатает 0 сообщений

2. Что ж, тогда используйте инструменты CLI, чтобы убедиться, что задержка равна нулю

3. как распечатать значения ConsumerConfig? что-то вроде 2021-04-01T00:44:32.367 05:30 [APP / PROC /WEB/0] [OUT] 2021-03-31 19:14:32.367 ИНФОРМАЦИЯ 38 — [http-nio-8080-exec-9] org.apache. kafka.clients.consumer. ConsumerConfig: значения ConsumerConfig: 2021-04-01T00:44:32.367 05:30 [APP/PROC/WEB/0] [OUT] разрешить.auto.create.topics = true 2021-04-01T00:44:32.367 05:30 [APP/PROC/WEB/0] [OUT]auto.commit.interval.ms = 5000 2021-04-01T00:44:32.367 05:30 [APP/ PROC/WEB/0] [ВЫХОД] авто.смещение.сброс = последняя версия 2021-04-01T00:44:32.367 05:30 [APP/PROC/WEB/0] [OUT] bootstrap.servers = [17xxx:9092]

4. Они должны автоматически печататься, если вы правильно настроили регистратор SLF4J

5. Насколько я знаю, нет. Удалите его. Это что-нибудь меняет?