#apache-flink #flink-streaming #flink-cep
#apache-flink #flink-потоковая передача #flink-cep
Вопрос:
давайте предположим, что у меня есть эта конфигурация для дескриптора, и отсюда были предприняты действия:
ValueStateDescriptor<Event> descriptor = ...;
StateTtlConfig ttlConfigOneHourAndReturnExpire = StateTtlConfig.newBuilder(Time.hours(1))
.setUpdateType(StateTtlConfig.UpdateType.OnReadAndWrite)
.setStateVisibility(StateTtlConfig.StateVisibility.ReturnExpiredIfNotCleanedUp).build();
descriptor.enableTimeToLive(ttlConfigOneHourAndReturnExpire);
/*after one hour when the state is expired*/
Event e = state.value(); (step 1 and 2)
e.count = e.count 1; (step 3)
value.update(e); (step 4)
Будет ли это означать, что через 1 час, когда состояние уже устарело, все будет происходить в таком порядке:
- Возвращать предыдущее состояние записи в состояние, кроме того, не рекомендуется.
- Предыдущее состояние записи будет очищено после этого чтения.
- Обновите объект после того, как предыдущее состояние было доставлено и очищено (в режиме чтения).
- Обновить состояние в этом случае будет означать создать состояние снова, потому что предыдущее уже было удалено, и это значение займет еще один час, или состояние будет очищено в этот момент, а не в точке 1, и объект не будет включать обновление, и он будет сохранен в состоянии, в котором он прибыл?
Переход я мог бы объяснить сам, потому что документация мне не ясна.
Начиная с того момента, когда мне нужно очищать состояния, когда происходит смена дня, и нет способа сделать это с помощью TTL, я хочу очищать состояние через каждый час, но получать состояние перед удалением, обновлять текущее значение, а затем снова создавать состояние еще на один час, но всегда сохраняя предыдущее состояние перед его потерей.
Надеюсь, что это имеет смысл и возможно каким-то образом сделать. С уважением!
Ответ №1:
Если вам нужно изменять состояние каждый час, создайте пользовательское ProcessFunction
и используйте таймер для запуска этого действия.
Комментарии:
1. Используя
ProcessFunction
, нужно ли мне объявлять окно на 1 час, чтобы использоватьonTimer
метод каждый час, а затем очищать состояние без потери его фактического состояния и повторно обновлять значение? если это так, я не могу сделать это таким образом, потому что вариант использования должен запускаться как можно быстрее, а не каждый час. Правильно ли я используюonTimer
?2. Похоже, что использование
KeyedProcessFunction
не требуется для объявленияwindow
времени3. Если я сделаю что-то вроде этого:
long cleanupTime = getEndOfDay() - event.timestamp.getTime(); ctx.timerService().registerEventTimeTimer(cleanupTime);
когда методgetEndOfDay
предоставит мне последнее время дня в формате long, смогу ли я очистить свои ключи в конце дня? с уважением4. Я не понимаю ваших требований. Вы сказали «Я хочу очищать состояние через каждый час», но в последнем фрагменте кода звучит так, как будто вы просто хотите очистить вещи в конце каждого дня.
5. да, извините за путаницу, мне нужно очистить все состояния в конце дня и начать новый день вообще без состояний. Правильно ли я делаю с приведенным выше фрагментом? С уважением!