Flink: SessionWindowTimeGapExtractor — динамическое вычисление разрыва с использованием плотности данных

#apache-flink #flink-streaming

#apache-flink #flink-потоковая передача

Вопрос:

У меня есть сообщение, поступающее из Kafka в flink, и я хотел бы создать EventTimeSessionWindows.withDynamicGap(), которое со временем адаптируется с учетом плотности данных. Для этого я должен создать расширенное сообщение, содержащее мое «Событие» «разрыв», которое я должен вычислять динамически.

Затем расширенное сообщение будет выглядеть следующим образом: Tuple2<Событие, Long>> где Event: — это pojo, содержащий CSV-файл из kafka [tom, 53, 1.70, 18282822, …], а Long: — параметр разрыва в миллисекундах [129293838]

В настоящее время эта часть моего кода:

  DataStream<Tuple2<Event, Long>> enriched = stream 
                    .keyBy((Event ride) -> ride.CorrID)        
                    .map(new StatefulSessionCalculator());
  

Где StatefulSessionCalculator() обогащает сообщение, создающее описанный выше Tuple2.

После этого я должен удалить вычисленный разрыв, используя что-то вроде этого:

 DataStream<Tuple2<Event, Long>> result = enriched
                 .keyBy((...) -> ride.CorrID)
                 .window(EventTimeSessionWindows.withDynamicGap(new DynamicSessionWindows())
  

Моя функция DynamicSessionWindows() должна выполнять обратную передачу для flink the long, но я не понимаю как. Это был бы просто класс, который расширяет SessionWindowTimeGapExtractor<Tuple2<MyEvent, Long>> и возвращает разрыв из метода extract() .

У меня есть теория, но мне нужен пример того, как это сделать.

Если кто-нибудь может помочь мне с этим, написав некоторый код, это было бы очень оценено.

Спасибо

Ответ №1:

Поехали, я нашел, как это сделать. Это был простой вопрос, но, будучи новичком в JAVA, и FLINK заставил меня немного напрячься. Я также создал keySelector

  WindowedStream<Tuple2<Event, Long>, String, TimeWindow> result = enriched
                .keyBy(new MyKeySelector())
                .window(EventTimeSessionWindows.withDynamicGap(new DynamicSessionWindows()));
  

И мой DynamicSessionWindows () — это:

     public class DynamicSessionWindows implements SessionWindowTimeGapExtractor<Tuple2<Event, Long>> {
    
            @Override
            public long extract(Tuple2<Event, Long> value){
                return value.f1;
            }
    
    }