#java #apache-kafka #apache-camel
#java #apache-kafka #apache-camel
Вопрос:
Мы используем apache camel до версии 2.17.* для использования параметра maxPollRecords я пытаюсь обновить до 2.18.0. После обновления до 2.18.0 потребитель, похоже, больше не распознается брокером. Ниже приведен пример потребителя, который я пытался создать. Я мог бы создать сообщение из cli для темы, и если бы я создал потребителя в cli, я мог бы видеть, что потребитель, созданный в cli, потребляет сообщение, но не потребитель, созданный через apache camel.
Также с помощью команды cli «Описать группу потребителей» я мог бы видеть идентификатор потребителя пустым, если бы я запускал только экземпляр потребителя apache camel. Пока я работал с 2.17.5, брокер распознавал и назначал это разделу. Я не могу найти пример, пожалуйста, помогите.
package com.test;
import org.apache.camel.CamelContext;
import org.apache.camel.Exchange;
import org.apache.camel.Message;
import org.apache.camel.Processor;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.component.properties.PropertiesComponent;
import org.apache.camel.impl.DefaultCamelContext;
public class CamelConsumer {
public static void main(String argv[]){
CamelContext camelContext = new DefaultCamelContext();
// Add route to send messages to Kafka
try {
camelContext.addRoutes(new RouteBuilder() {
public void configure() {
PropertiesComponent pc = getContext().getComponent("properties", PropertiesComponent.class);
pc.setLocation("classpath:application.properties");
System.out.println("About to start route: Kafka Server -> Log ");
from("kafka:{{consumer.topic}}?brokers={{kafka.host}}:{{kafka.port}}"
"amp;maxPollRecords={{consumer.maxPollRecords}}" "amp;consumersCount={{consumer.consumersCount}}"
"amp;groupId={{consumer.group}}").routeId("FromKafka")
.process(new Processor() {
@Override
public void process(Exchange exchange) throws Exception {
Message message = exchange.getIn();
Object data = message.getBody();
System.out.println(data);
}
});
}
});
camelContext.start();
Thread.sleep(5 * 60 * 1000);
camelContext.stop();
} catch (Exception e) {
e.printStackTrace();
}
}
}
Я также не получаю никаких исключений. Я не нахожу никакой документации, связанной с этим. Пожалуйста, помогите.
потребитель.тема= тест кафка.хост=локальный хост кафка.порт =9092 потребитель.maxPollRecords=1 потребитель.consumersCount = 1 потребитель.группа= тест
Комментарии:
1. Где вы можете решить эту проблему?
2. Я смог найти пример программы из 2.19 в github.com/Talend/apache-camel/branches (Разветвленная ветвь) ссылка. Я мог видеть, что они работают нормально. Но теперь, когда я пытаюсь реализовать то же самое в приложении во время выполнения команды describe для группы, которую я запустил, я всегда могу видеть идентификатор потребителя, указанный как пустой РАЗДЕЛ ТЕМЫ ГРУППЫ, ЖУРНАЛ ТЕКУЩЕГО СМЕЩЕНИЯ, ЗАДЕРЖКА КОНЕЧНОГО СМЕЩЕНИЯ, ИДЕНТИФИКАТОР ПОТРЕБИТЕЛЯ, идентификатор КЛИЕНТА ХОСТА
Ответ №1:
Я мог бы найти рабочий пример кода из 2.19 * в следующем репозитории. https://github.com/Talend/apache-camel/branches (Разветвленная ветвь)
https://github.com/apache/camel (фактическая ветка camel)
Наконец, это сработало с версией 2.21.5, и мне пришлось увеличить версию apache kafka maven с 0.9 до 1.0.0*