Как запустить процессор только тогда, когда другой процессор не выполняется?

#apache-nifi

#apache-nifi

Вопрос:

Я вставляю / обновляю данные в таблицу. Система баз данных не обеспечивает функциональность «Upsert». Таким образом, я использую промежуточную таблицу для вставки, за которой следует слияние с «окончательной» таблицей, и, наконец, я усекаю промежуточную таблицу.

Это приводит к состоянию гонки. Если новые данные вставляются в промежуточную таблицу между слиянием усечение, эти данные теряются.

Как я могу убедиться, что этого не произойдет?

Я пытался смоделировать это с помощью Wait / Notify, но это тоже не чистое решение. Очередь для процессора PutDatabaseRecord «Поместить данные в промежуточную таблицу» может быть заполнена, и «MergeVertica для вставки / обновления» ExecuteSQL все еще может выполняться.

Поток Nifi

Комментарии:

1. просто идея: добавьте processed столбец в таблицу этапов. вставить со значением 0. и при слиянии использовать: update stage set processed=1; merge where processed=1; delete where processed=1; или вы могли бы добавить больше статусов… итак: используйте возможности базы данных для управления состоянием данных в базе данных.

Ответ №1:

Я бы использовал процессор MonitorActivity с порогом 60 или 30 секунд и использовал неактивный вывод с постоянной отправкой сообщений, установленным на «false».

Добейтесь успеха SQL inserts into staging connection в вашем MonitorActivity, таким образом, если в течение последних X секунд не будет замечено никакой активности, он запустит потоковый файл, который запустит ваш процесс слияния.

Загрузите шаблон сhttps://codeshare.io/aJNNkn

введите описание изображения здесь

Комментарии:

1. Спасибо за ваш ответ, я раньше не слышал о «MonitorActivity». Все еще существует случай, когда потоковый файл «застрял» в каком-то вышестоящем процессоре, и как только мы начинаем слияние, он передается в процессор PutDatabaseRecord. Может ли это произойти, если, например, процессор занят?