#apache-beam
#apache-beam
Вопрос:
Я застрял на реализации простого конвейера. Я сгруппировал свои данные в пределах фиксированного временного интервала в один час для вычисления различной агрегации. Теперь мне нужно вычислить общую энергию, потребляемую счетчиком. (См. Пример ниже)
Например, энергия, потребляемая счетчиком «А» в период с 14:00 до 15:00, представляет собой разницу между показаниями энергии в 15:00 и показаниями энергии в 14:00.
(800 МВтч — 600 МВтч) (900 МВтч — 800 МВтч) (1300 МВтч — 900 МВтч) (1600 МВтч — 1300 МВтч) = 1000 МВтч .
Meter Time Energy, mWh
A 2019-06-01 14:00 600
A 2019-06-01 14:15 800
A 2019-06-01 14:30 900
A 2019-06-01 14:45 1300
A 2019-06-01 15:00 1600
Есть идеи, как я мог бы реализовать что-то подобное? Я использую python SDK.
Спасибо за вашу помощь,
Ответ №1:
Вы думали об использовании SlidingWindow продолжительностью 1 час и DoFn с сохранением состояния (обратите внимание, что состояния указаны для каждого ключа и для каждого окна)? Например:
class Consumption(DoFn):
first_gauge = ReadModifyWriteStateSpec('first', TupleCoder([VarIntCoder(), VarIntCoder()]))
def process(self, element, initial=DoFn.StateParam(first_gauge)):
initial_gauge = initial.read()
if not initial_gauge:
initial.write((element.timestamp, element.energy))
elif element.timestamp - initial_gauge[0] == 3600:
yield element.energy - initial_gauge[1]
gauge_events | 'window' >> WindowInto(SlidingWindow(3600, #or larger
900))
| 'Calculate Consumption' >> beam.ParDo(Consumption())