Выравнивание очень вложенного фрейма данных Spark Scala

#scala #apache-spark #nested #spark-dataframe

#scala #apache-spark #вложенный #apache-spark-sql

Вопрос:

У меня очень вложенный фрейм данных, который я пытаюсь сгладить. Исходная схема выглядит следующим образом:

  |-- _History: struct (nullable = true)
 |    |-- Article: array (nullable = true)
 |    |    |-- element: struct (containsNull = true)
 |    |    |    |-- Id: string (nullable = true)
 |    |    |    |-- Timestamp: long (nullable = true)
 |    |-- Channel: struct (nullable = true)
 |    |    |-- Music: array (nullable = true)
 |    |    |    |-- element: long (containsNull = true)
 |    |    |-- Sports: array (nullable = true)
 |    |    |    |-- element: long (containsNull = true)
 |    |    |-- Style: array (nullable = true)
 |    |    |    |-- element: long (containsNull = true)
  

Я могу сгладить большинство полей, используя рекурсивную функцию:

 implicit class DataFrameFlattener(df: DataFrame) {
  def flattenSchema: DataFrame = {
    df.select(flatten(Nil, df.schema): _*)
  }

  protected def flatten(path: Seq[String], schema: DataType): Seq[Column] = schema match {
    case s: StructType => s.fields.flatMap(f => flatten(path :  f.name, f.dataType))
    case other => col(path.map(n => s"`$n`").mkString(".")).as(path.mkString(".")) :: Nil
  } 
}
  

Однако, похоже, это не может сгладить _History.Article.Id и _History.Article.Timstamp в схеме выше. Почему это так и как мне сгладить эти два поля в их собственные столбцы внутри фрейма данных?

Ответ №1:

Используя scala spark, вы можете рекурсивно сгладить json:

 import org.apache.spark.sql.{ Row, SaveMode, SparkSession, DataFrame }
def recurs(df: DataFrame): DataFrame = {
  if(df.schema.fields.find(_.dataType match {
    case ArrayType(StructType(_),_) | StructType(_) => true
    case _ => false
  }).isEmpty) df
  else {
    val columns = df.schema.fields.map(f => f.dataType match {
      case _: ArrayType => explode(col(f.name)).as(f.name)
      case s: StructType => col(s"${f.name}.*")
      case _ => col(f.name)
    })
    recurs(df.select(columns:_*))
  }
}
val df = spark.read.json(json_location)
flatten_df = recurs(df)
flatten_df.show()
  

Это создаст массив в пертикулярном столбце.

#

Если вам не нужен массив и добавление в другую строку, есть еще один:

 def flattenDataframe(df: DataFrame): DataFrame = {
    //getting all the fields from schema
    val fields = df.schema.fields
    val fieldNames = fields.map(x => x.name)
    //length shows the number of fields inside dataframe
    val length = fields.length
    for (i <- 0 to fields.length - 1) {
      val field = fields(i)
      val fieldtype = field.dataType
      val fieldName = field.name
      fieldtype match {
        case arrayType: ArrayType =>
          val fieldName1 = fieldName
          val fieldNamesExcludingArray = fieldNames.filter(_ != fieldName1)
          val fieldNamesAndExplode = fieldNamesExcludingArray    Array(s"explode_outer($fieldName1) as $fieldName1")
          //val fieldNamesToSelect = (fieldNamesExcludingArray    Array(s"$fieldName1.*"))
          val explodedDf = df.selectExpr(fieldNamesAndExplode: _*)
          return flattenDataframe(explodedDf)

        case structType: StructType =>
          val childFieldnames = structType.fieldNames.map(childname => fieldName   "."   childname)
          val newfieldNames = fieldNames.filter(_ != fieldName)    childFieldnames
          val renamedcols = newfieldNames.map(x => (col(x.toString()).as(x.toString().replace(".", "_").replace("$", "_").replace("__", "_").replace(" ", "").replace("-", ""))))
          val explodedf = df.select(renamedcols: _*)
          return flattenDataframe(explodedf)
        case _ =>
      }
    }
    df
  }
  

Просто вызовите это, как и предыдущий, импортируйте некоторые библиотеки, если я пропустил.

Комментарии:

1. Спасибо, брат!!! второе сработало для меня. Я много искал вложенный и нашел это! Примечание: кто бы ни использовал этот код, иногда spark выдает исключения, что он не может найти столбец, хотя столбец есть, потому что имя столбца похоже a.b.name или a.b.ранг. Перед выполнением операций с ним обязательно переименуйте имена столбцов. как a.b.name называть и a.b.ранг к рангу, отказываясь от wote как от долга.

Ответ №2:

Я нашел обходной путь: создайте два новых столбца из сглаженных полей:

 val flatDF = df
    .withColumn("_History.Article.Id", df("`_History.Article`.Id")
    .withColumn("_History.Article.Timestamp", df("`_History.Article`.Timestamp")