#java #apache-kafka #kafka-producer-api
#java #apache-kafka #kafka-producer-api
Вопрос:
Я открываю производителя kafka со свойствами конфигурации —
KafkaProducer<String, MyValue> producer = new KafkaProducer<String, MyValue>(kafkaProperties);
затем синхронная отправка записей с использованием — (чтобы избежать пакетной обработки, а также сохранить исходный порядок сообщений)
//create myValue instance //omited for simplicity
//create myrecord instance using topicname and myvalue
producer.send(myRecord).get();
producer.flush(); //send message as soon as record is available to producer
теперь моя проблема в том, что у меня есть несколько записей для отправки, и между отправками мне, возможно, придется долго ждать — от нескольких минут до нескольких часов (по каким-либо причинам, по крайней мере, чтобы лучше изучить и понять kafka).
Я хочу знать, как долго будет работать соединение производителя с сервером кластера / начальной загрузки. В любом случае я могу настроить его, используя конфигурации производителя.
(Подробные объяснения будут очень благодарны — даже если это должно перейти на уровни TCP-соединений, добро пожаловать)
(у потребителей kafka есть концепция сердцебиения. Есть ли у производителей аналогичная концепция. Поиск в Google для «производителя kafka heartbeat.interval.ms «возвращен только результат для потребителя).
Ответ №1:
Метод KafkaProducer.send является асинхронным, по умолчанию он добавляет все записи в буферную память и отправляет их сразу, поэтому, согласно документам, производитель устанавливает соединение при отправке пакета в кластер
Метод send() является асинхронным. При вызове он добавляет запись в буфер ожидающих отправки записей и немедленно возвращает. Это позволяет производителю объединять отдельные записи для повышения эффективности.
Производитель поддерживает буферы неотправленных записей для каждого раздела. Эти буферы имеют размер, указанный конфигурацией batch.size . Увеличение этого размера может привести к увеличению количества пакетов, но требует больше памяти (поскольку у нас обычно будет один из этих буферов для каждого активного раздела).
По умолчанию буфер доступен для немедленной отправки, даже если в буфере есть дополнительное неиспользуемое пространство. Однако, если вы хотите уменьшить количество запросов, вы можете установить linger.ms к чему-то большему, чем 0.
Это даст указание производителю подождать до указанного количества миллисекунд перед отправкой запроса в надежде, что поступит больше записей для заполнения того же пакета. Это аналогично алгоритму Нэгла в TCP.
Например, в приведенном выше фрагменте кода, вероятно, все 100 записей будут отправлены в одном запросе, поскольку мы установили время задержки в 1 миллисекунду. Однако этот параметр добавит 1 миллисекунду задержки к нашему запросу, ожидающему поступления дополнительных записей, если мы не заполнили буфер.
Обратите внимание, что записи, которые поступают близко друг к другу по времени, обычно объединяются вместе даже с задержкой.ms= 0, поэтому при большой нагрузке пакетирование будет происходить независимо от конфигурации задержки; однако установка этого значения на что-то большее, чем 0, может привести к меньшему количеству, более эффективных запросов, когда они не находятся под максимальной нагрузкой, за счет небольшогоколичество задержек.
От KafkaProducer.сброс, вызов flush не означает, что производитель отправляет каждую запись в кластер, вызов flush делает все буферизованные записи немедленно доступными для отправки
Вызов этого метода делает все буферизованные записи немедленно доступными для отправки (даже если linger.ms больше 0) и блокирует выполнение запросов, связанных с этими записями. Постусловие flush() заключается в том, что любая ранее отправленная запись будет завершена (например, Future.isDone() == true). Запрос считается завершенным, когда он успешно подтвержден в соответствии с указанной вами конфигурацией acks, в противном случае это приводит к ошибке.
Комментарии:
1. в качестве примечания — без
producer.flush()
,producer.send(topicname, myKey, myValue).get();
также приведет к пакетной обработке.2. обновил мой ответ, и да, записи всегда будут отправляться в пакетном режиме с использованием flush или нет @Vamsh
3. Я вижу, что это намного больше.
flush makes all buffered records immediately available to send
итак, что будет делать send, когда он действительно отправляет.4. это обсуждение идет по другому пути — я согласен. Но это более важно уточнить.
5. Согласно пониманию, «доступно для отправки» означает отправку их в кластер kafka, а не повторное предоставление доступа к методу отправки kafka @Vamsh