Размер сообщения Kafka с активированным сжатием

#apache-kafka #kafka-producer-api

#apache-kafka #kafka-producer-api

Вопрос:

я немного смущен конфигурацией размера сообщения в Kafka 2.6.0. Но давайте расскажем историю:

Мы используем кластер Kafka, состоящий из 3 узлов. Пока со стандартной конфигурацией для сообщений. Активировано «сжатие zstd».

Соответствующая конфигурация брокера проста:

 compression.type=zstd
  

Конфигурация производителя также проста на этом этапе:

 compression.type=zstd
  

Теперь мы хотим поместить сообщения размером 8 Мбайт в определенную тему. Сжатый размер этих данных составляет всего 200 кбайт.

Если я помещу эти данные в тему, возникнет следующая ошибка:

 sudo /opt/kafka/bin/kafka-console-producer.sh --topic XXX --producer.config /opt/kafka/config/admin-ssl.properties --broker-list broker < kafka/new 2.txt

[2020-11-05 13:43:34,500] ERROR Error when sending message to topic XXX with key: null, value: 8722456 bytes with error: (org.apache.kafka.clients.producer.internals.ErrorLoggingCallback)
org.apache.kafka.common.errors.RecordTooLargeException: The message is 8722544 bytes when serialized which is larger than 1048576, which is the value of the max.request.size configuration.
  

Итак, я изменил конфигурацию производителя следующим образом:

 compression.type=zstd
max.request.size=10485760
  

Теперь производитель принимает сообщения большего размера. Но это все равно не работает:

 sudo /opt/kafka/bin/kafka-console-producer.sh --topic XXX --producer.config /opt/kafka/config/admin-ssl.properties --broker-list broker < kafka/new 2.txt

[2020-11-05 15:10:01,513] ERROR Error when sending message to topic Komsa.Kafka.Test with key: null, value: 8722544 bytes with error: (org.apache.kafka.clients.producer.internals.ErrorLoggingCallback)
    org.apache.kafka.common.errors.RecordTooLargeException: The request included a message larger than the max message size the server will accept.
  

Это еще одно сообщение об ошибке. Я не понимаю, почему это происходит.

Я думаю, что это сообщение связано со свойством «message.max.bytes». Но я не понимаю, почему. Это документация для этого свойства:

Наибольший размер пакета записей, разрешенный Kafka (после сжатия, если сжатие включено). Если это значение увеличено и есть потребители старше 0.10.2, размер выборки потребителей также должен быть увеличен, чтобы они могли извлекать пакеты записей такого размера. В последней версии формата сообщений записи всегда группируются в пакеты для повышения эффективности. В предыдущих версиях формата сообщений несжатые записи не группировались в пакеты, и в этом случае это ограничение применяется только к одной записи.Это может быть установлено для каждой темы с помощью конфигурации уровня темы max.message.bytes.

Я думаю, это означает, что параметр связан с размером сжатого сообщения, который составляет несколько кбайт.

Кто-нибудь может мне помочь?

Ответ №1:

Наш опыт показывает, что если вы задаете тип сжатия на уровне брокера, как вы делали с

 compression.type=zstd
  

брокер распакует все, что поступает от производителя, и снова сожмет данные, используя этот тип сжатия. Даже если производитель уже использовал zstd, произойдет распаковка и «повторное сжатие».

Поэтому вам необходимо установить значение compression.type на уровне брокера producer равным .

Ответ №2:

Я нашел решение:

Проблема в kafka-console-producer.sh игнорирует сжатие.введите в конфигурации производителя. Если я явно вызываю

 sudo /opt/kafka/bin/kafka-console-producer.sh --topic XXX --producer.config /opt/kafka/config/admin-ssl.properties --compression-codec=zstd --broker-list broker < kafka/new 2.txt
  

с compression.codec=zstd это работает, потому что производитель сжал сообщение.