Как вызвать метод spring KafkaListener вручную

#spring-kafka

#spring-kafka

Вопрос:

Я создал класс Java, а метод имеет @KafkaListener. Но я хочу, чтобы этот метод вызывался только тогда, когда я нажимаю кнопку из пользовательского интерфейса. Когда я нажимаю кнопку из пользовательского интерфейса, вызов переходит к классу контроллера, а из класса контроллера я хочу вызвать этот класс. Как это можно сделать? Я не добавлял никаких аннотаций на уровне класса.

 public class MessageListener implements ConsumerSeekAware {

   
    private String topicName;
     
     private int partition;
     
     private long beginOffset;
     
     private long endOffset;
 
  
    public MessageListener(String topicName, int partition, long beginOffset, long endOffset) {
        super();
        this.topicName = topicName;
        this.partition = partition;
        this.beginOffset = beginOffset;
        this.endOffset = endOffset;
    }
 

    @KafkaListener(topics = "topicName", 
    concurrency = "20",
    clientIdPrefix = "clientId-Test",
    groupId = "clientId-group")
    public void handleMessage(final ConsumerRecord<Object, Object> consumerRecord) {
        System.out.println(consumerRecord);
    }
 

}

Вариант 2:

        public List<String> searchMessages(String topicName, int partitionNo, long beginOffset, long endOffset) {

    List<String> filteredMessages = new ArrayList<>();
    TopicPartition tp = new TopicPartition("topicName", partitionNo);
    Properties clusterOneProps = kafkaConsumerConfig.getConsumerProperties();
    KafkaConsumer<String, Object> consumer = new KafkaConsumer<>(clusterOneProps);

    try {
        consumer.subscribe(Collections.singletonList("topicName"), new ConsumerRebalanceListener() {
        @Override
        public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
            // TODO Auto-generated method stub
        }
        @Override
        public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
            // TODO Auto-generated method stub
            
            consumer.seek(tp, beginOffset); 
        }
    }); 
    Thread.sleep(100);    
    boolean flag = true;
    System.out.println("search started......from offset is " beginOffset);
    while(flag) {   
    ConsumerRecords<String, Object> crs = consumer.poll(Duration.ofMillis(100L));
     for (ConsumerRecord<String, Object> record : crs) {
              // search criteria
               if(record.value().toString().contains("01111") amp;amp; record.value().toString().contains("2021-11-06")) {
                   System.out.println("founddddddddddddddddddddddddddddddddddddddd " record.offset());
                   filteredMessages.add(record.value().toString());
               }
               if (record.offset() == endOffset) {
                   flag = false;
                   break;
               }
        }
     }
     System.out.println("doneeeeeeeeeeeeeeeee");
    }catch(Exception e) {
        e.printStackTrace();
    }finally {
        consumer.close();
    }
 

Комментарии:

1. Просто сделайте это MessageListener как компонент. Вставьте его в свой контроллер и вызовите этот метод со своей стороны MVC! В противном случае неясно, хотите ли вы по-прежнему получать доступ к Kafka или нет. Пожалуйста, уточните подробнее, какова ваша реальная цель. Это @KafkaListener действительно предназначено для вызова контейнером прослушивателя Kafka при извлечении записей из темы. В чем причина вызова этого метода вручную, пока не ясно…

2. @ArtemBilan — Моя цель: «У меня есть тема с 20 разделами и данными о сотрудниках. На экране поиска пользовательского интерфейса я передам несколько номеров сотрудников, теперь я хочу выполнить поиск по всем этим разделам, чтобы найти конкретные данные о сотрудниках, которых там нет. Если он соответствует, я хочу поместить его в отдельный список и загрузить как файл. Я заранее знаю beginOffset и endOffest каждого раздела. и посмотрите, насколько эффективно / быстро я могу это выполнить.

Ответ №1:

Теперь я хочу выполнить поиск по всем этим разделам, чтобы найти конкретные данные о сотрудниках, которых там нет.

Тогда @KafkaListener это не для вас. Похоже, что ваше требование выполняется по требованию, когда запрос выполнен. Это @KafkaListener вещь сама по себе: она полагается на постоянный цикл опроса внизу. Это полностью не зависит от запросов конечного пользователя.

Вам нужно подумать о создании KafkaConsumer «на лету» на основе данных, полученных в веб-запросе. ConsumerFactory Вместо этого смотрите, как использовать его напрямую для создания этих потребителей. Затем вы можете вызвать poll() этого потребителя и закрыть его в конце. Однако это уже выходит за рамки Spring для Apache Kafka: https://docs.spring.io/spring-kafka/docs/current/reference/html/#connecting

Или… Вы можете обновить до последней spring-kafka-2.8.0 версии и использовать KafkaTemplate.receive() API: https://docs.spring.io/spring-kafka/docs/current/reference/html/#kafka-template-receive

Комментарии:

1. Я попытался, создав KafkaConsumer и опросив записи, выполнив поиск beginOffset . Но поскольку KafkaConsumer не является потокобезопасным, я могу выполнять поиск только по одному разделу за раз. Таким образом, для завершения 20 разделов требуется больше времени. Пожалуйста, ознакомьтесь с моей логикой в вопросе (вариант 2). Как можно добиться этого более быстрым способом. Как реализовать многопоточность с помощью KafkaConsumer?

2. Вы не можете. KafkaConsumer на самом деле предназначен только для использования в одном потоке. Не похоже, что то, о чем вы говорите, предназначено для Kafka как хранилища. Подумайте о том, чтобы иметь некоторую базу данных между ними. Kafka — это брокер обмена сообщениями, его нельзя искать простым способом по запросу.