#apache-kafka #apache-kafka-streams
#apache-kafka #apache-kafka-streams
Вопрос:
My STORE_TOPIC
— это сжатая тема, которая содержит текущее состояние моих объектов на основе событий, полученных из EVENTS_TOPIC
(приложением Kafka Streams). Основная цель этого STORE_TOPIC
— загрузить как (глобальную) KTable.
EVENTS_TOPIC | STORE_TOPIC
|
VALUE | KEY VALUE
{"entity":"A", "a":1, "b":2} | A {"a":1, "b":2}
{"entity":"B", "c":3} | B {"c":3}
{"entity":"A", "d":4} | A {"a":1, "b":2, "d":4}
{"entity":"A", "a":0, "e":5} | A {"a":0, "b":2, "d":4, "e":5}
Новый потребитель должен получать уведомления, когда изменения происходят в определенном подмножестве атрибутов.
В качестве примера, если бы мы решили отслеживать изменения в атрибутах «a», «b» и «c», ожидаемый результат был бы :
A {"a":1, "b":2}
B {"c":3}
(Nothing)
A {"a":0, "b":2}
Для этого я написал новое приложение Kafka Streams, которое:
- Обрабатывает «локальное хранилище», которое содержит необходимые атрибуты для каждого ключа
- Загружается
STORE_TOPIC
в виде потока - Сравните входящее сообщение с содержимым «локального хранилища» (благодаря
.transform()
) - Если обнаружено изменение, записываем входящее сообщение (только атрибуты, которые нам нужно отслеживать) в новое
OUT_TOPIC
и добавляем / заменяем запись в «локальном хранилище»
Есть ли более простой и элегантный способ добиться этого?
Можете ли вы подтвердить, что если я загружу STORE_TOPIC
непосредственно как KTable : .table(STORE_TOPIC)
, я смогу получить доступ только к текущему состоянию объекта, и нет способа получить доступ к предыдущей версии объекта и выполнить сравнение с ней?
Комментарии:
1. Вы говорите, что ваше приложение загружает
store_topic
как поток. Вы должны загружатьevents_topic
как поток. Я предполагаю, что это опечатка.2. Мое первое приложение (то, которое загружает
STORE_TOPIC
то, что поступает вEVENTS_TOPIC
) загружаетSTORE_TOPIC
как KTable и передает то, что поступает вEVENTS_TOPIC
, и отправляет новое событие в разделе «сохранить», если это необходимо. Моему второму приложению необходимо обнаружить изменения в том, что находится вSTORE_TOPIC
. И чтобы обнаружить изменения в этомSTORE_TOPIC
, я транслирую его и перестраиваю локальное хранилище, только с подмножеством атрибутов.3. ОК. Понятно. Хотя не уверен, зачем вам для этого нужны два приложения. Я совершенно уверен, что это можно сделать в рамках одной топологии с небольшим количеством кода.
4. ДА. Согласен, это можно легко сделать в том же приложении. (Или мое второе приложение могло бы транслироваться из
EVENTS_TOPIC
с топологией, очень похожей на первое приложение.) НоSTORE_TOPIC
широко используется другими приложениями / потребителями, в то время как новый так называемыйOUT_TOPIC
очень специфичен для одного потребителя и, вероятно, исчезнет в ближайшем будущем. Итак, с точки зрения дизайна, я бы предпочел не объединять 2 функции в одной топологии. (Пусть это будет глупо, просто!) Я имел в виду что-то вроде «Когда состояние изменяется (STORE_TOPIC
), то при необходимости что-то запускайтеOUT_TOPIC
«.