#python #apache-beam
#python #apache-beam
Вопрос:
В настоящее время я пытаюсь рассчитать общую сумму, оставшуюся в корзине покупок электронной коммерции. Для каждого случая у меня есть идентификатор сеанса и цена продукта. Затем я создал окно сеанса с задержкой в 15 секунд и, наконец, комбинированный ключ, суммирующий все это. Однако после создания окна вывод не создается (использование beam.Map(print)
ничего не печатает). Код выполняется без каких-либо ошибок, и я не знаю, что делать!
Вот пример моих данных:
('2b88b00a-892a-4639-bce4-1ea17a7d6221', 2.54)
('324c685c-4281-48d5-8783-7a7416f7d2b3', 3.97)
('c99a50e8-2fac-4c4d-89ec-41c05f114554', 1.27)
('324c685c-4281-48d5-8783-7a7416f7d2b3', 4.29)
('c99a50e8-2fac-4c4d-89ec-41c05f114554', 1.27)
Вот мой конвейер:
pipe = beam.Pipeline(options=options)
def encode_byte_string(element):
print(element)
element = str(element)
return element.encode('utf-8')
ecommerce_data = (
pipe
| "Read from PubSub" >> beam.io.ReadFromPubSub(subscription=input_subscription)
| "Decode utf-8" >> beam.Map(lambda row: row.decode('utf-8'))
| "Window" >> beam.WindowInto(beam.window.Sessions(15))
| "Combine per key - sum" >> beam.CombinePerKey(sum)
| beam.Map(print)
| "Encode to byte string" >> beam.Map(encode_byte_string)
| "Write to output" >> beam.io.WriteToPubSub(topic=output)
)
Ответ №1:
Предполагая, что вы предоставили общий доступ к полному коду, это потому, что нет ключа (или он установлен неправильно). Обратите внимание, что row.decode('utf-8')
возвращает тип строки. Чтобы исправить, преобразуйте строку в кортеж следующим образом:
ecommerce_data = (
pipe
| "Read from PubSub" >> beam.io.ReadFromPubSub(subscription=input_subscription)
| "Decode utf-8" >> beam.Map(lambda row: tuple(row.decode('utf-8')))
| "Window" >> beam.WindowInto(beam.window.Sessions(15))
| "Combine per key - sum" >> beam.CombinePerKey(sum)
| beam.Map(print)
| "Encode to byte string" >> beam.Map(encode_byte_string)
| "Write to output" >> beam.io.WriteToPubSub(topic=output)
)
(Если вы получаете данные Pub / Sub в виде закодированного JSON, вы можете добавить такой ключ):
| "Decode utf-8" >> beam.Map(lambda row: json.loads(row.decode('utf-8')))
| "Add key" >> beam.Map(lambda row: (row["id"], row["seconds"]))
| "Window" >> beam.WindowInto(beam.window.Sessions(15))