#java #apache-kafka #kafka-producer-api
#Ява #апач-кафка #kafka-producer-api
Вопрос:
Я хочу проверить статус отправки записей Кафке. Мне нужно убедиться, что все записи были успешно сохранены в Кафке. Я думал об использовании механизма обратного вызова, например, о создании класса обратного вызова, такого как
public class MyCallback implememts Callback { private AtomicReferencelt;Exceptiongt; exceptionRef; public MyCallback(){ exceptionRef=new AtomicReferencelt;gt;(null); } @Override public void onCompletion(final RecordMetadata metadata,final Exception exception){ if (exception!=null){ exceptionRef.set(exception); } } public void check() throws Exception { Exception exception=exceptionRef.get(); if (exception!=null) throw exception; } }
а затем есть основная программа, такая как
try{ Producerlt;Object,Objectgt; producer=new KafkaProducerlt;gt;(props); MyCallback callback=new MyCallback(); for (ProducerRecordlt;Object,Objectgt; rec:myRecords){ producer.send(rec,callback); } producer.flush(); callback.check(); } catch(Exception e){ handle(e); }
Мой вопрос: Могу ли я быть уверен, что обратный вызов был вызван для всех отправленных записей при flush()
возврате?
Я должен добавить, что используется эта настройка acks=all
.
Комментарии:
1. Сброс действительно гарантирует только очистку буфера производителя. Не обязательно, что брокер проверяет записи из буфера. Вам нужно будет установить
acks=1
илиall
для этого2. Я отредактировал вопрос, чтобы было ясно, что
acks=all
используется.