Деактивировать OnTimer для KeyedProcessFunction в Flink

#java #apache-flink #flink-streaming #flink-cep

#java #apache-flink #flink-потоковая передача #flink-cep

Вопрос:

Давайте рассмотрим это в качестве примера:

Я был вызван onTimer функцией в KeyedProcessFunction , основанной на этой концепции:

(when a == "start" -> ctx.timerService().registerProcessingTimeTimer(some time in long)) ,

но затем появляется новая запись с этой концепцией, которая означает конец поездки:

(when a == "end" -> ctx.timerService().registerProcessingTimeTimer(ctx.timerService().currentProcessingTime();))

Этот второй таймер запускает действие немедленно, и идея состоит в том, чтобы очистить таймер, который был установлен ранее, потому что у меня нет ощущения, что предыдущий таймер остается в силе.

Дело в том, что если действие два не выполняется в течение одного часа, например, мне нужно что-то сделать (ctx.timerService().registerProcessingTimeTimer(some time in long)) , но если ожидаемое значение поступает в течение этого часа, тогда нет необходимости запускать таймер или запускать таймер мгновенно и забывать о другом таймере, запрограммированном ранее (ctx.timerService().registerProcessingTimeTimer(ctx.timerService().currentProcessingTime();)) , но у меня возникают проблемы, потому что таймерсрабатывает независимо от того, что происходит, предыдущий таймер тоже срабатывает.

Я бы попытался использовать ctx.timerService().deleteProcessingTimeTimer(some time in long); , но, похоже, это не работает.

Смотрите пример:

Порядок событий: событие A всегда будет поступать раньше, чем B. объяснение: событие B должно поступать в диапазоне одного часа после прибытия события A, в противном случае таймер будет запущен через час после прибытия A, но если B поступает после того, как таймер был установлен на один час, таймер должен быть запущен мгновенно ипредыдущий определенный таймер никогда не должен вызываться (удаляться).

  SingleOutputStreamOperator<Events> abandonment = stream.keyBy(e -> e.id)
.process(new KeyedProcessFunctionName());


public class KeyedProcessFunctionNameextends KeyedProcessFunction<String, Event, Event> {

@Override
    public void processElement(Event e, Context ctx, Collector<Event> out) throws Exception {
        if (e.value.equalsIgnoreCase("B")) {
{
ctx.timerService().registerProcessingTimeTimer(ctx.timerService().currentProcessingTime());
        }

        if (stateTwo.value() == null amp;amp; e.value.equalsIgnoreCase("A")) {
ctx.timerService().registerProcessingTimeTimer(ctx.timerService().currentProcessingTime()   SOME_FIXED_TIME_IN_LONG);
        }
}

}


@Override
    public void onTimer(long timestamp, OnTimerContext ctx, Collector<Event> out) throws Exception {

/*when this timer is been called because of B it must not been called because of a previous timer set because of A*/
}
}
 

Есть идеи?

Комментарии:

1. Одно из учебных упражнений Flink охватывает этот вариант использования. См. github.com/apache/flink-training/tree/master/long-ride-alerts . Решение предоставляется.

Ответ №1:

Таймеры всегда (неявно) привязаны к ключу. Когда вы создаете таймер, он ассоциируется с ключом обрабатываемого события (или ключом таймера, который в данный момент запускается). Аналогично, вы можете удалять только таймеры, связанные с ключом, находящимся в данный момент в контексте. Если удаление таймера, похоже, не работает, возможно, именно поэтому.

Еще один факт, который следует иметь в виду, заключается в том, что таймеры дедуплицируются. Другими словами, для любой заданной пары (ключ, временная метка) может быть не более одного таймера времени события и одного таймера времени обработки. Последующие попытки зарегистрировать другой таймер для того же ключа и метки времени будут проигнорированы.

Иногда полезно использовать состояние с ключом, чтобы помнить, что должно быть сделано, когда срабатывает таймер (для того же ключа). Если у вас много таймеров на ключ, вы можете использовать MapState индексирование по метке времени, чтобы сохранить некоторое состояние для каждого таймера.

Комментарии:

1. Я добавляю пример к вопросу после этой строки: Смотрите Пример: Пожалуйста, дайте мне знать, если теперь идея более ясна, что мне нужно сделать. С уважением