Вариант использования агрегации — группировка по двум разным потокам kafka и вычисление среднего значения

#apache-kafka-streams

#apache-kafka-streams

Вопрос:

Я застрял в одном варианте использования. Мне нужно рассчитать% от суммы, потраченной на развлечения.

В моем потоке я получаю записи с отраслевыми кодами и потраченной суммой (отраслевые коды основаны на развлечениях и не основаны на развлечениях)

например> коды индустрии развлечений > Количество отраслевых кодов

                                                          157       100

                                                         257        200

                                                         157         300
  

коды отрасли, не связанной с развлечениями > Количество отраслевых кодов

                                                          457       100

                                                          657       200

                                                          457       300
  

Итак, мне нужно подсчитать, сколько% было потрачено на развлечения, что должно составлять общую сумму развлечений / (общая сумма развлечений общая сумма, не связанная с развлечениями)

Решение, которое я пытаюсь: создать две Ktables One entertainment- сгруппировать по коду entertainment, т.е. 157 400 (100 300)

                                    257    100
  

Неинтересная другая KTable 457 400

                                  657      200
  

Теперь у меня есть две KTables, но как я могу вычислить%? Верен ли этот подход?

Ответ №1:

Я не совсем уверен, что именно вы пробовали до сих пор. Когда вы создаете обе таблицы, в обеих таблицах есть несколько строк (т. Е. по одной на код) или вы уже «объединили» разные коды вместе? Если каждая таблица содержит несколько строк, вам необходимо объединить все строки вместе, установив выдуманный суррогатный ключ (например, целочисленное значение 0 для всех строк):

 KStream sumSpendingEntertainment = spendingEntertainment.groupBy((k,v) -> 0)
                                                        .aggregate(...);
KStream sumSpendingAll = spendingAll.groupBy((k,v) -> 0)
                                    .aggregate(...);
  

В итоге вы получите два KTable с одной строкой; одна содержит общие расходы, а другая — «расходы на развлечения», и обе таблицы KT будут использовать один и тот же выдуманный суррогатный ключ (целое число 0 в нашем примере).

На последнем шаге вы можете объединить обе таблицы, чтобы разделить обе суммы:

 sumSpendingEntertainment.join(sumSpendingAll,
                              (sumEnt, sumAll) -> sumEnt / sumAll); // this is the `ValueJoiner` expressed as lambda