Потребитель Apache Camel 2.18.0 не использует

#java #apache-kafka #apache-camel

#java #apache-kafka #apache-camel

Вопрос:

Мы используем apache camel до версии 2.17.* для использования параметра maxPollRecords я пытаюсь обновить до 2.18.0. После обновления до 2.18.0 потребитель, похоже, больше не распознается брокером. Ниже приведен пример потребителя, который я пытался создать. Я мог бы создать сообщение из cli для темы, и если бы я создал потребителя в cli, я мог бы видеть, что потребитель, созданный в cli, потребляет сообщение, но не потребитель, созданный через apache camel.

Также с помощью команды cli «Описать группу потребителей» я мог бы видеть идентификатор потребителя пустым, если бы я запускал только экземпляр потребителя apache camel. Пока я работал с 2.17.5, брокер распознавал и назначал это разделу. Я не могу найти пример, пожалуйста, помогите.

 package com.test;

import org.apache.camel.CamelContext;
import org.apache.camel.Exchange;
import org.apache.camel.Message;
import org.apache.camel.Processor;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.component.properties.PropertiesComponent;
import org.apache.camel.impl.DefaultCamelContext;

public class CamelConsumer {

public static void main(String argv[]){
CamelContext camelContext = new DefaultCamelContext();

// Add route to send messages to Kafka

try {
camelContext.addRoutes(new RouteBuilder() {
public void configure() {
PropertiesComponent pc = getContext().getComponent("properties", PropertiesComponent.class);
pc.setLocation("classpath:application.properties");

System.out.println("About to start route: Kafka Server -> Log ");


from("kafka:{{consumer.topic}}?brokers={{kafka.host}}:{{kafka.port}}"
  "amp;maxPollRecords={{consumer.maxPollRecords}}"   "amp;consumersCount={{consumer.consumersCount}}"
  "amp;groupId={{consumer.group}}").routeId("FromKafka")
.process(new Processor() {

@Override
public void process(Exchange exchange) throws Exception {

Message message = exchange.getIn();
Object data = message.getBody();

System.out.println(data);
}
});
}
});

camelContext.start();

Thread.sleep(5 * 60 * 1000);

camelContext.stop();
} catch (Exception e) {
e.printStackTrace();
}
}
}
  

Я также не получаю никаких исключений. Я не нахожу никакой документации, связанной с этим. Пожалуйста, помогите.

потребитель.тема= тест кафка.хост=локальный хост кафка.порт =9092 потребитель.maxPollRecords=1 потребитель.consumersCount = 1 потребитель.группа= тест

Комментарии:

1. Где вы можете решить эту проблему?

2. Я смог найти пример программы из 2.19 в github.com/Talend/apache-camel/branches (Разветвленная ветвь) ссылка. Я мог видеть, что они работают нормально. Но теперь, когда я пытаюсь реализовать то же самое в приложении во время выполнения команды describe для группы, которую я запустил, я всегда могу видеть идентификатор потребителя, указанный как пустой РАЗДЕЛ ТЕМЫ ГРУППЫ, ЖУРНАЛ ТЕКУЩЕГО СМЕЩЕНИЯ, ЗАДЕРЖКА КОНЕЧНОГО СМЕЩЕНИЯ, ИДЕНТИФИКАТОР ПОТРЕБИТЕЛЯ, идентификатор КЛИЕНТА ХОСТА

Ответ №1:

Я мог бы найти рабочий пример кода из 2.19 * в следующем репозитории. https://github.com/Talend/apache-camel/branches (Разветвленная ветвь)

https://github.com/apache/camel (фактическая ветка camel)

Наконец, это сработало с версией 2.21.5, и мне пришлось увеличить версию apache kafka maven с 0.9 до 1.0.0*