Конвейер Beam — разница по сравнению с совокупностью последовательных окон с фиксированным временем

#apache-beam

#apache-beam

Вопрос:

Я застрял на реализации простого конвейера. Я сгруппировал свои данные в пределах фиксированного временного интервала в один час для вычисления различной агрегации. Теперь мне нужно вычислить общую энергию, потребляемую счетчиком. (См. Пример ниже)

Например, энергия, потребляемая счетчиком «А» в период с 14:00 до 15:00, представляет собой разницу между показаниями энергии в 15:00 и показаниями энергии в 14:00.

(800 МВтч — 600 МВтч) (900 МВтч — 800 МВтч) (1300 МВтч — 900 МВтч) (1600 МВтч — 1300 МВтч) = 1000 МВтч .

 Meter Time Energy, mWh 
A 2019-06-01 14:00 600 
A 2019-06-01 14:15 800 
A 2019-06-01 14:30 900 
A 2019-06-01 14:45 1300 
A 2019-06-01 15:00 1600 
  

Есть идеи, как я мог бы реализовать что-то подобное? Я использую python SDK.

Спасибо за вашу помощь,

Ответ №1:

Вы думали об использовании SlidingWindow продолжительностью 1 час и DoFn с сохранением состояния (обратите внимание, что состояния указаны для каждого ключа и для каждого окна)? Например:

 class Consumption(DoFn):
  first_gauge = ReadModifyWriteStateSpec('first', TupleCoder([VarIntCoder(), VarIntCoder()]))
  
  def process(self, element, initial=DoFn.StateParam(first_gauge)):
    initial_gauge = initial.read()
    if not initial_gauge:
      initial.write((element.timestamp, element.energy))
    elif element.timestamp - initial_gauge[0] == 3600:
      yield element.energy - initial_gauge[1]

gauge_events | 'window' >> WindowInto(SlidingWindow(3600, #or larger
                                                    900))
             | 'Calculate Consumption' >> beam.ParDo(Consumption())