Десериализуйте данные JSON из FetchDistributedCacheMap для объединения в потоковый файл Apache Nifi

#apache-nifi

#apache-nifi

Вопрос:

введите описание изображения здесьИзвлечение данных Json из переменной атрибута У меня есть файл потока, который необходимо обогатить, поэтому я создал api, который можно вызвать. полученный json затем сохраняется в атрибуте, а затем записывается запись DistributedCacheMap с использованием PutDistributedMapCache, чтобы избежать повторных вызовов api для одного и того же устройства, поскольку многие потокифайлы поступают.

Пара KV выглядит следующим образом Key DeviceName=ABC123 Value = {«device»:»ABC123″,»State»: «NSW», «location»:»SYD55″}

Когда файлы потока обрабатываются, выполняется поиск, чтобы узнать, находятся ли данные устройства уже в распределенном MapCache, если это атрибут с именем cached-значение затем присваивается с помощью процесса FetchDistributedMapCache, результирующая переменная атрибута возвращается в следующем виде. cached-value: {«устройство»: «ABC123», «Состояние»: «Новый Южный Уэльс», «местоположение»: «SYD55»}

Затем мне нужно извлечь json из атрибута cached-value и объединить его обратно в файл потока, чтобы обогатить данные в этом файле потока. Я изо всех сил пытаюсь найти способ десериализовать этот json из атрибута cached-value, чтобы использовать его обратно в файле потока. Пробовал AttributesToJson без какой-либо удачи

Любые предложения будут оценены.

Ответ №1:

Большое спасибо Up_One за ответ на мой вопрос. В итоге я решил это немного по-другому, и я подумал, что поделюсь своим ответом, поскольку у кого-то еще могут быть идеи для упрощения.

Я также поделюсь шаблоном, который я создал, чтобы попытаться уточнить.

  1. Создайте файл потока для имитации принимаемых данных. вывод выглядит так, мы обогатим состояние, стойку, местоположение

введите описание изображения здесь

  1. Извлеките идентификатор устройства из данных, используя EvaluateJsonPath введите описание изображения здесь
  2. Попытайтесь прочитать идентификатор устройства из cachemap с помощью FetchDistributedCacheMap и сохраните его в атрибуте cached-value

введите описание изображения здесь

результат выглядит так

введите описание изображения здесь

  1. Затем я добавляю результаты чтения кэша в конец файла потока, используя процессор ReplaceText (оба являются json). Свойства введите описание изображения здесьРезультат добавления

введите описание изображения здесь

  1. Затем я выполняю второй ReplaceText, чтобы исправить синтаксис данных json после операции добавления, используя следующие свойства. введите описание изображения здесь Потоковый файл теперь выглядит следующим образом введите описание изображения здесь
  2. Затем я извлекаю переменные метаданных, которые планирую использовать для обогащения исходного файла потока с помощью процессора EvaluateJsonPath.

введите описание изображения здесь

  1. Наконец, я выполняю JoltTransform для данных файла потока, чтобы правильно отформатировать данные Свойства

введите описание изображения здесь

Теперь данные отформатированы в расширенном потоковом файле

введите описание изображения здесь

Ответ №2:

Вам нужно будет вставить результат атрибута поиска в содержимое потока, для этого используйте replacetext processor. (значение замены должно быть результатом вашего атрибута поиска)

После этого вам необходимо использовать процессор evaluatejsonpath для разбора содержимого входящего потока на отдельные атрибуты.

AttributesToJson создает полезную нагрузку json из атрибутов потока, вы будете использовать ее в конце для восстановления окончательного json, если он вам нужен.