Раздел Spring boot REST service Kafka не может использовать указанное количество сообщений

#spring-boot #rest #apache-kafka #kafka-consumer-api #spring-kafka

#spring-boot #rest #apache-kafka #kafka-consumer-api #spring-kafka

Вопрос:

У меня есть простая служба Spring boot, которая вызывается по требованию и использует указанное количество сообщений из темы. Количество потребляемых сообщений передается в качестве параметра. Служба вызывается каждые 30 минут. Размер каждого сообщения составляет ~ 1,6 кб. Когда я вызываю службу и передаю параметр 3000, я ожидаю, что будет возвращено 3000 тысяч сообщений, но я всегда получаю около 1100 или 1200 сообщений каждый раз. У меня есть одна тема только с одним разделом. Это служба по требованию, поэтому она не использует цикл while и установила время опроса на 30 секунд. Но ответ возвращается в течение 10 секунд, и количество возвращенных записей составляет ~ 1200, хотя MAX_POLL_RECORDS_CONFIG равно 3000, 4000 или 5000. Просто любопытно, существует ли какое-либо ограничение на размер сообщений, даже если время опроса составляет 30 секунд, как я могу добиться большей пропускной способности или приблизиться к пределу. Приведенное ниже выполняется при каждом вызове службы. вот как вызывается службаhttp://example.com/messages?limit=5000

 Properties p = new Properties();
//limit is the value coming in as a query paramter and can be 3000, 4000 or 5000
p.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, limit);
p.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG , 15000);
p.put(ConsumerConfig.FETCH_MIN_BYTES_CONFIG , 22 * 1024 * 1024);
p.put(ConsumerConfig.FETCH_MIN_BYTES_CONFIG , 50 * 1024 * 1024);
p.put(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG , 500);
p.put(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG , 50 * 1024 * 1024);

consumer = consumerFactory.createConsumer("my-group-id", null, null, p);
consumer.assign(Collections.singleton(new TopicPartition("test-topic", 0)));


ConsumerRecords<Object, Object> consumerRecords = consumer.poll(Duration.ofSeconds(30));

// processing the messages somehow I always get ~1200 messages 
.........................................
.........................................
.................................
consumer.commitAsync();

// return list of messages
  

Спасибо

Комментарии:

1. Вы устанавливаете ConsumerConfig.FETCH_MIN_BYTES_CONFIG дважды, и в итоге получается одно и то же значение конфигурации с ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG . Вы уверены, что ваши сообщения занимают 1,6 КБ?

2. Я изменил его, но все тот же эффект не может использовать более 1200 сообщений p.put(ConsumerConfig. FETCH_MAX_BYTES_CONFIG , 50 * 1024 * 1024 ); Размер моего сообщения составляет от 1,6 КБ до 2 КБ.