#apache-kafka #apache-kafka-streams #spring-kafka #kafka-producer-api
#apache-kafka #apache-kafka-streams #spring-kafka #kafka-producer-api
Вопрос:
У нас есть вариант использования, в котором мы должны прочитать некоторые сообщения в KStreams, затем преобразовать сообщение и условно перенести его в другую тему.
В нашем примере использования для преобразования объекта мы выполняем нисходящий вызов API. Если вызов API выполнен успешно, затем создайте newTopic1, иначе создайте newTopic2. Как можно достичь того же??
На данный момент мы используем приведенный ниже стиль создания обогащенных (т. Е. Преобразованных объектов) для новой темы Kafka, используя метод to, предоставляемый Streams API.
KStream<String, Customer> transformedStream = sourceKafkaStream
.mapValues(cust -> {
try {
logger.info("Hitting to the downstream to fetch additional information, will take few seconds.");
Thread.sleep(7000);
return RecordEnrichmentAndTransformationBuilder.enrichTheCustomer(cust);
} catch (InterruptedException e) {
e.printStackTrace();
}
return cust;
});
.to('newTopic1', Produced.with(AppSerdes.String(), AppSerdes.serdeForEnrichedCustomer()));
Оцените ответ на это.
Ответ №1:
Используя DSL api, вы используете KStream::filter
или KStream:to(TopicNameExtractor<K, V> topicExtractor, Produced<K, V> produced)
.
Пример кода будет выглядеть так, если оба формата одинаковы:
KStream<String, Customer> transformedStream = sourceKafkaStream
.mapValues(cust -> {
try {
logger.info("Hitting to the downstream to fetch additional information, will take few seconds.");
Thread.sleep(7000);
return RecordEnrichmentAndTransformationBuilder.enrichTheCustomer(cust);
} catch (InterruptedException e) {
e.printStackTrace();
}
return cust;
});
.to((key, value, recordContext) -> topicNameCalculation(key, value), Produced.with(AppSerdes.String(), AppSerdes.serdeForEnrichedCustomer()));
topicNameCalculation(...)
будет на основе ключа и значения выбрана правильная тема.
Как правило, не рекомендуется выполнять внешний вызов в потоках Kafka.
Комментарии:
1. Спасибо @Bartosz за ваш ответ. Не могли бы вы пояснить, почему не рекомендуется выполнять внешние вызовы в потоках Kafka. Я был бы признателен за вашу дизайнерскую философию. Мы активно планируем взимать потоки с помощью внешних вызовов API.
2. @AdityaGoel, есть несколько причин, например: — Если вызов будет длиться «слишком» долго, может произойти перебалансировка — Если вызов не является идемпотентным, бизнес-логика может быть нарушена. Потоки Kafka могут вызывать call несколько раз для одного и того же сообщения (ровно один раз только в Kafka).
3. Спасибо за ваш ответ. Нет ли способа контролировать время ожидания / сердцебиения для потоковой задачи?
4. @AdityaGoel, ты можешь все это контролировать.
5. Является ли это свойство max.poll.interval.ms значение по умолчанию не равно целому числу. MAX_VALUE ? @Bartosz