#apache-flink #flink-streaming
#апач-флинк #мерцающий поток #apache-сбой #мгновенная передача
Вопрос:
Я читал пример Flink CountWithTimestamp, и ниже приведен фрагмент кода из примера:
@Override
public void processElement(Tuple2<String, String> value, Context ctx, Collector<Tuple2<String, Long>> out)
throws Exception {
// retrieve the current count
CountWithTimestamp current = state.value();
if (current == null) {
current = new CountWithTimestamp();
current.key = value.f0;
}
// update the state's count
current.count ;
// set the state's timestamp to the record's assigned event time timestamp
current.lastModified = ctx.timestamp();
// write the state back
state.update(current);
// schedule the next timer 60 seconds from the current event time
ctx.timerService().registerEventTimeTimer(current.lastModified 60000);
}
@Override
public void onTimer(long timestamp, OnTimerContext ctx, Collector<Tuple2<String, Long>> out)
throws Exception {
// get the state for the key that scheduled the timer
CountWithTimestamp result = state.value();
// check if this is an outdated timer or the latest timer
if (timestamp == result.lastModified 60000) {
// emit the state on timeout
out.collect(new Tuple2<String, Long>(result.key, result.count));
}
}
}
Мой вопрос заключается в том, что если я удалю параметр if timestamp == result.lastModified 60000
(сбор stmt не затронут) в OnTimer и вместо этого заменю его другим параметром if if(ctx.timestamp < current.lastModified 60000) { deleteEventTimeTimer(current.lastModified 60000)}
в начале processElement, будет ли семантика программы такой же? есть ли предпочтение одной версии перед другой в случае одинаковой семантики?
Ответ №1:
Вы правы, полагая, что реализация, которая удаляет таймер, имеет ту же семантику. И на самом деле я недавно изменил пример, используемый в наших учебных материалах, чтобы сделать именно это, поскольку я предпочитаю этот подход. Причина, по которой я нахожу это предпочтительным, заключается в том, что тогда вся сложная бизнес-логика находится в одном месте (в processElement
), и всякий раз, когда onTimer
вызывается, вы точно знаете, что делать, без вопросов. Кроме того, он более производителен, поскольку для контрольной точки и, в конечном итоге, запуска требуется меньше таймеров.
Этот пример был написан для документов еще до того, как таймеры могли быть удалены, и не обновлялся.
Вы можете найти переработанный пример, который я упоминал на этих слайдах —https://training.ververica.com/decks/process-function / — как только вы пройдете страницу регистрации.
Черт возьми, я также недавно переработал эталонное решение для соответствующего учебного упражнения в том же духе:https://github.com/apache/flink-training/tree/master/long-ride-alerts.