#java #spring #apache-kafka #spring-kafka #kafka-producer-api
#java #spring #apache-kafka #spring-kafka #kafka-producer-api
Вопрос:
У меня есть RestController, который вызывает KafkaSendMethod() для отправки сообщения в Kafka MQ
@RestController
@RequestMapping("string-rest")
public class SimpleBookRestController {
@Autowire
private KafkaSendClass kafkaSendClass;
@GetMapping( produces = "application/json")
public void postMethod() {
ArrayList<String> gfg = new ArrayList<String>();
gfg.add("a");
gfg.add("b");
gfg.add("c");
kafkaSendClass.KafkaSendMethod(gfg);
}
Вот класс, который отправляет сообщение в Kafka
@Service
class KafkaSendClass
{
@Autowired
private KafkaTemplate<String, String> kafkaTemplate;
void KafkaSendMethod(List<Strings> strList)
{
for ( String str : strList )
(
ListenableFuture<SendResult<String,String>> future = kafkaTemplate.send(str , str );
future.addCallback(new ListenableFutureCallback<SendResult<String, String>>() {
@Override
public void onSuccess(SendResult<String, String> result) {
syso("sent success");
m1(); //call only once for one call to KafkaSendMethod() ;
}
@Override
public void onFailure(Throwable ex) {
System.out.println(" sending failed");
}
});
);
Итак, мой вопрос в том, как мне вызывать m1() один раз для каждого вызова KafkaSendMethod(List strList).
Комментарии:
1. Неясно, о чем вы спрашиваете: метод GET не предназначен для отправки данных в Kafka. Не ясно, откуда вы это взяли,
List<Books>
поскольку вашgetBook()
пуст. Вы не показываете, как на самом деле вы вызываетеKafkaTemplate
для этогоbooks
. Поэтому, пожалуйста, рассмотрите возможность пересмотра вашего вопроса с более подробной информацией и более кратким потоком кода.2. @Artem Bilan я изменил свой код / Допустим, я отправляю список строк в KafkaSendMethod(). В i я хочу использовать m1() только один раз в методе onsucess. Как я могу этого добиться. Например. если у меня есть 3 строки в списке, и onSuccess будет вызван 3 раза, тогда мне нужно вызвать метод m1() только 1 раз. Как я могу этого добиться?
Ответ №1:
Посмотрим, сможете ли вы использовать CompletableFuture.allOf(...)
.
Соберите эти Future
файлы в список и используйте их completable()
адаптацию. Затем вызвать List.toArray()
для этого CompletableFuture
. CompletableToListenableFutureAdapter
для добавления вашего окончательного обратного вызова.
Комментарии:
1. Спасибо за указатель. Я думаю, что это похоже на то, о чем я думал. Ожидание завершения всех фьючерсов с использованием CountDownLatch.
2. Да… Разница только в том, что вы не блокируете свой поток для этого обратного отсчета. Он будет выполняться асинхронно, когда все базовые
Futures
будут завершены.