#apache-kafka #kafka-consumer-api #apache-kafka-streams
#apache-kafka #kafka-consumer-api #apache-kafka-streams
Вопрос:
Я использую kafka stream для некоторого приложения.
Поток потока выглядит следующим образом
kafkaProducer---->StreamerConsumer1->finalCosumer
У меня есть производитель, который очень быстро записывает данные, и мой StreamConsumer сопоставит каждый поток с каким-либо процессом и перенаправит поток в другую тему.
в моей карте StreamCosumer я добавил свою собственную функцию сопоставления, которая фактически пытается сохранить соответствующие данные, как показано ниже
public void checkRecord(T1 key, T2 value) {
switch(T1.toString()){
case "key1":
//Get relavant fileds from value and perisit in db
break;
case "key2":
//Get relavant fileds from value and perisit in db
break;
}
}
KStream<String, KafkaStatusRecordWrapper> pDStream[] = myStream.map(this::checkRecord).branch((key, value)-> value.isSuccess(),(key, value)-> !value.isSuccess());
pDStream[0].mapValues(value -> transformer(value)).to("other_topic",Produced.with(stringSerde, stringSerde));
Теперь моя функция-потребитель записи checkRecord является однопоточной, и для возврата требуется почти 300 мс (из-за некоторой бизнес-логики и сохранения базы данных, чего я не могу избежать).
Я не могу увеличить количество разделов, поскольку в нашей инфраструктуре было некоторое ограничение, а также из-за ограничений ниже
More Partitions Requires More Open File Handles
More Partitions May Increase Unavailability
More Partitions May Increase End-to-end Latency
итак, я планирую написать многопоточный поток-потребитель.
Но меня беспокоят следующие моменты.
- Мне нужно обработать запись только один раз
- Передача другому потоку вызовет проблемы с управлением смещением.
Итак, как увеличить пропускную способность?
У меня достаточно ресурсов на моем потребителе, используется только 40% его ресурсов.
Ответ №1:
Вы можете настроить конфигурацию потока num.stream.threads
, чтобы настроить количество потоков. Максимальное значение может быть максимальным количеством разделов. Это помогает увеличить параллелизм экземпляра приложения.
Допустим, если в вашей теме есть 4 раздела, вы можете установить следующее:
properties.set("num.stream.threads",4);
Комментарии:
1. По какой-то причине я не могу увеличить раздел более чем на 10. Мне нужно иметь некоторую логику, основанную на написании нескольких потоков для каждой потоковой задачи .. но я не знаю, как управлять фиксацией offest в нескольких потоках для каждой потоковой задачи