Совместное использование состояния между операторами (KeyedProcessFunction и RichMapFunction) при переполнении RocksDB

#java #apache-flink #flink-streaming #flink-cep

#java #apache-flink #flink-потоковая передача #flink-cep

Вопрос:

введите описание изображения здесь

Исходя из приведенного выше изображения, на данный момент мне нужно разделить состояние между двумя операторами от одного KeyedProcessFunction , который будет обрабатывать события и преобразовывать их из класса X в класс Y и сохранять состояния для входящих записей, чтобы всегда отправлять самую свежую информацию класса Y в функцию вывода Python.

Результат функции вывода необходимо отобразить обратно в класс Y и обновить состояние этого объекта, уже созданного при ProcessFunction предыдущем выполнении Sink .

Насколько я читал, состояние трансляции невозможно при RocksDB. «Нет серверной части состояния RocksDB: состояние трансляции сохраняется в памяти во время выполнения, и выделение памяти должно выполняться соответствующим образом. Это справедливо для всех состояний оператора «.

вопросы:

  1. Каков наилучший способ сделать это, поскольку я использую RocksDB в качестве серверной части состояния?
  2. Возможно ли совместное использование состояний между a KeyedProcessFunction и a RichMapFunction ?

Ответ №1:

Вы можете использовать состояние трансляции при использовании RocksDB в качестве серверной части состояния. Состояние трансляции не будет сохранено в RocksDB — оно будет в куче — но оно будет помечено контрольной точкой. Таким образом, состояние широковещательной передачи должно быть достаточно маленьким, чтобы поместиться в памяти. (Более того, каждая задача будет независимо проверять копию состояния широковещательной передачи.)

Однако я не думаю, что состояние трансляции поможет в этом случае использования. Он передает состояние только всем экземплярам одного оператора.

Вы не можете делиться состоянием между операторами. Состояние строго локальное. Вы можете передавать выходные данные функции процесса в RichMapFunction, чтобы она содержала необходимую информацию. Карта не сможет напрямую влиять на состояние, хранящееся в функции процесса, но у нее может быть своя собственная копия этого состояния.

Однако, похоже, вы хотите, чтобы выходные данные функции вывода изменяли состояние в функции процесса. API потока данных не допускает подобных циклов в потоке данных. Но у вас есть несколько вариантов:

(1) Передайте результаты функции вывода во что-то вроде kafka / kinesis, а затем добавьте этот поток в качестве другого ввода в функцию процесса. (Другими словами, циклы возможны, если вы используете внешнюю очередь сообщений для разделения вещей. Конечно, это увеличивает задержку.)

(2) Используйте API функций с отслеживанием состояния. Он предлагает произвольные шаблоны взаимодействия между компонентами с отслеживанием состояния (вы не ограничены DAG), имеет отличную поддержку Python и многое другое. И все это поверх среды выполнения Flink, так что вы получаете те же преимущества в отношении согласованности, ровно один раз, масштабируемости и т.д.

Комментарии:

1. Я не уверен, объяснил ли я себя или нет, но идея заключается в том, что состояние, созданное в ProccessFunction , должно обновляться из RichFlatMap (или любого другого оператора) после того, как функция вывода Python отправляет результаты. Поэтому мне нужно каким-то образом перехватить состояние и обновить его из RichFlatMap (или любого другого оператора), чтобы всегда иметь последнее состояние пользователя в ProccessFunction .

2. Я прямо сейчас видел презентацию Стефана Юэна Stateful Functions , и, как вы упомянули, она выглядит очень похоже на то, что мне нужно. Есть ли какие-либо примеры того, как обмениваться информацией между операторами с помощью Stateful Functions ?

3. Есть ли какой-либо пример того, как интегрировать API потока данных с API функций с отслеживанием состояния?

4. Я полагаю, что есть тесты, которые используют этот механизм, но я не уверен, есть ли какие-либо примеры как таковые.