#java #apache-flink #flink-streaming #flink-cep
#java #apache-flink #flink-потоковая передача #flink-cep
Вопрос:
Давайте рассмотрим это в качестве примера:
Я был вызван onTimer
функцией в KeyedProcessFunction
, основанной на этой концепции:
(when a == "start" -> ctx.timerService().registerProcessingTimeTimer(some time in long))
,
но затем появляется новая запись с этой концепцией, которая означает конец поездки:
(when a == "end" -> ctx.timerService().registerProcessingTimeTimer(ctx.timerService().currentProcessingTime();))
Этот второй таймер запускает действие немедленно, и идея состоит в том, чтобы очистить таймер, который был установлен ранее, потому что у меня нет ощущения, что предыдущий таймер остается в силе.
Дело в том, что если действие два не выполняется в течение одного часа, например, мне нужно что-то сделать (ctx.timerService().registerProcessingTimeTimer(some time in long))
, но если ожидаемое значение поступает в течение этого часа, тогда нет необходимости запускать таймер или запускать таймер мгновенно и забывать о другом таймере, запрограммированном ранее (ctx.timerService().registerProcessingTimeTimer(ctx.timerService().currentProcessingTime();))
, но у меня возникают проблемы, потому что таймерсрабатывает независимо от того, что происходит, предыдущий таймер тоже срабатывает.
Я бы попытался использовать ctx.timerService().deleteProcessingTimeTimer(some time in long);
, но, похоже, это не работает.
Смотрите пример:
Порядок событий: событие A всегда будет поступать раньше, чем B. объяснение: событие B должно поступать в диапазоне одного часа после прибытия события A, в противном случае таймер будет запущен через час после прибытия A, но если B поступает после того, как таймер был установлен на один час, таймер должен быть запущен мгновенно ипредыдущий определенный таймер никогда не должен вызываться (удаляться).
SingleOutputStreamOperator<Events> abandonment = stream.keyBy(e -> e.id)
.process(new KeyedProcessFunctionName());
public class KeyedProcessFunctionNameextends KeyedProcessFunction<String, Event, Event> {
@Override
public void processElement(Event e, Context ctx, Collector<Event> out) throws Exception {
if (e.value.equalsIgnoreCase("B")) {
{
ctx.timerService().registerProcessingTimeTimer(ctx.timerService().currentProcessingTime());
}
if (stateTwo.value() == null amp;amp; e.value.equalsIgnoreCase("A")) {
ctx.timerService().registerProcessingTimeTimer(ctx.timerService().currentProcessingTime() SOME_FIXED_TIME_IN_LONG);
}
}
}
@Override
public void onTimer(long timestamp, OnTimerContext ctx, Collector<Event> out) throws Exception {
/*when this timer is been called because of B it must not been called because of a previous timer set because of A*/
}
}
Есть идеи?
Комментарии:
1. Одно из учебных упражнений Flink охватывает этот вариант использования. См. github.com/apache/flink-training/tree/master/long-ride-alerts . Решение предоставляется.
Ответ №1:
Таймеры всегда (неявно) привязаны к ключу. Когда вы создаете таймер, он ассоциируется с ключом обрабатываемого события (или ключом таймера, который в данный момент запускается). Аналогично, вы можете удалять только таймеры, связанные с ключом, находящимся в данный момент в контексте. Если удаление таймера, похоже, не работает, возможно, именно поэтому.
Еще один факт, который следует иметь в виду, заключается в том, что таймеры дедуплицируются. Другими словами, для любой заданной пары (ключ, временная метка) может быть не более одного таймера времени события и одного таймера времени обработки. Последующие попытки зарегистрировать другой таймер для того же ключа и метки времени будут проигнорированы.
Иногда полезно использовать состояние с ключом, чтобы помнить, что должно быть сделано, когда срабатывает таймер (для того же ключа). Если у вас много таймеров на ключ, вы можете использовать MapState
индексирование по метке времени, чтобы сохранить некоторое состояние для каждого таймера.
Комментарии:
1. Я добавляю пример к вопросу после этой строки: Смотрите Пример: Пожалуйста, дайте мне знать, если теперь идея более ясна, что мне нужно сделать. С уважением