#apache-nifi
#apache-nifi
Вопрос:
Извлечение данных Json из переменной атрибута У меня есть файл потока, который необходимо обогатить, поэтому я создал api, который можно вызвать. полученный json затем сохраняется в атрибуте, а затем записывается запись DistributedCacheMap с использованием PutDistributedMapCache, чтобы избежать повторных вызовов api для одного и того же устройства, поскольку многие потокифайлы поступают.
Пара KV выглядит следующим образом Key DeviceName=ABC123 Value = {«device»:»ABC123″,»State»: «NSW», «location»:»SYD55″}
Когда файлы потока обрабатываются, выполняется поиск, чтобы узнать, находятся ли данные устройства уже в распределенном MapCache, если это атрибут с именем cached-значение затем присваивается с помощью процесса FetchDistributedMapCache, результирующая переменная атрибута возвращается в следующем виде. cached-value: {«устройство»: «ABC123», «Состояние»: «Новый Южный Уэльс», «местоположение»: «SYD55»}
Затем мне нужно извлечь json из атрибута cached-value и объединить его обратно в файл потока, чтобы обогатить данные в этом файле потока. Я изо всех сил пытаюсь найти способ десериализовать этот json из атрибута cached-value, чтобы использовать его обратно в файле потока. Пробовал AttributesToJson без какой-либо удачи
Любые предложения будут оценены.
Ответ №1:
Большое спасибо Up_One за ответ на мой вопрос. В итоге я решил это немного по-другому, и я подумал, что поделюсь своим ответом, поскольку у кого-то еще могут быть идеи для упрощения.
Я также поделюсь шаблоном, который я создал, чтобы попытаться уточнить.
- Создайте файл потока для имитации принимаемых данных. вывод выглядит так, мы обогатим состояние, стойку, местоположение
- Извлеките идентификатор устройства из данных, используя EvaluateJsonPath
- Попытайтесь прочитать идентификатор устройства из cachemap с помощью FetchDistributedCacheMap и сохраните его в атрибуте cached-value
результат выглядит так
- Затем я добавляю результаты чтения кэша в конец файла потока, используя процессор ReplaceText (оба являются json). Свойства Результат добавления
- Затем я выполняю второй ReplaceText, чтобы исправить синтаксис данных json после операции добавления, используя следующие свойства. Потоковый файл теперь выглядит следующим образом
- Затем я извлекаю переменные метаданных, которые планирую использовать для обогащения исходного файла потока с помощью процессора EvaluateJsonPath.
- Наконец, я выполняю JoltTransform для данных файла потока, чтобы правильно отформатировать данные Свойства
Теперь данные отформатированы в расширенном потоковом файле
Ответ №2:
Вам нужно будет вставить результат атрибута поиска в содержимое потока, для этого используйте replacetext processor. (значение замены должно быть результатом вашего атрибута поиска)
После этого вам необходимо использовать процессор evaluatejsonpath для разбора содержимого входящего потока на отдельные атрибуты.
AttributesToJson создает полезную нагрузку json из атрибутов потока, вы будете использовать ее в конце для восстановления окончательного json, если он вам нужен.