#scala #apache-spark #apache-spark-sql
#scala #apache-spark #apache-spark-sql
Вопрос:
Я прочитал файл JSON в Spark. Этот файл имеет следующую структуру:
root
|-- engagement: struct (nullable = true)
| |-- engagementItems: array (nullable = true)
| | |-- element: struct (containsNull = true)
| | | |-- availabilityEngagement: struct (nullable = true)
| | | | |-- dimapraUnit: struct (nullable = true)
| | | | | |-- code: string (nullable = true)
| | | | | |-- constrained: boolean (nullable = true)
| | | | | |-- id: long (nullable = true)
| | | | | |-- label: string (nullable = true)
| | | | | |-- ranking: long (nullable = true)
| | | | | |-- type: string (nullable = true)
| | | | | |-- version: long (nullable = true)
| | | | | |-- visible: boolean (nullable = true)
Я создал рекурсивную функцию для выравнивания схемы с помощью столбцов, имеющих вложенный StructType
def flattenSchema(schema: StructType, prefix: String = null):Array[Column]=
{
schema.fields.flatMap(f => {
val colName = if (prefix == null) f.name else (prefix "." f.name)
f.dataType match {
case st: StructType => flattenSchema(st, colName)
case _ => Array(col(colName).alias(colName))
}
})
}
val newDF=SIWINSDF.select(flattenSchema(SIWINSDF.schema):_*)
val secondDF=newDF.toDF(newDF.columns.map(_.replace(".", "_")): _*)
Как я могу сгладить ArrayType, который содержит вложенный StructType, например engagementItems: array (nullable = true)
Приветствуется любая помощь.
Комментарии:
1. Имеет ли массив фиксированную длину? Если нет, то то, что вы пытаетесь сделать, будет сложным… Чтобы помочь нам помочь вам, не могли бы вы предоставить некоторый пример ввода и ожидаемый результат? Вы также могли бы упростить свой вопрос до минимальной схемы, для которой возникает ваша проблема.
2. если это так, то
ArrayType
надexplode
должно быть выполнено действиеdataframe
.
Ответ №1:
Проблема здесь в том, что вам нужно управлять обращением к ArrayType
и после преобразовать его в StructType
. Поэтому вы можете использовать преобразование среды выполнения Scala для этого.
Сначала я сгенерировал сценарий как следующий (кстати, было бы очень полезно включить это в ваш вопрос, поскольку это значительно упрощает воспроизведение проблемы):
case class DimapraUnit(code: String, constrained: Boolean, id: Long, label: String, ranking: Long, _type: String, version: Long, visible: Boolean)
case class AvailabilityEngagement(dimapraUnit: DimapraUnit)
case class Element(availabilityEngagement: AvailabilityEngagement)
case class Engagement(engagementItems: Array[Element])
case class root(engagement: Engagement)
def getSchema(): StructType ={
import org.apache.spark.sql.types._
import org.apache.spark.sql.catalyst.ScalaReflection
val schema = ScalaReflection.schemaFor[root].dataType.asInstanceOf[StructType]
schema.printTreeString()
schema
}
Это выведет:
root
|-- engagement: struct (nullable = true)
| |-- engagementItems: array (nullable = true)
| | |-- element: struct (containsNull = true)
| | | |-- availabilityEngagement: struct (nullable = true)
| | | | |-- dimapraUnit: struct (nullable = true)
| | | | | |-- code: string (nullable = true)
| | | | | |-- constrained: boolean (nullable = false)
| | | | | |-- id: long (nullable = false)
| | | | | |-- label: string (nullable = true)
| | | | | |-- ranking: long (nullable = false)
| | | | | |-- _type: string (nullable = true)
| | | | | |-- version: long (nullable = false)
| | | | | |-- visible: boolean (nullable = false)
Затем я модифицировал вашу функцию, добавив дополнительную проверку для ArrayType и преобразовав ее в StructType с помощью asInstanceOf
:
import org.apache.spark.sql.types._
def flattenSchema(schema: StructType, prefix: String = null):Array[Column]=
{
schema.fields.flatMap(f => {
val colName = if (prefix == null) f.name else (prefix "." f.name)
f.dataType match {
case st: StructType => flattenSchema(st, colName)
case at: ArrayType =>
val st = at.elementType.asInstanceOf[StructType]
flattenSchema(st, colName)
case _ => Array(new Column(colName).alias(colName))
}
})
}
И, наконец, результаты:
val s = getSchema()
val res = flattenSchema(s)
res.foreach(println(_))
Вывод:
engagement.engagementItems.availabilityEngagement.dimapraUnit.code AS `engagement.engagementItems.availabilityEngagement.dimapraUnit.code`
engagement.engagementItems.availabilityEngagement.dimapraUnit.constrained AS `engagement.engagementItems.availabilityEngagement.dimapraUnit.constrained`
engagement.engagementItems.availabilityEngagement.dimapraUnit.id AS `engagement.engagementItems.availabilityEngagement.dimapraUnit.id`
engagement.engagementItems.availabilityEngagement.dimapraUnit.label AS `engagement.engagementItems.availabilityEngagement.dimapraUnit.label`
engagement.engagementItems.availabilityEngagement.dimapraUnit.ranking AS `engagement.engagementItems.availabilityEngagement.dimapraUnit.ranking`
engagement.engagementItems.availabilityEngagement.dimapraUnit._type AS `engagement.engagementItems.availabilityEngagement.dimapraUnit._type`
engagement.engagementItems.availabilityEngagement.dimapraUnit.version AS `engagement.engagementItems.availabilityEngagement.dimapraUnit.version`
engagement.engagementItems.availabilityEngagement.dimapraUnit.visible AS `engagement.engagementItems.availabilityEngagement.dimapraUnit.visible`
Комментарии:
1. Я предпочитаю общий код, для которого я использовал функцию explode для фрейма данных. Мне нужно знать, как я могу получить ВСЕ имена массивов?
2. Привет @J-kram ответ основан на вопросе: Как я могу сгладить ArrayType, который содержит вложенный StructType, например engagementItems: array (nullable = true) Ваша версия останавливала процесс внутренних элементов в engagementItems, таким образом, возвращая данные для availabilityEngagement. Итак, я изменил, чтобы иметь возможность обрабатывать и этот случай, чтобы достичь последнего уровня dimapraUnit. Так что, если вы попытаетесь сделать
val newDF=SIWINSDF.select(flattenSchema(SIWINSDF.schema):_*)
, вы получите сплющенную структуру dimapraUnit. Не об этом ли вы спрашивали?3. Привет @Alexandros Biratsis да, это правильно. также существует функция разнесения фрейма данных, более простая в использовании. providersDF=SIWINSDF.select(explode(col(«engagementItems»)).as(«collection»)).select(col(«collection.*») ) чтобы разбить все массивы, для этого я хочу получить все имена массивов
4. Точно, @J-kram, так мой пост ответил на твой вопрос или нет :)? Я не уверен, что понял
5. Приятно @Alexandros Biratsis, теперь мой вопрос заключается в том, как получить все имена массивов в фрейме данных, например engagementItems?