Как мне применить агрегатные функции к нескольким столбцам на основе одного ключа в Apache beam?

#google-cloud-dataflow #apache-beam

#google-cloud-dataflow #apache-beam

Вопрос:

Я использую Apache beam python SDK и работаю с потоком данных GCP. Как мне применить агрегатные функции к нескольким столбцам на основе одного ключа? Например, набор данных из 10 столбцов, в которых мои данные выглядят как
User_id,product_id,year,quantity,price,...
101,1,2018,10,15,...
101,2,2019,1,10,...
102,1,2019,2,16,...

Для каждого идентификатора пользователя Как мне рассчитать количество приобретенных им различных продуктов, max (количество), min (цена) и т.д..

Я видел примеры wordcount и т.д., Где вы можете применить сумму к значениям в паре (ключ, значение). Что, если я хочу выполнять разные преобразования в разных столбцах, например, sum / mean / count и т.д.

Ответ №1:

Beam использует PCollection параллельную коллекцию, которую в Python вы можете рассматривать как (обычно) список элементов (обычно кортежей или dicts ).

В вашем случае это может быть список «row», поэтому вы бы

  1. извлеките ключ строки. Если бы это было так User_id , то сопоставьте что-то вроде этого лямбда, например

x -> (x[0], x)

Обратите внимание, что x используется в качестве значения в паре k, v, а также все еще содержит ключ, но это нормально, если вы хотите, вы можете удалить его и перепаковать кортеж значений без него. т. Е. возвращаемый кортеж будет похож на кортеж типа[str, Tuple[int, int, int, float, float]], предполагая, что это правильные типы User_id, product_id,year, quantity, price

  1. применить окно
  2. группировать по ключу (очень важно определить window перед группировкой по ключу и знать, что окно вступает в силу только при группировании по ключу)

  3. используйте что-нибудь для извлечения интересующих вас столбцов (значений в кортеже), применения агрегатов и перепаковки для всего, что находится ниже по потоку.

Кажется странным использовать агрегатную функцию для одного значения кортежа, но агрегация будет отображена / применена ко всей группе ключей в окне.

Этот базовый пример можно легко расширить https://github.com/apache/beam/blob/ee96f66e14866f9642e9c67bf2ef231be7e7d55b/sdks/python/apache_beam/examples/wordcount.py#L99

Если вам нужно сделать что-то простое, просто сопоставьте функцию, если вам нужно больше, чем просто, вы можете создать DoFn. Это просто.

например, !предупреждение, непроверенный код написан в процессе передачи!

 def multi_agg(element):
    (key, row ) = element
    return (key, (max(row[3]), min(row[4])))
  

В этом случае я взял идентификатор пользователя в качестве ключа с предыдущего шага, а также максимальное количество и минимальную цену, затем упаковал его обратно в кортеж из k, v пар. Пара k, v — это кортеж, который является элементом последующей PCollection. Основная причина, по которой вам нужны пары k, v, заключается в том, что такие вещи, как GroupByKey неявно используют первое значение в качестве ключа для группировки. Весь элемент неявно используется в качестве значения для сопоставления с функцией. Эти две вещи не очевидны при просмотре примеров Apache Beam.

Вы могли бы либо перепаковать в пару k, v для дальнейшей последующей обработки, либо поместить в структуру, готовую к записи, например, bigquery или bigtable, или, возможно, в файл в корзине облачного хранилища. В любом случае рекомендуется использовать подсказки по типу.

Комментарии:

1. строка [3] не будет содержать список значений количества. max (строка [3]) — неправильный способ вычисления максимального количества. группировка этих трех элементов (101,(1,2018,10,15)) , (101,(2,2019,1,10)) и (102,(1,2019,2,16)) приводит к [ (101,[ (1,2018, xx,15), (2,2019,xx,10) ]), (102, [(1,2019, xx,16)]) ] где xx — значения величины 10,1,2