Потребитель Kafka пропускает сообщения при использовании сообщений в цикле

#java #apache-kafka #kafka-consumer-api #talend

#java #apache-kafka #kafka-consumer-api #talend

Вопрос:

Я запускаю свой потребительский код в цикле из-за ограничений памяти, фиксирую свои данные и затем загружаю в таблицы

Ниже приведен код, который будет выполняться в цикле

 // here is the main part of the component,
// a piece of code executed in the row
// loop
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
System.out.println("Consumer created");
consumer.subscribe(Arrays.asList(topic));
System.out.println("Subscribed to topic "   topic);
try {
    while (pollFlag) {
    ConsumerRecords<String, String> records = consumer.poll(context.consumer_polltime);
     if (records.isEmpty()) {
     globalMap.put("emptyRecordsFlag",false); //Passing the flag value to previous component to end loop
            break;
        }
        for (ConsumerRecord<String, String> record : records) {
            listPayload.add(record.value()); // Adding the messages to list
            i  ;
            if(i>=msgbtch)
            {
                pollFlag = false; // Assigning flag value to end the poll at 5000 messages
                break;
            }       
        }
    }
globalMap.put("ConsumerObj",consumer);  
            
}   catch (Exception e) {
            System.out.println("Error Consuming Msg: "   e);
            // TODO: handle exception
            //consumer.close();
    }
row3.payload= String.valueOf(listPayload); // Passing the message data to next component
System.out.println("Committing");
consumer.commitSync();
System.out.println("Closing");
consumer.close();
 

Но по какой-то причине я, похоже, пропускаю несколько сообщений. Я считаю, что это должно как-то повлиять на перебалансировку / фиксацию потребителя.

Как я могу проверить, готов ли мой потребитель использовать следующую партию сообщений с самого начала, не пропуская ни одного сообщения?

Комментарии:

1. Есть ли другие потребители с тем же идентификатором группы? Вы уверены, что производитель действительно отправил сообщения, которые вы ищете?

2. Я сам отправил 5000 сообщений в тему и запустил свое приложение. Но в пакетном режиме таблица загружена только 4917 сообщениями, тогда как 5000 сообщений загружаются при загрузке за один раз.

3. Опять же, как вы проверили, что все 5000 сообщений успешно поступили в тему?

4. Я проверил это в центре управления. Он показывает, что 5000 сообщений отстают. Я смог самостоятельно разобраться с проблемой. Я только что удалил разрыв в приведенном ниже условии if if(i>=msgbtch) { pollFlag = false; // Assigning flag value to end the poll at 5000 messages break; } Вышеупомянутый разрыв прерывал цикл for еще до того, как все сообщения в записях были загружены в список.

Ответ №1:

Обновление: я смог самостоятельно разобраться с проблемой. Сообщения уже загружены в записи и во время цикла, поскольку я поставил следующее условие

 if(i>=msgbtch)
            {
                pollFlag = false; // Assigning flag value to end the poll at 5000 messages
                break;
            }     
 

Даже перед размещением всех сообщений в списке цикл прерывается, и все сообщения из записей не вставляются в список. Я удалил условие прерывания, и оно работает нормально