Сокращение состояния flink на основе атрибутов хранимых объектов

#java #state #apache-flink #flink-streaming

#java #состояние #apache-flink #flink-потоковая передача

Вопрос:

Рассмотрим следующий пример класса, экземпляры которого хранятся в ListState :

 class BusinessObject {
  long clientId;
  String region;
  Instant lastDealDate; 
  bool isActive;
}
  

Приложение требует, чтобы этот объект не находился в состоянии flink, если прошло 1 год с момента заключения последней сделки ( lastDealDate ) с конкретным клиентом, и клиент не активен, т.е. isActive == false

Каков был бы правильный способ сделать это и сообщить flink об этих 2 факторах, чтобы он автоматически удалял эти записи? В настоящее время я считываю все элементы в состоянии, очищаю состояние, а затем добавляю обратно соответствующие, однако это начнет занимать много времени по мере увеличения числа клиентов и увеличения размера состояния. Большинство моих поисковых запросов в Интернете говорят об использовании time-to-live и настройке его для моего состояния через descriptor . Однако моя логика не может полагаться на время обработки / события / приема, и мне нужно также проверить, является ли isActive значение false.

Дополнительная информация: контекст не задан, а серверная часть — RocksDB. Причина ListState , по которой используется a, заключается в том, что все относительное состояние / история в соответствии с вышеуказанными условиями должны сбрасываться каждый день.

Есть предложения?

Комментарии:

1. Можете ли вы рассказать нам, почему вы используете ListState (и как он используется), работаете ли вы в контексте с ключом и какой сервер состояния вы используете?

2. @DavidAnderson спасибо, что спросили, добавил ответы на ваши вопросы в тело вопроса.

Ответ №1:

С помощью серверной части состояния RocksDB Flink может добавлять к ListState без прохождения сериализации / десериализации, но любое чтение или модификация, отличные от добавления, являются дорогостоящими из-за ser / de.

Вам будет лучше, если вы сможете переделать вещи так, чтобы эти BusinessObjects хранились в mapState, даже если вам иногда приходится перебирать всю карту. Каждая пара ключ / значение в mapState будет отдельной записью RocksDB, и вы сможете индивидуально создавать / обновлять / удалять их без необходимости проходить ser / de для всей карты (если вам не нужно ее сканировать). (Для чего это стоит, итерация по mapState в RocksDB выполняется по карте в порядке сортировки по сериализованному ключу.)

mapState доступно только как состояние с ключом (или широковещательное), поэтому для этого изменения потребуется ввести ключ потока. Использование keyBy вызывает перетасовку сети (и ser / de), поэтому это будет дорого, но не так дорого, как использование ListState.

Комментарии:

1. Если мы игнорируем isActive часть is, можно ли направить flink на использование our lastDealDate в качестве времени для измерения, чтобы определить, истек ли срок действия объекта или нет?

2. Нет, это невозможно. Срок действия TTL состояния истекает в зависимости от времени, прошедшего с момента последней записи или последнего чтения или записи, и использует внутренние таймеры, к которым у вас нет доступа.

3. Итак, я должен сделать это ключевым контекстом, и в моей функции процесса я могу зарегистрировать таймер для запуска в желаемое время, например ctx.timestamp() myTtl , и выполнить очистку в onTimer методе? Это мой единственный способ?

4. Единственное, что я могу придумать, что могло бы сработать, — это написать фильтр сжатия RocksDB для реализации вашей логики истечения срока действия. Но я недостаточно знаю о фильтрах уплотнения, чтобы знать, будет ли это работать или насколько хорошо.

5. Правильно. Но, по крайней мере, подход с таймером может быть разумным обходным путем?