#triggers #window #apache-flink #flink-streaming #early-return
#триггеры #окно #apache-flink #flink-streaming #ранний возврат
Вопрос:
Я работаю с кодом, который использует повторяющееся окно продолжительностью в один день, и хотел бы отправлять ранние результаты в другой поток данных ежечасно. Я понимаю, что триггеры — это способ пойти сюда, но на самом деле не понимаю, как это будет работать.
Текущий код выглядит следующим образом:
myStream
.keyBy(..)
.window(TumblingEventTimeWindows.of(Time.days(1)))
.aggregate(new MyAggregateFunction(), new MyProcessWindowFunction())
В моем понимании, я должен зарегистрировать триггер, а затем в его методе onEventTime получить доступ к TriggerContext, и я могу отправлять данные на помеченный вывод оттуда. Но как мне получить оттуда текущее состояние MyAggregateFunction? Или мне нужно было бы выполнить мои собственные вычисления здесь внутри onEventTime()?
Кроме того, в документации указано, что "By specifying a trigger using trigger() you are overwriting the default trigger of a WindowAssigner."
. Будет ли мое однодневное окно по-прежнему срабатывать правильно, или мне нужно запустить его как-то по-другому?
Другой способ сделать это — создать два разных оператора — один, который запускается на 1 час, а другой, который запускается на 1 день. Будут ли триггеры предпочтительным подходом к этому?
Ответ №1:
Вместо использования пользовательского Trigger
интерфейса было бы проще иметь два уровня управления окнами, где почасовые результаты дополнительно объединяются в ежедневные результаты. Что-то вроде этого:
hourlyResults = myStream
.keyBy(...)
.window(TumblingEventTimeWindows.of(Time.hours(1)))
.aggregate(new MyAggregateFunction(), new MyProcessWindowFunction())
dailyResults = hourlyResults
.keyBy(...)
.window(TumblingEventTimeWindows.of(Time.days(1)))
.aggregate(new MyAggregateFunction(), new MyProcessWindowFunction())
hourlyResults.addSink(...)
dailyResults.addSink(...)
Обратите внимание, что результат окна не является a KeyedStream
, поэтому вам нужно будет снова использовать keyBy, если вы не можете организовать использование reinterpretAsKeyedStream
(документы).
Комментарии:
1. Это определенно довольно просто реализовать, но не приведет ли это к большой дополнительной обработке при повторном вводе
hourlyResults
потока?2. Повторный ввод является неудачным, и его можно избежать при тщательном использовании
reinterpretAsKeyedStream
( docs ).3. Что, если мы создадим
keyedStream = myStream.keyBy(...)
, а затем применим к нему разные окнаkeyedStream
для полученияhourlyResults
иdailyResults
потоков?4. Да, это сработает, хотя и с некоторыми дублирующими усилиями.
5. Будет ли это состояние окна, которое реплицируется (на основе каждого ключа), или это хуже, чем это?
Ответ №2:
Обычно, когда я добираюсь до более сложного поведения, подобного этому, я использую KeyedProcessFunction
. Вы можете агрегировать (и сохранять в состоянии) почасовые и ежедневные результаты, устанавливать таймеры по мере необходимости и использовать дополнительный вывод для почасовых результатов по сравнению с обычным выводом для ежедневных результатов.
Ответ №3:
Здесь довольно много вопросов. Я постараюсь спросить их всех. Прежде всего, если вы укажете свой собственный триггер, используя trigger()
это, вы собираетесь эффективно переопределить триггер по умолчанию, и, следовательно, окно может работать не так, как по умолчанию. Итак, если вы, например, создаете окно изменения времени события за 1 день, но переопределяете триггер, чтобы он срабатывал для каждого 20-го элемента, он никогда не срабатывает в зависимости от времени события.
Теперь, после срабатывания вашего пользовательского триггера, вывод from MyAggregateFunction
будет передан в MyProcessWindowFunction
, поэтому он будет работать так же, как и для триггера по умолчанию, вам не нужно обращаться к MyAggregateFunction
from внутри триггера.
Наконец, хотя технически возможно реализовать триггер для запуска частичных результатов каждый час, мое личное мнение таково, что вам, вероятно, следует использовать два отдельных окна. Хотя это решение может привести к немного большим накладным расходам и может привести к большему состоянию, оно должно быть намного понятнее, проще в реализации и, наконец, намного более устойчивым к ошибкам.