Преобразовать массив Json в csv с помощью apache Nifi

#arrays #json #apache-nifi #jsonpath

#массивы #json #apache-nifi #jsonpath

Вопрос:

Я хочу преобразовать JSON с массивом в формат csv. Количество элементов внутри массива является динамическим для каждой строки. Я пытался использовать этот поток (прикрепил файл потока xml к сообщению).

GetFile —> ConvertRecord —> UpdateAttribute —> PutFile

Есть ли другие альтернативы?

Формат JSON:

 {  "LogData": {
"Location": "APAC",
"product": "w1"  },  "Outcome": [
{
  "limit": "0",
  "pri": "3",
  "result": "pass"
},
{
  "limit": "1",
  "pri": "2",
  "result": "pass"
},
{
  "limit": "5",
  "priority": "1",
  "result": "fail"
}  ],  "attr": {
"vers": "1",
"datetime": "2018-01-10 00:36:00"  }}
  

Ожидаемый результат в формате csv:

 location,   product,    limit,  pri,    result, vers,   datetime
APAC        w1          0       3       pass    1       2018-01-10 00:36:00
APAC        w1          1       2       pass    1       2018-01-10 00:36:00
APAC        w1          5       1       fail    1       2018-01-10 00:36:00
  

Вывод из подключенного потока:
Данные журнала, результат, attr
«MapRecord[{product= w1, Location= APAC}]»,»[MapRecord[{limit=0, result=pass, pri=3}], MapRecord[{limit=1, result=pass, pri=2}], MapRecord[{limit=5, result=fail}]]», «MapRecord[{datetime=2018-01-10 00:36:00, vers =1}]»

введите описание изображения здесь

ConvertRecord — я использую конфигурации JSONTreereader и CSVRecordSSetwriter, как показано ниже: введите описание изображения здесь

Конфигурация службы контроллера JSONTreereader: введите описание изображения здесь Конфигурация службы контроллера CSVRecordSetwriter: введите описание изображения здесь Конфигурация службы контроллера AvroschemaRegistry: введите описание изображения здесь

Схема Avro: { «имя»: «myschema», «тип»: «запись», «пространство имен»: «myschema», «поля»: [{«имя»: «LogData», «тип»: { «имя»: «LogData», «тип»: «запись», «поля»: [{ «имя»: «Местоположение», «тип»: «строка»},{ «имя»: «продукт», «тип»: «строка»}] }},{«имя»: «Результат»,»тип»: { «тип»: «массив», «элементы»: {«имя»: «Итоговая запись», «тип»: «запись», «поля»: [ {«имя»: «ограничение», «тип»: «строка»}, {«имя»: «pri»,»тип»: [«строка»,»null»]}, {«имя»: «результат»,»тип»: «строка» }] }}},{«имя»: » attr», «тип»: { «имя»: «attr», «тип»: «запись», «поля»: [{ «имя»: «верс», «тип»: «строка»},{ «имя»: «дата-время», «тип»: «строка»} ]}} ]}

Ответ №1:

Попробуйте эту спецификацию в JoltTransformJSON перед ConvertRecord:

   {
    "operation": "shift",
    "spec": {
      "Outcome": {
        "*": {
          "@(3,LogData.Location)": "[#2].location",
          "@(3,LogData.product)": "[#2].product",
          "@(3,attr.vers)": "[#2].vers",
          "@(3,attr.datetime)": "[#2].datetime",
          "*": "[#2].amp;"
        }
      }
    }
  }
]```
  

Комментарии:

1. Огромное спасибо @mattyb за помощь. Это работает как шарм. Один вопрос: Если у меня будет более 1300 элементов массива внутри результата: [], будет ли преобразование jolt работать хорошо или будут какие-либо накладные расходы при преобразовании 1 входящей записи json в более чем 1300 записей json?

2. Не уверен, что вы подразумеваете под служебными данными, но я могу сказать вам, что все 1300 файлов потока будут переданы одновременно, как только все они завершат обработку и сеанс будет зафиксирован.

Ответ №2:

Кажется, что вам нужно выполнить JoltTransform перед преобразованием в CSV, если нет, это не сработает.