Окно сеанса Apache Beam ничего не возвращает

#python #apache-beam

#python #apache-beam

Вопрос:

В настоящее время я пытаюсь рассчитать общую сумму, оставшуюся в корзине покупок электронной коммерции. Для каждого случая у меня есть идентификатор сеанса и цена продукта. Затем я создал окно сеанса с задержкой в 15 секунд и, наконец, комбинированный ключ, суммирующий все это. Однако после создания окна вывод не создается (использование beam.Map(print) ничего не печатает). Код выполняется без каких-либо ошибок, и я не знаю, что делать!

Вот пример моих данных:

 ('2b88b00a-892a-4639-bce4-1ea17a7d6221', 2.54)
('324c685c-4281-48d5-8783-7a7416f7d2b3', 3.97)
('c99a50e8-2fac-4c4d-89ec-41c05f114554', 1.27)
('324c685c-4281-48d5-8783-7a7416f7d2b3', 4.29)
('c99a50e8-2fac-4c4d-89ec-41c05f114554', 1.27)
  

Вот мой конвейер:

 pipe = beam.Pipeline(options=options)

def encode_byte_string(element):
   print(element)
   element = str(element)
   return element.encode('utf-8')

ecommerce_data = (
        pipe
        | "Read from PubSub" >> beam.io.ReadFromPubSub(subscription=input_subscription)
        | "Decode utf-8" >> beam.Map(lambda row: row.decode('utf-8'))
        | "Window" >> beam.WindowInto(beam.window.Sessions(15))
        | "Combine per key - sum" >> beam.CombinePerKey(sum)
        | beam.Map(print)
        | "Encode to byte string" >> beam.Map(encode_byte_string)
        | "Write to output" >> beam.io.WriteToPubSub(topic=output)
    )
  

Ответ №1:

Предполагая, что вы предоставили общий доступ к полному коду, это потому, что нет ключа (или он установлен неправильно). Обратите внимание, что row.decode('utf-8') возвращает тип строки. Чтобы исправить, преобразуйте строку в кортеж следующим образом:

 ecommerce_data = (
        pipe
        | "Read from PubSub" >> beam.io.ReadFromPubSub(subscription=input_subscription)
        | "Decode utf-8" >> beam.Map(lambda row: tuple(row.decode('utf-8')))
        | "Window" >> beam.WindowInto(beam.window.Sessions(15))
        | "Combine per key - sum" >> beam.CombinePerKey(sum)
        | beam.Map(print)
        | "Encode to byte string" >> beam.Map(encode_byte_string)
        | "Write to output" >> beam.io.WriteToPubSub(topic=output)
    )
  

(Если вы получаете данные Pub / Sub в виде закодированного JSON, вы можете добавить такой ключ):

         | "Decode utf-8" >> beam.Map(lambda row: json.loads(row.decode('utf-8')))
        | "Add key" >> beam.Map(lambda row: (row["id"], row["seconds"]))
        | "Window" >> beam.WindowInto(beam.window.Sessions(15))