О конфигурации TTL для состояний в Flink

#apache-flink #flink-streaming #flink-cep

#apache-flink #flink-потоковая передача #flink-cep

Вопрос:

давайте предположим, что у меня есть эта конфигурация для дескриптора, и отсюда были предприняты действия:

 ValueStateDescriptor<Event> descriptor = ...;

StateTtlConfig ttlConfigOneHourAndReturnExpire = StateTtlConfig.newBuilder(Time.hours(1))
            .setUpdateType(StateTtlConfig.UpdateType.OnReadAndWrite)
            .setStateVisibility(StateTtlConfig.StateVisibility.ReturnExpiredIfNotCleanedUp).build();

descriptor.enableTimeToLive(ttlConfigOneHourAndReturnExpire);

/*after one hour when the state is expired*/
Event e = state.value(); (step 1 and 2)
e.count = e.count   1; (step 3)
value.update(e); (step 4)
  

Будет ли это означать, что через 1 час, когда состояние уже устарело, все будет происходить в таком порядке:

  1. Возвращать предыдущее состояние записи в состояние, кроме того, не рекомендуется.
  2. Предыдущее состояние записи будет очищено после этого чтения.
  3. Обновите объект после того, как предыдущее состояние было доставлено и очищено (в режиме чтения).
  4. Обновить состояние в этом случае будет означать создать состояние снова, потому что предыдущее уже было удалено, и это значение займет еще один час, или состояние будет очищено в этот момент, а не в точке 1, и объект не будет включать обновление, и он будет сохранен в состоянии, в котором он прибыл?

Переход я мог бы объяснить сам, потому что документация мне не ясна.

Начиная с того момента, когда мне нужно очищать состояния, когда происходит смена дня, и нет способа сделать это с помощью TTL, я хочу очищать состояние через каждый час, но получать состояние перед удалением, обновлять текущее значение, а затем снова создавать состояние еще на один час, но всегда сохраняя предыдущее состояние перед его потерей.

Надеюсь, что это имеет смысл и возможно каким-то образом сделать. С уважением!

Ответ №1:

Если вам нужно изменять состояние каждый час, создайте пользовательское ProcessFunction и используйте таймер для запуска этого действия.

Комментарии:

1. Используя ProcessFunction , нужно ли мне объявлять окно на 1 час, чтобы использовать onTimer метод каждый час, а затем очищать состояние без потери его фактического состояния и повторно обновлять значение? если это так, я не могу сделать это таким образом, потому что вариант использования должен запускаться как можно быстрее, а не каждый час. Правильно ли я использую onTimer ?

2. Похоже, что использование KeyedProcessFunction не требуется для объявления window времени

3. Если я сделаю что-то вроде этого: long cleanupTime = getEndOfDay() - event.timestamp.getTime(); ctx.timerService().registerEventTimeTimer(cleanupTime); когда метод getEndOfDay предоставит мне последнее время дня в формате long, смогу ли я очистить свои ключи в конце дня? с уважением

4. Я не понимаю ваших требований. Вы сказали «Я хочу очищать состояние через каждый час», но в последнем фрагменте кода звучит так, как будто вы просто хотите очистить вещи в конце каждого дня.

5. да, извините за путаницу, мне нужно очистить все состояния в конце дня и начать новый день вообще без состояний. Правильно ли я делаю с приведенным выше фрагментом? С уважением!