#spring-boot #rest #apache-kafka #kafka-consumer-api #spring-kafka
#spring-boot #rest #apache-kafka #kafka-consumer-api #spring-kafka
Вопрос:
У меня есть простая служба Spring boot, которая вызывается по требованию и использует указанное количество сообщений из темы. Количество потребляемых сообщений передается в качестве параметра. Служба вызывается каждые 30 минут. Размер каждого сообщения составляет ~ 1,6 кб. Когда я вызываю службу и передаю параметр 3000, я ожидаю, что будет возвращено 3000 тысяч сообщений, но я всегда получаю около 1100 или 1200 сообщений каждый раз. У меня есть одна тема только с одним разделом. Это служба по требованию, поэтому она не использует цикл while и установила время опроса на 30 секунд. Но ответ возвращается в течение 10 секунд, и количество возвращенных записей составляет ~ 1200, хотя MAX_POLL_RECORDS_CONFIG равно 3000, 4000 или 5000. Просто любопытно, существует ли какое-либо ограничение на размер сообщений, даже если время опроса составляет 30 секунд, как я могу добиться большей пропускной способности или приблизиться к пределу. Приведенное ниже выполняется при каждом вызове службы. вот как вызывается службаhttp://example.com/messages?limit=5000
Properties p = new Properties();
//limit is the value coming in as a query paramter and can be 3000, 4000 or 5000
p.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, limit);
p.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG , 15000);
p.put(ConsumerConfig.FETCH_MIN_BYTES_CONFIG , 22 * 1024 * 1024);
p.put(ConsumerConfig.FETCH_MIN_BYTES_CONFIG , 50 * 1024 * 1024);
p.put(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG , 500);
p.put(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG , 50 * 1024 * 1024);
consumer = consumerFactory.createConsumer("my-group-id", null, null, p);
consumer.assign(Collections.singleton(new TopicPartition("test-topic", 0)));
ConsumerRecords<Object, Object> consumerRecords = consumer.poll(Duration.ofSeconds(30));
// processing the messages somehow I always get ~1200 messages
.........................................
.........................................
.................................
consumer.commitAsync();
// return list of messages
Спасибо
Комментарии:
1. Вы устанавливаете
ConsumerConfig.FETCH_MIN_BYTES_CONFIG
дважды, и в итоге получается одно и то же значение конфигурации сConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG
. Вы уверены, что ваши сообщения занимают 1,6 КБ?2. Я изменил его, но все тот же эффект не может использовать более 1200 сообщений p.put(ConsumerConfig. FETCH_MAX_BYTES_CONFIG , 50 * 1024 * 1024 ); Размер моего сообщения составляет от 1,6 КБ до 2 КБ.