#apache-flink #flink-streaming
#apache-flink #flink-потоковая передача
Вопрос:
У меня есть сообщение, поступающее из Kafka в flink, и я хотел бы создать EventTimeSessionWindows.withDynamicGap(), которое со временем адаптируется с учетом плотности данных. Для этого я должен создать расширенное сообщение, содержащее мое «Событие» «разрыв», которое я должен вычислять динамически.
Затем расширенное сообщение будет выглядеть следующим образом: Tuple2<Событие, Long>> где Event: — это pojo, содержащий CSV-файл из kafka [tom, 53, 1.70, 18282822, …], а Long: — параметр разрыва в миллисекундах [129293838]
В настоящее время эта часть моего кода:
DataStream<Tuple2<Event, Long>> enriched = stream
.keyBy((Event ride) -> ride.CorrID)
.map(new StatefulSessionCalculator());
Где StatefulSessionCalculator()
обогащает сообщение, создающее описанный выше Tuple2.
После этого я должен удалить вычисленный разрыв, используя что-то вроде этого:
DataStream<Tuple2<Event, Long>> result = enriched
.keyBy((...) -> ride.CorrID)
.window(EventTimeSessionWindows.withDynamicGap(new DynamicSessionWindows())
Моя функция DynamicSessionWindows() должна выполнять обратную передачу для flink the long, но я не понимаю как. Это был бы просто класс, который расширяет SessionWindowTimeGapExtractor<Tuple2<MyEvent, Long>> и возвращает разрыв из метода extract() .
У меня есть теория, но мне нужен пример того, как это сделать.
Если кто-нибудь может помочь мне с этим, написав некоторый код, это было бы очень оценено.
Спасибо
Ответ №1:
Поехали, я нашел, как это сделать. Это был простой вопрос, но, будучи новичком в JAVA, и FLINK заставил меня немного напрячься. Я также создал keySelector
WindowedStream<Tuple2<Event, Long>, String, TimeWindow> result = enriched
.keyBy(new MyKeySelector())
.window(EventTimeSessionWindows.withDynamicGap(new DynamicSessionWindows()));
И мой DynamicSessionWindows () — это:
public class DynamicSessionWindows implements SessionWindowTimeGapExtractor<Tuple2<Event, Long>> {
@Override
public long extract(Tuple2<Event, Long> value){
return value.f1;
}
}