Kafka увеличивает пропускную способность с помощью нескольких разделов и нескольких потоков-потребителей

#apache-kafka #kafka-consumer-api #apache-kafka-streams

#apache-kafka #kafka-consumer-api #apache-kafka-streams

Вопрос:

Я использую kafka stream для некоторого приложения.

Поток потока выглядит следующим образом

 kafkaProducer---->StreamerConsumer1->finalCosumer
 

У меня есть производитель, который очень быстро записывает данные, и мой StreamConsumer сопоставит каждый поток с каким-либо процессом и перенаправит поток в другую тему.

в моей карте StreamCosumer я добавил свою собственную функцию сопоставления, которая фактически пытается сохранить соответствующие данные, как показано ниже

 public void checkRecord(T1 key, T2 value) {
 switch(T1.toString()){
 case "key1":
  //Get relavant fileds from value and perisit in db 
   break;
   case "key2":
     //Get relavant fileds from value and perisit in db 
   break;
 }
}


KStream<String, KafkaStatusRecordWrapper> pDStream[] = myStream.map(this::checkRecord).branch((key, value)-> value.isSuccess(),(key, value)-> !value.isSuccess());

pDStream[0].mapValues(value -> transformer(value)).to("other_topic",Produced.with(stringSerde, stringSerde));   
 

Теперь моя функция-потребитель записи checkRecord является однопоточной, и для возврата требуется почти 300 мс (из-за некоторой бизнес-логики и сохранения базы данных, чего я не могу избежать).

Я не могу увеличить количество разделов, поскольку в нашей инфраструктуре было некоторое ограничение, а также из-за ограничений ниже

 More Partitions Requires More Open File Handles
More Partitions May Increase Unavailability
More Partitions May Increase End-to-end Latency
 

итак, я планирую написать многопоточный поток-потребитель.

Но меня беспокоят следующие моменты.

  1. Мне нужно обработать запись только один раз
  2. Передача другому потоку вызовет проблемы с управлением смещением.

Итак, как увеличить пропускную способность?

У меня достаточно ресурсов на моем потребителе, используется только 40% его ресурсов.

Ответ №1:

Вы можете настроить конфигурацию потока num.stream.threads , чтобы настроить количество потоков. Максимальное значение может быть максимальным количеством разделов. Это помогает увеличить параллелизм экземпляра приложения.

Допустим, если в вашей теме есть 4 раздела, вы можете установить следующее:

 properties.set("num.stream.threads",4);
 

Комментарии:

1. По какой-то причине я не могу увеличить раздел более чем на 10. Мне нужно иметь некоторую логику, основанную на написании нескольких потоков для каждой потоковой задачи .. но я не знаю, как управлять фиксацией offest в нескольких потоках для каждой потоковой задачи