#apache-kafka-streams
#apache-kafka-streams
Вопрос:
Я застрял в одном варианте использования. Мне нужно рассчитать% от суммы, потраченной на развлечения.
В моем потоке я получаю записи с отраслевыми кодами и потраченной суммой (отраслевые коды основаны на развлечениях и не основаны на развлечениях)
например> коды индустрии развлечений > Количество отраслевых кодов
157 100
257 200
157 300
коды отрасли, не связанной с развлечениями > Количество отраслевых кодов
457 100
657 200
457 300
Итак, мне нужно подсчитать, сколько% было потрачено на развлечения, что должно составлять общую сумму развлечений / (общая сумма развлечений общая сумма, не связанная с развлечениями)
Решение, которое я пытаюсь: создать две Ktables One entertainment- сгруппировать по коду entertainment, т.е. 157 400 (100 300)
257 100
Неинтересная другая KTable 457 400
657 200
Теперь у меня есть две KTables, но как я могу вычислить%? Верен ли этот подход?
Ответ №1:
Я не совсем уверен, что именно вы пробовали до сих пор. Когда вы создаете обе таблицы, в обеих таблицах есть несколько строк (т. Е. по одной на код) или вы уже «объединили» разные коды вместе? Если каждая таблица содержит несколько строк, вам необходимо объединить все строки вместе, установив выдуманный суррогатный ключ (например, целочисленное значение 0 для всех строк):
KStream sumSpendingEntertainment = spendingEntertainment.groupBy((k,v) -> 0)
.aggregate(...);
KStream sumSpendingAll = spendingAll.groupBy((k,v) -> 0)
.aggregate(...);
В итоге вы получите два KTable
с одной строкой; одна содержит общие расходы, а другая — «расходы на развлечения», и обе таблицы KT будут использовать один и тот же выдуманный суррогатный ключ (целое число 0 в нашем примере).
На последнем шаге вы можете объединить обе таблицы, чтобы разделить обе суммы:
sumSpendingEntertainment.join(sumSpendingAll,
(sumEnt, sumAll) -> sumEnt / sumAll); // this is the `ValueJoiner` expressed as lambda