Клиент производителя Кафки: Будут ли все обратные вызовы выполняться до возврата flush()

#java #apache-kafka #kafka-producer-api

#Ява #апач-кафка #kafka-producer-api

Вопрос:

Я хочу проверить статус отправки записей Кафке. Мне нужно убедиться, что все записи были успешно сохранены в Кафке. Я думал об использовании механизма обратного вызова, например, о создании класса обратного вызова, такого как

 public class MyCallback implememts Callback {  private AtomicReferencelt;Exceptiongt; exceptionRef;  public MyCallback(){  exceptionRef=new AtomicReferencelt;gt;(null);  }  @Override  public void onCompletion(final RecordMetadata metadata,final Exception exception){  if (exception!=null){  exceptionRef.set(exception);  }  }  public void check()  throws Exception  {  Exception exception=exceptionRef.get();  if (exception!=null) throw exception;  } }  

а затем есть основная программа, такая как

 try{  Producerlt;Object,Objectgt; producer=new KafkaProducerlt;gt;(props);  MyCallback callback=new MyCallback();  for (ProducerRecordlt;Object,Objectgt; rec:myRecords){  producer.send(rec,callback);  }  producer.flush();  callback.check(); } catch(Exception e){  handle(e); }  

Мой вопрос: Могу ли я быть уверен, что обратный вызов был вызван для всех отправленных записей при flush() возврате?

Я должен добавить, что используется эта настройка acks=all .

Комментарии:

1. Сброс действительно гарантирует только очистку буфера производителя. Не обязательно, что брокер проверяет записи из буфера. Вам нужно будет установить acks=1 или all для этого

2. Я отредактировал вопрос, чтобы было ясно, что acks=all используется.