API потоков Кафки: Избегайте дополнительного хранилища состояний в KTable.mapValues

#apache-kafka #apache-kafka-streams #ktable

Вопрос:

В настоящее время мы используем следующее в нашем приложении Kafka Streams:

 streamsBuilder.table(inputTopic)
              .join(...)
              .mapValues(valueMapper) // <-- this causes another state store
              .groupBy(...)
              .aggregate(...)
              .mapValues(...)

[...]
              .toStream()
              .to(outputTopic)
 

и я только что понял, что значения карты после объединения создают дополнительное хранилище состояний.

Если вычисление в valueMapper каким-то образом тривиально (например, удалить поле в объекте и т. Д.), Не лучше ли было бы избежать дополнительного хранилища состояний? Нужно ли мне конвертировать в KStream и использовать значения KStream.map, чтобы избежать хранилища состояний, т. Е.

 streamsBuilder.table(inputTopic)
              .join(...)
              .toStream
              .mapValues(valueMapper) // <-- no more additional statestore
              .groupBy(...)
              .aggregate(...)
              .mapValues(...)

[...]
              .toStream()
              .to(outputTopic)
 

или есть лучшая альтернатива применению дополнительного сопоставления после объединения?

Ответ №1:

Почему вы хотите использовать значения карты после шага присоединения? Если возможно обработать эту логику в соединителе значений соединения.

 streamsBuilder.table(inputTopic)
              .join( anotherTable, (a ,b) ->  c )  <--- Here you can perform any mapping process
              .toStream
              .groupBy(...)
              .aggregate(...)
              .mapValues(...)

[...]
              .toStream()
              .to(outputTopic)
 

Комментарии:

1. Спасибо за ваш ответ и предложение выполнить сопоставление в ValueJoiner. Это, вероятно, правильный и наиболее легкий подход. ИМХО, нелогично делать это в функции соединения DSL Кафки. Правильно ли я понимаю, что при каждом вызове метода в таблице K создается новое хранилище состояний, в котором хранятся результаты?

2. Да, вы правы, согласно документации, каждая функция KTable, возвращающая KTable, создает новую KTable, это связано с тем, что API потоков Кафки должен знать, каков последний статус для данного ключа. KTable filteredTable = StreamsBuilder.таблица().фильтр ( ….); filteredTable. toStream() filteredTable.groupBy(). // Кака должен знать, каков статус отфильтрованной таблицы. Даже проверка описания каждого метода всегда говорит о создании новой таблицы K kafka.apache.org/24/javadoc/org/apache/kafka/streams/kstream/…