Счетчик переходов с меткой времени

#apache-flink #flink-streaming

#апач-флинк #мерцающий поток #apache-сбой #мгновенная передача

Вопрос:

Я читал пример Flink CountWithTimestamp, и ниже приведен фрагмент кода из примера:

   @Override
    public void processElement(Tuple2<String, String> value, Context ctx, Collector<Tuple2<String, Long>> out)
            throws Exception {

        // retrieve the current count
        CountWithTimestamp current = state.value();
        if (current == null) {
            current = new CountWithTimestamp();
            current.key = value.f0;
        }

        // update the state's count
        current.count  ;

        // set the state's timestamp to the record's assigned event time timestamp
        current.lastModified = ctx.timestamp();

        // write the state back
        state.update(current);

        // schedule the next timer 60 seconds from the current event time
        ctx.timerService().registerEventTimeTimer(current.lastModified   60000);
    }

    @Override
    public void onTimer(long timestamp, OnTimerContext ctx, Collector<Tuple2<String, Long>> out)
            throws Exception {

        // get the state for the key that scheduled the timer
        CountWithTimestamp result = state.value();

        // check if this is an outdated timer or the latest timer
        if (timestamp == result.lastModified   60000) {
            // emit the state on timeout
            out.collect(new Tuple2<String, Long>(result.key, result.count));
        }
    }
}
  

Мой вопрос заключается в том, что если я удалю параметр if timestamp == result.lastModified 60000 (сбор stmt не затронут) в OnTimer и вместо этого заменю его другим параметром if if(ctx.timestamp < current.lastModified 60000) { deleteEventTimeTimer(current.lastModified 60000)} в начале processElement, будет ли семантика программы такой же? есть ли предпочтение одной версии перед другой в случае одинаковой семантики?

Ответ №1:

Вы правы, полагая, что реализация, которая удаляет таймер, имеет ту же семантику. И на самом деле я недавно изменил пример, используемый в наших учебных материалах, чтобы сделать именно это, поскольку я предпочитаю этот подход. Причина, по которой я нахожу это предпочтительным, заключается в том, что тогда вся сложная бизнес-логика находится в одном месте (в processElement ), и всякий раз, когда onTimer вызывается, вы точно знаете, что делать, без вопросов. Кроме того, он более производителен, поскольку для контрольной точки и, в конечном итоге, запуска требуется меньше таймеров.

Этот пример был написан для документов еще до того, как таймеры могли быть удалены, и не обновлялся.

Вы можете найти переработанный пример, который я упоминал на этих слайдах —https://training.ververica.com/decks/process-function / — как только вы пройдете страницу регистрации.

Черт возьми, я также недавно переработал эталонное решение для соответствующего учебного упражнения в том же духе:https://github.com/apache/flink-training/tree/master/long-ride-alerts.