Корректирующий агрегированный вид в потоковых данных

#stream #streaming #apache-kafka-streams #ksqldb

#поток #потоковая передача #apache-kafka-streams #ksqldb

Вопрос:

Этот вопрос связан с агрегированными представлениями KSQL или технологией потоковой обработки. Когда мы получаем события, мы применяем предложение group by для их агрегирования. Теперь приходит событие, которое является исправлением к некоторому предыдущему событию. Это оставит мое агрегированное представление в несогласованном состоянии. Это не случай поступления события не по порядку. Например. У меня есть событие (e), которое состоит из атрибута идентификатора объекта (t), категории (c) и количества (q). Следующие события

 1) e1 —> t1, c1, q1
2) e2 -> t2, c2, q2
3) e3 -> t3, c1, q3
4) e4 -> t1, c1, q4 correction to e1
5) e5 -> t5, c2, q5
  

Моим агрегированным представлением будет суммирование количественной группы по категориям

 c1 -> q1   q3   q4

c2 -> q2   q5.
  

c1 теперь в несогласованном состоянии. c1 должен быть только q3 q4.

Существуют ли какие-либо способы решения таких проблем. Я знаю, что могу сохранить все события в некотором кэше, а затем создать агрегированное представление, но это данные реального времени, поэтому все мои представления необходимо обновлять каждый раз.

Комментарии:

1. Хорошо, как вы отличаете, является ли q4 исправлением для q1 или q3? Есть ли какой-либо конкретный атрибут, с которым вы сравниваете?

2. Идентификатор базового объекта одинаков в случае e1 и e4. Итак, e4 является последним исправленным обновлением для этого идентификатора объекта

Ответ №1:

В потоках Kafka вы могли бы выполнить KStream#groupBy()#aggregate()#mapValue() . Aggregate() не будет вычислять агрегацию, но вернет сопоставление id-> value. В mapValue() вы вычисляете агрегацию по всем значениям карты. Таким образом, при поступлении обновлений if заменит старое значение на новое значение в Map, и mapValue() правильно пересчитает результат агрегирования.

Комментарии:

1. Не могли бы вы подробнее остановиться на этом. Вы имеете в виду пример вычитания из Stream DSL, где в категории Alice изменяется с E на A. В моем требовании количество изменяется. Я относительно новичок в этом, можете ли вы указать на какой-нибудь пример кода, который я могу посмотреть. Спасибо

2. Это немного отличается от случая, о котором вы упомянули, потому что здесь мы используем KStream в качестве входных данных, в то время как пример Alice основан на входной KTable. Возможно, вы могли бы использовать входную KTable, хотя (не уверен). Я не знаю ни одного примера кода для этого. 🙁

3. Я загрузил свои данные в KTable как ключевое значение (t1, (c1, q1)), (t2, (c2, q2)). Создан новый поток со значением ключа как (c1,q1),(c2, q2). Применил группировку по ключу к категории, а затем использовал агрегатную функцию. Но функция агрегирования выдает агрегированное значение по категории, как при обновлении базовой KTable. Моя потоковая KTable#KStream#groupby()#aggregate(). Он возвращает мне ключ как категорию и сумму всех величин как значение. Я что-то пропустил, когда вы сказали, что он не будет вычислять агрегацию.

4. Каковы ваши ключ и значение? ключ = t1? В вашей базовой таблице? Кроме того, вы можете обойтись KTable#groupBy без предварительного преобразования его в поток (если вы сначала перейдете к потоку, вы потеряете поведение обновления).