Как долго производитель kafka остается в живых между сообщениями?

#java #apache-kafka #kafka-producer-api

#java #apache-kafka #kafka-producer-api

Вопрос:

Я открываю производителя kafka со свойствами конфигурации —

 KafkaProducer<String, MyValue>   producer = new KafkaProducer<String, MyValue>(kafkaProperties);
 

затем синхронная отправка записей с использованием — (чтобы избежать пакетной обработки, а также сохранить исходный порядок сообщений)

    //create myValue instance //omited for simplicity
   //create myrecord instance using topicname and myvalue
   producer.send(myRecord).get();
   producer.flush();  //send message as soon as record is available to producer
 

теперь моя проблема в том, что у меня есть несколько записей для отправки, и между отправками мне, возможно, придется долго ждать — от нескольких минут до нескольких часов (по каким-либо причинам, по крайней мере, чтобы лучше изучить и понять kafka).

Я хочу знать, как долго будет работать соединение производителя с сервером кластера / начальной загрузки. В любом случае я могу настроить его, используя конфигурации производителя.
(Подробные объяснения будут очень благодарны — даже если это должно перейти на уровни TCP-соединений, добро пожаловать)

(у потребителей kafka есть концепция сердцебиения. Есть ли у производителей аналогичная концепция. Поиск в Google для «производителя kafka heartbeat.interval.ms «возвращен только результат для потребителя).

Ответ №1:

Метод KafkaProducer.send является асинхронным, по умолчанию он добавляет все записи в буферную память и отправляет их сразу, поэтому, согласно документам, производитель устанавливает соединение при отправке пакета в кластер

Метод send() является асинхронным. При вызове он добавляет запись в буфер ожидающих отправки записей и немедленно возвращает. Это позволяет производителю объединять отдельные записи для повышения эффективности.

Производитель поддерживает буферы неотправленных записей для каждого раздела. Эти буферы имеют размер, указанный конфигурацией batch.size . Увеличение этого размера может привести к увеличению количества пакетов, но требует больше памяти (поскольку у нас обычно будет один из этих буферов для каждого активного раздела).

По умолчанию буфер доступен для немедленной отправки, даже если в буфере есть дополнительное неиспользуемое пространство. Однако, если вы хотите уменьшить количество запросов, вы можете установить linger.ms к чему-то большему, чем 0.

Это даст указание производителю подождать до указанного количества миллисекунд перед отправкой запроса в надежде, что поступит больше записей для заполнения того же пакета. Это аналогично алгоритму Нэгла в TCP.

Например, в приведенном выше фрагменте кода, вероятно, все 100 записей будут отправлены в одном запросе, поскольку мы установили время задержки в 1 миллисекунду. Однако этот параметр добавит 1 миллисекунду задержки к нашему запросу, ожидающему поступления дополнительных записей, если мы не заполнили буфер.

Обратите внимание, что записи, которые поступают близко друг к другу по времени, обычно объединяются вместе даже с задержкой.ms= 0, поэтому при большой нагрузке пакетирование будет происходить независимо от конфигурации задержки; однако установка этого значения на что-то большее, чем 0, может привести к меньшему количеству, более эффективных запросов, когда они не находятся под максимальной нагрузкой, за счет небольшогоколичество задержек.

От KafkaProducer.сброс, вызов flush не означает, что производитель отправляет каждую запись в кластер, вызов flush делает все буферизованные записи немедленно доступными для отправки

Вызов этого метода делает все буферизованные записи немедленно доступными для отправки (даже если linger.ms больше 0) и блокирует выполнение запросов, связанных с этими записями. Постусловие flush() заключается в том, что любая ранее отправленная запись будет завершена (например, Future.isDone() == true). Запрос считается завершенным, когда он успешно подтвержден в соответствии с указанной вами конфигурацией acks, в противном случае это приводит к ошибке.

Комментарии:

1. в качестве примечания — без producer.flush() , producer.send(topicname, myKey, myValue).get(); также приведет к пакетной обработке.

2. обновил мой ответ, и да, записи всегда будут отправляться в пакетном режиме с использованием flush или нет @Vamsh

3. Я вижу, что это намного больше. flush makes all buffered records immediately available to send итак, что будет делать send, когда он действительно отправляет.

4. это обсуждение идет по другому пути — я согласен. Но это более важно уточнить.

5. Согласно пониманию, «доступно для отправки» означает отправку их в кластер kafka, а не повторное предоставление доступа к методу отправки kafka @Vamsh