#apache-flink
#apache-flink
Вопрос:
Я внедрил процессор Flink, который объединяет события в сеансы, а затем записывает их в приемник. Теперь я хотел бы расширить его, чтобы я мог получать количество одновременных сеансов каждые пять минут.
События, поступающие в мою систему, отображаются в форме:
{
"SessionId": "UniqueUUID",
"Customer": "CustomerA",
"EventType": "EventTypeA",
[ ... ]
}
И один сеанс обычно содержит несколько событий разных типов событий. Затем я объединяю события в сеансы, выполняя следующие действия в Flink.
DataStream<Session> sessions = events
.keyBy((KeySelector<HashMap, String>) event -> (String) event.get(Field.SESSION_ID))
.window(ProcessingTimeSessionWindows.withGap(org.apache.flink.streaming.api.windowing.time.Time.minutes(5)))
.trigger(SessionProcessingTimeTrigger.create())
.aggregate(new SessionAggregator())
Каждый сеанс генерируется (с помощью SessionProcessingTimeTrigger) при обработке события с определенным типом события («EventType»:»Session.Завершено»). И, наконец, поток отправляется в приемник и записывается Kafka.
Теперь я хочу написать аналогичный процессор Flink, но вместо того, чтобы отправлять сеанс только после его завершения, я вместо этого хочу отправлять все сеансы каждые 5 минут, чтобы отслеживать, сколько одновременных сеансов у нас есть каждые 5 минут. Так что в некотором смысле я предполагаю, что мне нужно SessionWindow, которое также выдает его содержимое через регулярные промежутки времени без очистки содержимого.
Я в тупике, как это сделать в Flink, и поэтому ищу какую-то помощь.
Комментарии:
1. Я бы попытался удалить
trigger
функцию и добавить еще одно окно через 5 минут послеaggregate
функции.
Ответ №1:
Всякий раз, когда вы хотите, чтобы окно Flink выдавало результаты не по умолчанию, вы можете сделать это, внедрив пользовательский триггер. Ваш триггер просто должен срабатывать каждый раз, когда срабатывает 5-минутный таймер, в дополнение к его первоначальной логике. Вы захотите зарегистрировать этот таймер, когда первое событие назначается окну, и снова при каждом срабатывании таймера.
В случае окон сеанса это может быть более сложным из-за способа объединения окон сеанса. Но я считаю, что в случае окон сеанса во время обработки то, что я изложил выше, будет работать.
Комментарии:
1. Привет, я пытаюсь сделать именно это, но проблема возникает при оконном отображении этих отправленных событий, они приходят с исходной меткой времени события, а не с временем отправки. Есть ли обходной путь, которого мне не хватает?