#arrays #json #apache-nifi #jsonpath
#массивы #json #apache-nifi #jsonpath
Вопрос:
Я хочу преобразовать JSON с массивом в формат csv. Количество элементов внутри массива является динамическим для каждой строки. Я пытался использовать этот поток (прикрепил файл потока xml к сообщению).
GetFile —> ConvertRecord —> UpdateAttribute —> PutFile
Есть ли другие альтернативы?
Формат JSON:
{ "LogData": {
"Location": "APAC",
"product": "w1" }, "Outcome": [
{
"limit": "0",
"pri": "3",
"result": "pass"
},
{
"limit": "1",
"pri": "2",
"result": "pass"
},
{
"limit": "5",
"priority": "1",
"result": "fail"
} ], "attr": {
"vers": "1",
"datetime": "2018-01-10 00:36:00" }}
Ожидаемый результат в формате csv:
location, product, limit, pri, result, vers, datetime
APAC w1 0 3 pass 1 2018-01-10 00:36:00
APAC w1 1 2 pass 1 2018-01-10 00:36:00
APAC w1 5 1 fail 1 2018-01-10 00:36:00
Вывод из подключенного потока:
Данные журнала, результат, attr
«MapRecord[{product= w1, Location= APAC}]»,»[MapRecord[{limit=0, result=pass, pri=3}], MapRecord[{limit=1, result=pass, pri=2}], MapRecord[{limit=5, result=fail}]]», «MapRecord[{datetime=2018-01-10 00:36:00, vers =1}]»
ConvertRecord — я использую конфигурации JSONTreereader и CSVRecordSSetwriter, как показано ниже:
Конфигурация службы контроллера JSONTreereader: Конфигурация службы контроллера CSVRecordSetwriter: Конфигурация службы контроллера AvroschemaRegistry:
Схема Avro: { «имя»: «myschema», «тип»: «запись», «пространство имен»: «myschema», «поля»: [{«имя»: «LogData», «тип»: { «имя»: «LogData», «тип»: «запись», «поля»: [{ «имя»: «Местоположение», «тип»: «строка»},{ «имя»: «продукт», «тип»: «строка»}] }},{«имя»: «Результат»,»тип»: { «тип»: «массив», «элементы»: {«имя»: «Итоговая запись», «тип»: «запись», «поля»: [ {«имя»: «ограничение», «тип»: «строка»}, {«имя»: «pri»,»тип»: [«строка»,»null»]}, {«имя»: «результат»,»тип»: «строка» }] }}},{«имя»: » attr», «тип»: { «имя»: «attr», «тип»: «запись», «поля»: [{ «имя»: «верс», «тип»: «строка»},{ «имя»: «дата-время», «тип»: «строка»} ]}} ]}
Ответ №1:
Попробуйте эту спецификацию в JoltTransformJSON перед ConvertRecord:
{
"operation": "shift",
"spec": {
"Outcome": {
"*": {
"@(3,LogData.Location)": "[#2].location",
"@(3,LogData.product)": "[#2].product",
"@(3,attr.vers)": "[#2].vers",
"@(3,attr.datetime)": "[#2].datetime",
"*": "[#2].amp;"
}
}
}
}
]```
Комментарии:
1. Огромное спасибо @mattyb за помощь. Это работает как шарм. Один вопрос: Если у меня будет более 1300 элементов массива внутри результата: [], будет ли преобразование jolt работать хорошо или будут какие-либо накладные расходы при преобразовании 1 входящей записи json в более чем 1300 записей json?
2. Не уверен, что вы подразумеваете под служебными данными, но я могу сказать вам, что все 1300 файлов потока будут переданы одновременно, как только все они завершат обработку и сеанс будет зафиксирован.
Ответ №2:
Кажется, что вам нужно выполнить JoltTransform перед преобразованием в CSV, если нет, это не сработает.