Выполнить обратный вызов метода Kafka

#java #spring #apache-kafka #spring-kafka #kafka-producer-api

#java #spring #apache-kafka #spring-kafka #kafka-producer-api

Вопрос:

У меня есть RestController, который вызывает KafkaSendMethod() для отправки сообщения в Kafka MQ

  @RestController
@RequestMapping("string-rest")
public class SimpleBookRestController {

   @Autowire
    private KafkaSendClass kafkaSendClass;
    
    @GetMapping( produces = "application/json")
    public void postMethod() {
    
    ArrayList<String> gfg = new ArrayList<String>(); 

    gfg.add("a"); 
    gfg.add("b"); 
    gfg.add("c");
    
    kafkaSendClass.KafkaSendMethod(gfg);
    
    }
  

Вот класс, который отправляет сообщение в Kafka

 @Service
    class KafkaSendClass
    {
    
    @Autowired
private KafkaTemplate<String, String> kafkaTemplate;


    void KafkaSendMethod(List<Strings> strList)
    {
    
    for ( String str : strList )
    (
    ListenableFuture<SendResult<String,String>>   future = kafkaTemplate.send(str , str );
    future.addCallback(new ListenableFutureCallback<SendResult<String, String>>() {
    
        @Override
        public void onSuccess(SendResult<String, String> result) {
            syso("sent success");
            m1(); //call only once for one call to KafkaSendMethod() ;
        }
    
        @Override
        public void onFailure(Throwable ex) {
            System.out.println(" sending failed");
        }
    });
  

);
Итак, мой вопрос в том, как мне вызывать m1() один раз для каждого вызова KafkaSendMethod(List strList).

Комментарии:

1. Неясно, о чем вы спрашиваете: метод GET не предназначен для отправки данных в Kafka. Не ясно, откуда вы это взяли, List<Books> поскольку ваш getBook() пуст. Вы не показываете, как на самом деле вы вызываете KafkaTemplate для этого books . Поэтому, пожалуйста, рассмотрите возможность пересмотра вашего вопроса с более подробной информацией и более кратким потоком кода.

2. @Artem Bilan я изменил свой код / Допустим, я отправляю список строк в KafkaSendMethod(). В i я хочу использовать m1() только один раз в методе onsucess. Как я могу этого добиться. Например. если у меня есть 3 строки в списке, и onSuccess будет вызван 3 раза, тогда мне нужно вызвать метод m1() только 1 раз. Как я могу этого добиться?

Ответ №1:

Посмотрим, сможете ли вы использовать CompletableFuture.allOf(...) .

Соберите эти Future файлы в список и используйте их completable() адаптацию. Затем вызвать List.toArray() для этого CompletableFuture . CompletableToListenableFutureAdapter для добавления вашего окончательного обратного вызова.

Комментарии:

1. Спасибо за указатель. Я думаю, что это похоже на то, о чем я думал. Ожидание завершения всех фьючерсов с использованием CountDownLatch.

2. Да… Разница только в том, что вы не блокируете свой поток для этого обратного отсчета. Он будет выполняться асинхронно, когда все базовые Futures будут завершены.