#google-cloud-dataflow #apache-beam
#google-cloud-dataflow #apache-beam
Вопрос:
Я использую Apache beam python SDK и работаю с потоком данных GCP. Как мне применить агрегатные функции к нескольким столбцам на основе одного ключа? Например, набор данных из 10 столбцов, в которых мои данные выглядят как
User_id,product_id,year,quantity,price,...
101,1,2018,10,15,...
101,2,2019,1,10,...
102,1,2019,2,16,...
Для каждого идентификатора пользователя Как мне рассчитать количество приобретенных им различных продуктов, max (количество), min (цена) и т.д..
Я видел примеры wordcount и т.д., Где вы можете применить сумму к значениям в паре (ключ, значение). Что, если я хочу выполнять разные преобразования в разных столбцах, например, sum / mean / count и т.д.
Ответ №1:
Beam использует PCollection
параллельную коллекцию, которую в Python вы можете рассматривать как (обычно) список элементов (обычно кортежей или dicts ).
В вашем случае это может быть список «row», поэтому вы бы
- извлеките ключ строки. Если бы это было так
User_id
, то сопоставьте что-то вроде этого лямбда, например
x -> (x[0], x)
Обратите внимание, что x используется в качестве значения в паре k, v, а также все еще содержит ключ, но это нормально, если вы хотите, вы можете удалить его и перепаковать кортеж значений без него. т. Е. возвращаемый кортеж будет похож на кортеж типа[str, Tuple[int, int, int, float, float]], предполагая, что это правильные типы User_id, product_id,year, quantity, price
- применить окно
-
группировать по ключу (очень важно определить window перед группировкой по ключу и знать, что окно вступает в силу только при группировании по ключу)
-
используйте что-нибудь для извлечения интересующих вас столбцов (значений в кортеже), применения агрегатов и перепаковки для всего, что находится ниже по потоку.
Кажется странным использовать агрегатную функцию для одного значения кортежа, но агрегация будет отображена / применена ко всей группе ключей в окне.
Этот базовый пример можно легко расширить https://github.com/apache/beam/blob/ee96f66e14866f9642e9c67bf2ef231be7e7d55b/sdks/python/apache_beam/examples/wordcount.py#L99
Если вам нужно сделать что-то простое, просто сопоставьте функцию, если вам нужно больше, чем просто, вы можете создать DoFn. Это просто.
например, !предупреждение, непроверенный код написан в процессе передачи!
def multi_agg(element):
(key, row ) = element
return (key, (max(row[3]), min(row[4])))
В этом случае я взял идентификатор пользователя в качестве ключа с предыдущего шага, а также максимальное количество и минимальную цену, затем упаковал его обратно в кортеж из k, v пар. Пара k, v — это кортеж, который является элементом последующей PCollection. Основная причина, по которой вам нужны пары k, v, заключается в том, что такие вещи, как GroupByKey
неявно используют первое значение в качестве ключа для группировки. Весь элемент неявно используется в качестве значения для сопоставления с функцией. Эти две вещи не очевидны при просмотре примеров Apache Beam.
Вы могли бы либо перепаковать в пару k, v для дальнейшей последующей обработки, либо поместить в структуру, готовую к записи, например, bigquery или bigtable, или, возможно, в файл в корзине облачного хранилища. В любом случае рекомендуется использовать подсказки по типу.
Комментарии:
1. строка [3] не будет содержать список значений количества. max (строка [3]) — неправильный способ вычисления максимального количества. группировка этих трех элементов (101,(1,2018,10,15)) , (101,(2,2019,1,10)) и (102,(1,2019,2,16)) приводит к [ (101,[ (1,2018, xx,15), (2,2019,xx,10) ]), (102, [(1,2019, xx,16)]) ] где xx — значения величины 10,1,2