#apache-kafka #kafka-producer-api
#apache-kafka #kafka-producer-api
Вопрос:
я немного смущен конфигурацией размера сообщения в Kafka 2.6.0. Но давайте расскажем историю:
Мы используем кластер Kafka, состоящий из 3 узлов. Пока со стандартной конфигурацией для сообщений. Активировано «сжатие zstd».
Соответствующая конфигурация брокера проста:
compression.type=zstd
Конфигурация производителя также проста на этом этапе:
compression.type=zstd
Теперь мы хотим поместить сообщения размером 8 Мбайт в определенную тему. Сжатый размер этих данных составляет всего 200 кбайт.
Если я помещу эти данные в тему, возникнет следующая ошибка:
sudo /opt/kafka/bin/kafka-console-producer.sh --topic XXX --producer.config /opt/kafka/config/admin-ssl.properties --broker-list broker < kafka/new 2.txt
[2020-11-05 13:43:34,500] ERROR Error when sending message to topic XXX with key: null, value: 8722456 bytes with error: (org.apache.kafka.clients.producer.internals.ErrorLoggingCallback)
org.apache.kafka.common.errors.RecordTooLargeException: The message is 8722544 bytes when serialized which is larger than 1048576, which is the value of the max.request.size configuration.
Итак, я изменил конфигурацию производителя следующим образом:
compression.type=zstd
max.request.size=10485760
Теперь производитель принимает сообщения большего размера. Но это все равно не работает:
sudo /opt/kafka/bin/kafka-console-producer.sh --topic XXX --producer.config /opt/kafka/config/admin-ssl.properties --broker-list broker < kafka/new 2.txt
[2020-11-05 15:10:01,513] ERROR Error when sending message to topic Komsa.Kafka.Test with key: null, value: 8722544 bytes with error: (org.apache.kafka.clients.producer.internals.ErrorLoggingCallback)
org.apache.kafka.common.errors.RecordTooLargeException: The request included a message larger than the max message size the server will accept.
Это еще одно сообщение об ошибке. Я не понимаю, почему это происходит.
Я думаю, что это сообщение связано со свойством «message.max.bytes». Но я не понимаю, почему. Это документация для этого свойства:
Наибольший размер пакета записей, разрешенный Kafka (после сжатия, если сжатие включено). Если это значение увеличено и есть потребители старше 0.10.2, размер выборки потребителей также должен быть увеличен, чтобы они могли извлекать пакеты записей такого размера. В последней версии формата сообщений записи всегда группируются в пакеты для повышения эффективности. В предыдущих версиях формата сообщений несжатые записи не группировались в пакеты, и в этом случае это ограничение применяется только к одной записи.Это может быть установлено для каждой темы с помощью конфигурации уровня темы max.message.bytes.
Я думаю, это означает, что параметр связан с размером сжатого сообщения, который составляет несколько кбайт.
Кто-нибудь может мне помочь?
Ответ №1:
Наш опыт показывает, что если вы задаете тип сжатия на уровне брокера, как вы делали с
compression.type=zstd
брокер распакует все, что поступает от производителя, и снова сожмет данные, используя этот тип сжатия. Даже если производитель уже использовал zstd, произойдет распаковка и «повторное сжатие».
Поэтому вам необходимо установить значение compression.type на уровне брокера producer
равным .
Ответ №2:
Я нашел решение:
Проблема в kafka-console-producer.sh игнорирует сжатие.введите в конфигурации производителя. Если я явно вызываю
sudo /opt/kafka/bin/kafka-console-producer.sh --topic XXX --producer.config /opt/kafka/config/admin-ssl.properties --compression-codec=zstd --broker-list broker < kafka/new 2.txt
с compression.codec=zstd это работает, потому что производитель сжал сообщение.