#java #state #apache-flink #flink-streaming
#java #состояние #apache-flink #flink-потоковая передача
Вопрос:
Рассмотрим следующий пример класса, экземпляры которого хранятся в ListState
:
class BusinessObject {
long clientId;
String region;
Instant lastDealDate;
bool isActive;
}
Приложение требует, чтобы этот объект не находился в состоянии flink, если прошло 1 год с момента заключения последней сделки ( lastDealDate
) с конкретным клиентом, и клиент не активен, т.е. isActive == false
Каков был бы правильный способ сделать это и сообщить flink об этих 2 факторах, чтобы он автоматически удалял эти записи? В настоящее время я считываю все элементы в состоянии, очищаю состояние, а затем добавляю обратно соответствующие, однако это начнет занимать много времени по мере увеличения числа клиентов и увеличения размера состояния. Большинство моих поисковых запросов в Интернете говорят об использовании time-to-live
и настройке его для моего состояния через descriptor
. Однако моя логика не может полагаться на время обработки / события / приема, и мне нужно также проверить, является ли isActive
значение false.
Дополнительная информация: контекст не задан, а серверная часть — RocksDB. Причина ListState
, по которой используется a, заключается в том, что все относительное состояние / история в соответствии с вышеуказанными условиями должны сбрасываться каждый день.
Есть предложения?
Комментарии:
1. Можете ли вы рассказать нам, почему вы используете ListState (и как он используется), работаете ли вы в контексте с ключом и какой сервер состояния вы используете?
2. @DavidAnderson спасибо, что спросили, добавил ответы на ваши вопросы в тело вопроса.
Ответ №1:
С помощью серверной части состояния RocksDB Flink может добавлять к ListState без прохождения сериализации / десериализации, но любое чтение или модификация, отличные от добавления, являются дорогостоящими из-за ser / de.
Вам будет лучше, если вы сможете переделать вещи так, чтобы эти BusinessObjects хранились в mapState, даже если вам иногда приходится перебирать всю карту. Каждая пара ключ / значение в mapState будет отдельной записью RocksDB, и вы сможете индивидуально создавать / обновлять / удалять их без необходимости проходить ser / de для всей карты (если вам не нужно ее сканировать). (Для чего это стоит, итерация по mapState в RocksDB выполняется по карте в порядке сортировки по сериализованному ключу.)
mapState доступно только как состояние с ключом (или широковещательное), поэтому для этого изменения потребуется ввести ключ потока. Использование keyBy вызывает перетасовку сети (и ser / de), поэтому это будет дорого, но не так дорого, как использование ListState.
Комментарии:
1. Если мы игнорируем
isActive
часть is, можно ли направить flink на использование ourlastDealDate
в качестве времени для измерения, чтобы определить, истек ли срок действия объекта или нет?2. Нет, это невозможно. Срок действия TTL состояния истекает в зависимости от времени, прошедшего с момента последней записи или последнего чтения или записи, и использует внутренние таймеры, к которым у вас нет доступа.
3. Итак, я должен сделать это ключевым контекстом, и в моей функции процесса я могу зарегистрировать таймер для запуска в желаемое время, например
ctx.timestamp() myTtl
, и выполнить очистку вonTimer
методе? Это мой единственный способ?4. Единственное, что я могу придумать, что могло бы сработать, — это написать фильтр сжатия RocksDB для реализации вашей логики истечения срока действия. Но я недостаточно знаю о фильтрах уплотнения, чтобы знать, будет ли это работать или насколько хорошо.
5. Правильно. Но, по крайней мере, подход с таймером может быть разумным обходным путем?