Разделение массива, содержащего вложенные массивы, с использованием Scala в Azure Databricks

#scala #apache-spark #apache-spark-sql

#scala #apache-spark #apache-spark-sql

Вопрос:

В настоящее время я работаю над проектом, в котором мне приходится извлекать некоторые ужасно вложенные данные из документа json (вывод из вызова REST API Log Analytics), пример структуры документа ниже (у меня намного больше столбцов):

 {
"tables": [
    {
        "name": "PrimaryResult",
        "columns": [
            {
                "name": "Category",
                "type": "string"
            },
            {
                "name": "count_",
                "type": "long"
            }
        ],
        "rows": [
            [
                "Administrative",
                20839
            ],
            [
                "Recommendation",
                122
            ],
            [
                "Alert",
                64
            ],
            [
                "ServiceHealth",
                11
            ]
        ]
    }
] }
 

Мне удалось извлечь этот документ json во фрейм данных, но я в тупике относительно того, куда идти дальше.

Цель, которую я пытаюсь достичь, — это результат, подобный приведенному ниже:

 [{
"Category": "Administrative",
"count_": 20839
},
{
    "Category": "Recommendation",
    "count_": 122
},
{
    "Category": "Alert",
    "count_": 64
},
{
    "Category": "ServiceHealth",
    "count_": 11
}]
 

В идеале я хотел бы использовать свой массив столбцов в качестве заголовков для каждой записи. Затем я хочу разделить каждый массив записей из массива родительских строк на его собственную запись.

До сих пор я пытался сгладить свой необработанный импортированный фрейм данных, но это не сработает, поскольку данные строк представляют собой массив массивов.

Как мне решить эту головоломку?

Ответ №1:

С этим немного сложно справиться, но вот способ сделать это:

 val df = spark.read.option("multiline",true).json("filepath")

val result = df.select(explode($"tables").as("tables"))
    .select($"tables.columns".as("col"), explode($"tables.rows").as("row"))
    .selectExpr("inline(arrays_zip(col, row))")
    .groupBy()
    .pivot($"col.name")
    .agg(collect_list($"row"))
    .selectExpr("inline(arrays_zip(Category, count_))")

result.show
 -------------- ------ 
|      Category|count_|
 -------------- ------ 
|Administrative| 20839|
|Recommendation|   122|
|         Alert|    64|
| ServiceHealth|    11|
 -------------- ------ 
 

Чтобы получить вывод JSON, вы можете сделать

 val result_json = result.agg(to_json(collect_list(struct("Category", "count_"))).as("json"))

result_json.show(false)
 --------------------------------------------------------------------------------------------------------------------------------------------------------------------------- 
|json                                                                                                                                                                       |
 --------------------------------------------------------------------------------------------------------------------------------------------------------------------------- 
|[{"Category":"Administrative","count_":"20839"},{"Category":"Recommendation","count_":"122"},{"Category":"Alert","count_":"64"},{"Category":"ServiceHealth","count_":"11"}]|
 --------------------------------------------------------------------------------------------------------------------------------------------------------------------------- 
 

Или вы можете сохранить как JSON, например, с result.save.json("output") помощью .

Ответ №2:

Другой способ с помощью transform функции :

 import org.apache.spark.sql.functions._

val df = spark.read.option("multiline",true).json(inPath)

val df1 = df.withColumn("tables", explode($"tables"))
            .select($"tables.rows".as("rows"))
            .select(expr("inline(transform(rows, x -> struct(x[0] as Category, x[1] as _count)))"))

df1.show
// -------------- ------ 
//|      Category|_count|
// -------------- ------ 
//|Administrative| 20839|
//|Recommendation|   122|
//|         Alert|    64|
//| ServiceHealth|    11|
// -------------- ------ 
 

Затем сохранение в файл json:

 df1.save.json(outPath)