Как отследить, что изменилось между двумя событиями в теме?

#apache-kafka #apache-kafka-streams

#apache-kafka #apache-kafka-streams

Вопрос:

My STORE_TOPIC — это сжатая тема, которая содержит текущее состояние моих объектов на основе событий, полученных из EVENTS_TOPIC (приложением Kafka Streams). Основная цель этого STORE_TOPIC — загрузить как (глобальную) KTable.

 EVENTS_TOPIC                    |    STORE_TOPIC
                                |    
VALUE                           |     KEY      VALUE
{"entity":"A", "a":1, "b":2}    |     A        {"a":1, "b":2}
{"entity":"B", "c":3}           |     B        {"c":3}
{"entity":"A", "d":4}           |     A        {"a":1, "b":2, "d":4}
{"entity":"A", "a":0, "e":5}    |     A        {"a":0, "b":2, "d":4, "e":5}
  

Новый потребитель должен получать уведомления, когда изменения происходят в определенном подмножестве атрибутов.

В качестве примера, если бы мы решили отслеживать изменения в атрибутах «a», «b» и «c», ожидаемый результат был бы :

 A        {"a":1, "b":2}
B        {"c":3}
(Nothing)
A        {"a":0, "b":2}
  

Для этого я написал новое приложение Kafka Streams, которое:

  • Обрабатывает «локальное хранилище», которое содержит необходимые атрибуты для каждого ключа
  • Загружается STORE_TOPIC в виде потока
  • Сравните входящее сообщение с содержимым «локального хранилища» (благодаря .transform() )
  • Если обнаружено изменение, записываем входящее сообщение (только атрибуты, которые нам нужно отслеживать) в новое OUT_TOPIC и добавляем / заменяем запись в «локальном хранилище»

Есть ли более простой и элегантный способ добиться этого?

Можете ли вы подтвердить, что если я загружу STORE_TOPIC непосредственно как KTable : .table(STORE_TOPIC) , я смогу получить доступ только к текущему состоянию объекта, и нет способа получить доступ к предыдущей версии объекта и выполнить сравнение с ней?

Комментарии:

1. Вы говорите, что ваше приложение загружает store_topic как поток. Вы должны загружать events_topic как поток. Я предполагаю, что это опечатка.

2. Мое первое приложение (то, которое загружает STORE_TOPIC то, что поступает в EVENTS_TOPIC ) загружает STORE_TOPIC как KTable и передает то, что поступает в EVENTS_TOPIC , и отправляет новое событие в разделе «сохранить», если это необходимо. Моему второму приложению необходимо обнаружить изменения в том, что находится в STORE_TOPIC . И чтобы обнаружить изменения в этом STORE_TOPIC , я транслирую его и перестраиваю локальное хранилище, только с подмножеством атрибутов.

3. ОК. Понятно. Хотя не уверен, зачем вам для этого нужны два приложения. Я совершенно уверен, что это можно сделать в рамках одной топологии с небольшим количеством кода.

4. ДА. Согласен, это можно легко сделать в том же приложении. (Или мое второе приложение могло бы транслироваться из EVENTS_TOPIC с топологией, очень похожей на первое приложение.) Но STORE_TOPIC широко используется другими приложениями / потребителями, в то время как новый так называемый OUT_TOPIC очень специфичен для одного потребителя и, вероятно, исчезнет в ближайшем будущем. Итак, с точки зрения дизайна, я бы предпочел не объединять 2 функции в одной топологии. (Пусть это будет глупо, просто!) Я имел в виду что-то вроде «Когда состояние изменяется ( STORE_TOPIC ), то при необходимости что-то запускайте OUT_TOPIC «.