Выдача результатов окна сеанса каждые X минут

#apache-flink

#apache-flink

Вопрос:

Я внедрил процессор Flink, который объединяет события в сеансы, а затем записывает их в приемник. Теперь я хотел бы расширить его, чтобы я мог получать количество одновременных сеансов каждые пять минут.

События, поступающие в мою систему, отображаются в форме:

 {
  "SessionId": "UniqueUUID",
  "Customer": "CustomerA",
  "EventType": "EventTypeA",
  [ ... ]
}
  

И один сеанс обычно содержит несколько событий разных типов событий. Затем я объединяю события в сеансы, выполняя следующие действия в Flink.

 DataStream<Session> sessions = events
                .keyBy((KeySelector<HashMap, String>) event -> (String) event.get(Field.SESSION_ID))
                .window(ProcessingTimeSessionWindows.withGap(org.apache.flink.streaming.api.windowing.time.Time.minutes(5)))
                .trigger(SessionProcessingTimeTrigger.create())
                .aggregate(new SessionAggregator())
  

Каждый сеанс генерируется (с помощью SessionProcessingTimeTrigger) при обработке события с определенным типом события («EventType»:»Session.Завершено»). И, наконец, поток отправляется в приемник и записывается Kafka.

Теперь я хочу написать аналогичный процессор Flink, но вместо того, чтобы отправлять сеанс только после его завершения, я вместо этого хочу отправлять все сеансы каждые 5 минут, чтобы отслеживать, сколько одновременных сеансов у нас есть каждые 5 минут. Так что в некотором смысле я предполагаю, что мне нужно SessionWindow, которое также выдает его содержимое через регулярные промежутки времени без очистки содержимого.

Я в тупике, как это сделать в Flink, и поэтому ищу какую-то помощь.

Комментарии:

1. Я бы попытался удалить trigger функцию и добавить еще одно окно через 5 минут после aggregate функции.

Ответ №1:

Всякий раз, когда вы хотите, чтобы окно Flink выдавало результаты не по умолчанию, вы можете сделать это, внедрив пользовательский триггер. Ваш триггер просто должен срабатывать каждый раз, когда срабатывает 5-минутный таймер, в дополнение к его первоначальной логике. Вы захотите зарегистрировать этот таймер, когда первое событие назначается окну, и снова при каждом срабатывании таймера.

В случае окон сеанса это может быть более сложным из-за способа объединения окон сеанса. Но я считаю, что в случае окон сеанса во время обработки то, что я изложил выше, будет работать.

Комментарии:

1. Привет, я пытаюсь сделать именно это, но проблема возникает при оконном отображении этих отправленных событий, они приходят с исходной меткой времени события, а не с временем отправки. Есть ли обходной путь, которого мне не хватает?