#apache-kafka #apache-kafka-streams #ktable
Вопрос:
В настоящее время мы используем следующее в нашем приложении Kafka Streams:
streamsBuilder.table(inputTopic)
.join(...)
.mapValues(valueMapper) // <-- this causes another state store
.groupBy(...)
.aggregate(...)
.mapValues(...)
[...]
.toStream()
.to(outputTopic)
и я только что понял, что значения карты после объединения создают дополнительное хранилище состояний.
Если вычисление в valueMapper каким-то образом тривиально (например, удалить поле в объекте и т. Д.), Не лучше ли было бы избежать дополнительного хранилища состояний? Нужно ли мне конвертировать в KStream и использовать значения KStream.map, чтобы избежать хранилища состояний, т. Е.
streamsBuilder.table(inputTopic)
.join(...)
.toStream
.mapValues(valueMapper) // <-- no more additional statestore
.groupBy(...)
.aggregate(...)
.mapValues(...)
[...]
.toStream()
.to(outputTopic)
или есть лучшая альтернатива применению дополнительного сопоставления после объединения?
Ответ №1:
Почему вы хотите использовать значения карты после шага присоединения? Если возможно обработать эту логику в соединителе значений соединения.
streamsBuilder.table(inputTopic)
.join( anotherTable, (a ,b) -> c ) <--- Here you can perform any mapping process
.toStream
.groupBy(...)
.aggregate(...)
.mapValues(...)
[...]
.toStream()
.to(outputTopic)
Комментарии:
1. Спасибо за ваш ответ и предложение выполнить сопоставление в ValueJoiner. Это, вероятно, правильный и наиболее легкий подход. ИМХО, нелогично делать это в функции соединения DSL Кафки. Правильно ли я понимаю, что при каждом вызове метода в таблице K создается новое хранилище состояний, в котором хранятся результаты?
2. Да, вы правы, согласно документации, каждая функция KTable, возвращающая KTable, создает новую KTable, это связано с тем, что API потоков Кафки должен знать, каков последний статус для данного ключа. KTable filteredTable = StreamsBuilder.таблица().фильтр ( ….); filteredTable. toStream() filteredTable.groupBy(). // Кака должен знать, каков статус отфильтрованной таблицы. Даже проверка описания каждого метода всегда говорит о создании новой таблицы K kafka.apache.org/24/javadoc/org/apache/kafka/streams/kstream/…