#spring-kafka
#spring-kafka
Вопрос:
Я создал класс Java, а метод имеет @KafkaListener. Но я хочу, чтобы этот метод вызывался только тогда, когда я нажимаю кнопку из пользовательского интерфейса. Когда я нажимаю кнопку из пользовательского интерфейса, вызов переходит к классу контроллера, а из класса контроллера я хочу вызвать этот класс. Как это можно сделать? Я не добавлял никаких аннотаций на уровне класса.
public class MessageListener implements ConsumerSeekAware {
private String topicName;
private int partition;
private long beginOffset;
private long endOffset;
public MessageListener(String topicName, int partition, long beginOffset, long endOffset) {
super();
this.topicName = topicName;
this.partition = partition;
this.beginOffset = beginOffset;
this.endOffset = endOffset;
}
@KafkaListener(topics = "topicName",
concurrency = "20",
clientIdPrefix = "clientId-Test",
groupId = "clientId-group")
public void handleMessage(final ConsumerRecord<Object, Object> consumerRecord) {
System.out.println(consumerRecord);
}
}
Вариант 2:
public List<String> searchMessages(String topicName, int partitionNo, long beginOffset, long endOffset) {
List<String> filteredMessages = new ArrayList<>();
TopicPartition tp = new TopicPartition("topicName", partitionNo);
Properties clusterOneProps = kafkaConsumerConfig.getConsumerProperties();
KafkaConsumer<String, Object> consumer = new KafkaConsumer<>(clusterOneProps);
try {
consumer.subscribe(Collections.singletonList("topicName"), new ConsumerRebalanceListener() {
@Override
public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
// TODO Auto-generated method stub
}
@Override
public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
// TODO Auto-generated method stub
consumer.seek(tp, beginOffset);
}
});
Thread.sleep(100);
boolean flag = true;
System.out.println("search started......from offset is " beginOffset);
while(flag) {
ConsumerRecords<String, Object> crs = consumer.poll(Duration.ofMillis(100L));
for (ConsumerRecord<String, Object> record : crs) {
// search criteria
if(record.value().toString().contains("01111") amp;amp; record.value().toString().contains("2021-11-06")) {
System.out.println("founddddddddddddddddddddddddddddddddddddddd " record.offset());
filteredMessages.add(record.value().toString());
}
if (record.offset() == endOffset) {
flag = false;
break;
}
}
}
System.out.println("doneeeeeeeeeeeeeeeee");
}catch(Exception e) {
e.printStackTrace();
}finally {
consumer.close();
}
Комментарии:
1. Просто сделайте это
MessageListener
как компонент. Вставьте его в свой контроллер и вызовите этот метод со своей стороны MVC! В противном случае неясно, хотите ли вы по-прежнему получать доступ к Kafka или нет. Пожалуйста, уточните подробнее, какова ваша реальная цель. Это@KafkaListener
действительно предназначено для вызова контейнером прослушивателя Kafka при извлечении записей из темы. В чем причина вызова этого метода вручную, пока не ясно…2. @ArtemBilan — Моя цель: «У меня есть тема с 20 разделами и данными о сотрудниках. На экране поиска пользовательского интерфейса я передам несколько номеров сотрудников, теперь я хочу выполнить поиск по всем этим разделам, чтобы найти конкретные данные о сотрудниках, которых там нет. Если он соответствует, я хочу поместить его в отдельный список и загрузить как файл. Я заранее знаю beginOffset и endOffest каждого раздела. и посмотрите, насколько эффективно / быстро я могу это выполнить.
Ответ №1:
Теперь я хочу выполнить поиск по всем этим разделам, чтобы найти конкретные данные о сотрудниках, которых там нет.
Тогда @KafkaListener
это не для вас. Похоже, что ваше требование выполняется по требованию, когда запрос выполнен. Это @KafkaListener
вещь сама по себе: она полагается на постоянный цикл опроса внизу. Это полностью не зависит от запросов конечного пользователя.
Вам нужно подумать о создании KafkaConsumer
«на лету» на основе данных, полученных в веб-запросе. ConsumerFactory
Вместо этого смотрите, как использовать его напрямую для создания этих потребителей. Затем вы можете вызвать poll()
этого потребителя и закрыть его в конце. Однако это уже выходит за рамки Spring для Apache Kafka: https://docs.spring.io/spring-kafka/docs/current/reference/html/#connecting
Или… Вы можете обновить до последней spring-kafka-2.8.0
версии и использовать KafkaTemplate.receive()
API: https://docs.spring.io/spring-kafka/docs/current/reference/html/#kafka-template-receive
Комментарии:
1. Я попытался, создав KafkaConsumer и опросив записи, выполнив поиск beginOffset . Но поскольку KafkaConsumer не является потокобезопасным, я могу выполнять поиск только по одному разделу за раз. Таким образом, для завершения 20 разделов требуется больше времени. Пожалуйста, ознакомьтесь с моей логикой в вопросе (вариант 2). Как можно добиться этого более быстрым способом. Как реализовать многопоточность с помощью KafkaConsumer?
2. Вы не можете.
KafkaConsumer
на самом деле предназначен только для использования в одном потоке. Не похоже, что то, о чем вы говорите, предназначено для Kafka как хранилища. Подумайте о том, чтобы иметь некоторую базу данных между ними. Kafka — это брокер обмена сообщениями, его нельзя искать простым способом по запросу.