Apache Flink: как закрыть окно с фиксированным размером, если данные не получены в течение определенного периода времени

#java #apache-kafka #apache-flink

#java #apache-kafka #apache-flink

Вопрос:

Я пытаюсь рассчитать скорость входящих событий в минуту из темы Kafka на основе времени события. Для этого я использую TumblingEventTimeWindows 1 минуту. Фрагмент кода приведен ниже. Я заметил, что если я не получаю никаких событий для определенного окна, например, с 2.34 по 2.35, то предыдущее окно с 2.33 по 2.34 не закрывается. Я понимаю риск потери данных для окна с 2.33 по 2.34 (может произойти из-за сбоя системы, увеличения задержки Kafka и т. Д.), Но я не могу ждать бесконечно. Мне нужно закрыть это окно после ожидания определенного периода времени, и последующие окна могут продолжаться после восстановления системы. Как я могу этого добиться?

Я пытаюсь использовать следующий код, который дает количество событий в минуту для непрерывного потока событий.

     StreamExecutionEnvironment executionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();
    executionEnvironment.setRestartStrategy(RestartStrategies.fixedDelayRestart(
            3,
            org.apache.flink.api.common.time.Time.of(10, TimeUnit.SECONDS)
    ));
    executionEnvironment.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
    executionEnvironment.setParallelism(1);
    Properties properties = new Properties();
    properties.setProperty("bootstrap.servers", "localhost:9092");
    properties.setProperty("group.id", "AllEventCountConsumerGroup");
    FlinkKafkaConsumer<String> kafkaConsumer = new FlinkKafkaConsumer<>("event_input_topic", new SimpleStringSchema(), properties);
    DataStreamSource<String> kafkaDataStream = environment.addSource(kafkaConsumer);
    kafkaDataStream
            .flatMap(new EventFlatter())
            .filter(Objects::nonNull)
            .assignTimestampsAndWatermarks(WatermarkStrategy
                    .<Entity>forBoundedOutOfOrderness(Duration.ofSeconds(2))
                    .withTimestampAssigner((SerializableTimestampAssigner<Entity>) (element, recordTimestamp) -> element.getTimestamp()))
            .keyBy((KeySelector<Entity, String>) Entity::getTenant)
            .window(TumblingEventTimeWindows.of(Time.minutes(1)))
            .allowedLateness(Time.seconds(10))
            .aggregate(new EventCountAggregator())
            .addSink(eventRateProducer);
 

Ответ №1:

Учитывая forBoundedOutOfOrderness(Duration.ofSeconds(2)) , что окно для интервала [t, t 1 minute) не закроется до timestamp >= t 1 minute 2 seconds тех пор, пока не будет обработано событие с.

Если у вашего входного потока могут быть длительные периоды простоя, и вы не можете дождаться возобновления потока, то вам придется либо искусственно увеличить водяной знак после обнаружения простоя, либо использовать пользовательское окно Trigger , в котором используется комбинация таймеров времени события и времени обработки.

Вот пример генератора водяных знаков, который обнаруживает бездействие, но он не был обновлен до нового WatermarkStrategy API.

Комментарии:

1. Насколько я понимаю, является ли новое свойство idlenessDetectionDuration более удобным для этого случая использования или я что-то упускаю?

2. Новая withIdleness опция будет обрабатывать случай, когда раздел простаивает, путем организации игнорирования простоя этого раздела. Таким образом, все разделы, которые все еще имеют входные данные, будут управлять водяными знаками. Но это не приведет к увеличению водяного знака в случае, когда все разделы простаивают.