#scala #apache-spark #apache-spark-sql
#scala #apache-spark #apache-spark-sql
Вопрос:
В настоящее время я работаю над проектом, в котором мне приходится извлекать некоторые ужасно вложенные данные из документа json (вывод из вызова REST API Log Analytics), пример структуры документа ниже (у меня намного больше столбцов):
{
"tables": [
{
"name": "PrimaryResult",
"columns": [
{
"name": "Category",
"type": "string"
},
{
"name": "count_",
"type": "long"
}
],
"rows": [
[
"Administrative",
20839
],
[
"Recommendation",
122
],
[
"Alert",
64
],
[
"ServiceHealth",
11
]
]
}
] }
Мне удалось извлечь этот документ json во фрейм данных, но я в тупике относительно того, куда идти дальше.
Цель, которую я пытаюсь достичь, — это результат, подобный приведенному ниже:
[{
"Category": "Administrative",
"count_": 20839
},
{
"Category": "Recommendation",
"count_": 122
},
{
"Category": "Alert",
"count_": 64
},
{
"Category": "ServiceHealth",
"count_": 11
}]
В идеале я хотел бы использовать свой массив столбцов в качестве заголовков для каждой записи. Затем я хочу разделить каждый массив записей из массива родительских строк на его собственную запись.
До сих пор я пытался сгладить свой необработанный импортированный фрейм данных, но это не сработает, поскольку данные строк представляют собой массив массивов.
Как мне решить эту головоломку?
Ответ №1:
С этим немного сложно справиться, но вот способ сделать это:
val df = spark.read.option("multiline",true).json("filepath")
val result = df.select(explode($"tables").as("tables"))
.select($"tables.columns".as("col"), explode($"tables.rows").as("row"))
.selectExpr("inline(arrays_zip(col, row))")
.groupBy()
.pivot($"col.name")
.agg(collect_list($"row"))
.selectExpr("inline(arrays_zip(Category, count_))")
result.show
-------------- ------
| Category|count_|
-------------- ------
|Administrative| 20839|
|Recommendation| 122|
| Alert| 64|
| ServiceHealth| 11|
-------------- ------
Чтобы получить вывод JSON, вы можете сделать
val result_json = result.agg(to_json(collect_list(struct("Category", "count_"))).as("json"))
result_json.show(false)
---------------------------------------------------------------------------------------------------------------------------------------------------------------------------
|json |
---------------------------------------------------------------------------------------------------------------------------------------------------------------------------
|[{"Category":"Administrative","count_":"20839"},{"Category":"Recommendation","count_":"122"},{"Category":"Alert","count_":"64"},{"Category":"ServiceHealth","count_":"11"}]|
---------------------------------------------------------------------------------------------------------------------------------------------------------------------------
Или вы можете сохранить как JSON, например, с result.save.json("output")
помощью .
Ответ №2:
Другой способ с помощью transform
функции :
import org.apache.spark.sql.functions._
val df = spark.read.option("multiline",true).json(inPath)
val df1 = df.withColumn("tables", explode($"tables"))
.select($"tables.rows".as("rows"))
.select(expr("inline(transform(rows, x -> struct(x[0] as Category, x[1] as _count)))"))
df1.show
// -------------- ------
//| Category|_count|
// -------------- ------
//|Administrative| 20839|
//|Recommendation| 122|
//| Alert| 64|
//| ServiceHealth| 11|
// -------------- ------
Затем сохранение в файл json:
df1.save.json(outPath)