Соединитель Kafka Elasticsearch для массовых операций

#elasticsearch #apache-kafka #apache-kafka-connect

#elasticsearch #apache-kafka #apache-kafka-connect

Вопрос:

Я использую соединитель приемника Elasticsearch для операций (индексирования, обновления, удаления) с отдельными записями.

Elasticsearch также имеет конечную точку /_bulk, которую можно использовать для создания, обновления, индексирования или удаления нескольких записей одновременно. Документация здесь.

Поддерживает ли соединитель Elasticsearch Sink эти типы массовых операций? Если да, то какая конфигурация мне нужна или есть ли какой-либо пример кода, который я могу просмотреть?

Ответ №1:

Внутри Elasticsearch sink connector создает массовый процессор, который используется для отправки записей в пакетном режиме. Для управления этим процессором необходимо настроить следующие свойства:

  • batch.size : Количество записей, обрабатываемых как пакет при записи в Elasticsearch.
  • max.in.flight.requests : Максимальное количество запросов на индексацию, которые могут быть отправлены в Elasticsearch, прежде чем блокировать дальнейшие запросы.
  • max.buffered.records : Максимальное количество записей, которые каждая задача будет буферизовать, прежде чем блокировать прием дополнительных записей. Эту конфигурацию можно использовать для ограничения использования памяти для каждой задачи.
  • linger.ms : Записи, поступающие между отправками запросов, объединяются в один запрос на массовую индексацию на основе batch.size конфигурации. Обычно это происходит только при загрузке, когда записи поступают быстрее, чем они могут быть отправлены. Однако может быть желательно уменьшить количество запросов даже при небольшой нагрузке и извлечь выгоду из массового индексирования. Этот параметр помогает достичь этого — когда ожидающий пакет не заполнен, вместо немедленной отправки его задача будет ждать до заданной задержки, чтобы разрешить добавление других записей, чтобы их можно было объединить в один запрос.
  • flush.timeout.ms : Тайм-аут в миллисекундах, используемый для периодической очистки и при ожидании, пока буферное пространство будет доступно по завершенным запросам при добавлении записей. Если это время ожидания превышено, задача завершится ошибкой.