Вывод фрейма данных с использованием foldLeft в существующем фрейме данных

#scala #apache-spark

#scala #apache-spark

Вопрос:

У меня есть фрейм данных spark, который я хочу применить с помощью агрегатных функций foldLeft (или любого другого метода) к каждому столбцу. Агрегированные функции, применяемые к столбцу, будут зависеть от типа данных столбца.

Обратите внимание, что, поскольку я буду работать с большим фреймом данных, я не хочу использовать .collect() или что-либо, что записывает много вещей в драйвер.

Начальный фрейм данных выглядит следующим образом:

  ---------------- ----------------- ------------------ 
| id(StringType) | lat(DoubleType) | long(DoubleType) |
 ---------------- ----------------- ------------------ 
| ID1            | 10.2            | 20.1             |
| ID2            | 11.1            | 50.1             |
| ID3            | null            | null             |
 ---------------- ----------------- ------------------ 
  

Для этого примера я хочу вычислить количество нулей для всех типов данных, вычислить только среднее значение DoubleType и вычислить мощность только для StringType столбцов.

Вот скелетный код, который у меня есть, который реализует foldLeft , но это может быть неправильный путь.

 def ourMethod(df: DataFrame): DataFrame = {
  val columns = df.schema.fields
  val initDf = spark.emptyDataFrame
  columns.foldLeft(...)((tempDf, column) => {
    column match {
      case StructField(name, dataType, _, _) => {
        dataType match {
          case StringType => ... //something like df.select("column").approx_count_distinct(), though writes in driver.
          case DoubleType => ... //something like df.agg(avg(column))
        }
      }
    }
  })
}
  

Ожидаемый результат выглядит следующим образом:

  ---------- --------- ------- ------------- 
| col_name | is_null | mean  | cardinality |
 ---------- --------- ------- ------------- 
| id       |       0 | null  | 3           |
| lat      |       1 | 10.65 | null        |
| long     |       1 | 35.1  | null        |
 ---------- --------- ------- ------------- 
  

Ответ №1:

Не уверен foldLeft , помогает ли здесь, но это определенно выполнимо. Данный фрейм данных

 val df =
  Seq(("ID1", Some(10.2), Some(20.1)),
      ("ID2", Some(11.1), Some(50.1)),
      ("ID3", None, None))
    .toDF("id", "lat", "lon")
  

мы можем использовать несколько подходов.

  1. Программно создайте агрегатные функции. Довольно просто
 val aggs = df.schema.fields.flatMap {
  case StructField(name, DoubleType, _, _) =>
    Seq(max(col(name).isNull) as s"${name}_is_null",
        mean(col(name)) as s"${name}_mean")
  case StructField(name, StringType, _, _) =>
    Seq(max(col(name).isNull) as s"${name}_is_null",
        max(length(col(name))) as s"${name}_cardinality")
}

df.agg(aggs.head, aggs.tail: _*).show()
  

Однако вывод будет в одной строке, а не точно таким, какой был задан. Конечно, эта единственная строка может быть, например, собрана в драйвере и изменена или преобразована в нужный формат. Это необработанный вывод:

  ---------- -------------- ----------- ------------------ ----------- -------- 
|id_is_null|id_cardinality|lat_is_null|          lat_mean|lon_is_null|lon_mean|
 ---------- -------------- ----------- ------------------ ----------- -------- 
|     false|             3|       true|10.649999999999999|       true|    35.1|
 ---------- -------------- ----------- ------------------ ----------- -------- 
  
  1. Преобразование строк в формат, в котором имя строки является столбцом, который можно использовать для группировки, а возможные значения заключены в поля с возможностью обнуления. Это работает, поскольку null значения не включаются в агрегированные данные
 case class FlatRow(name: String, d: Option[Double], s: Option[String])

df.flatMap { row: Row =>
    row.schema.fields.zipWithIndex.map {
      case (StructField(name, DoubleType, _, _), index) =>
        FlatRow(name,
                if (row.isNullAt(index)) None
                else Some(row.getDouble(index)),
                None)
      case (StructField(name, StringType, _, _), index) =>
        FlatRow(name,
                None,
                if (row.isNullAt(index)) None
                else Some(row.getString(index)))
    }
  }
  .groupBy($"name")
  .agg(max($"d".isNull amp;amp; $"s".isNull) as "is_null",
       mean($"d") as "mean",
       max(length($"s")) as "cardinality")
  .show()

  

Немного больше кода, но он выводит запрошенный формат:

  ---- ------- ------------------ ----------- 
|name|is_null|              mean|cardinality|
 ---- ------- ------------------ ----------- 
| lat|   true|10.649999999999999|       null|
| lon|   true|              35.1|       null|
|  id|  false|              null|          3|
 ---- ------- ------------------ ----------- 
  

Комментарии:

1. Спасибо за решение. Второй метод особенно полезен. В качестве продолжения, возможно ли иметь пустой регистр по умолчанию, который игнорирует столбец (т.е. Не будет добавлен в выходной фрейм данных), если тип данных не Double или String ?

2. @igeass Да, вероятно, проще всего использовать collect , поэтому небольшое изменение row.schema.fields.zipWithIndex.map -> row.schema.fields.zipWithIndex.collect должно сработать