#scala #apache-spark
#scala #apache-spark
Вопрос:
У меня есть фрейм данных spark, который я хочу применить с помощью агрегатных функций foldLeft
(или любого другого метода) к каждому столбцу. Агрегированные функции, применяемые к столбцу, будут зависеть от типа данных столбца.
Обратите внимание, что, поскольку я буду работать с большим фреймом данных, я не хочу использовать .collect()
или что-либо, что записывает много вещей в драйвер.
Начальный фрейм данных выглядит следующим образом:
---------------- ----------------- ------------------
| id(StringType) | lat(DoubleType) | long(DoubleType) |
---------------- ----------------- ------------------
| ID1 | 10.2 | 20.1 |
| ID2 | 11.1 | 50.1 |
| ID3 | null | null |
---------------- ----------------- ------------------
Для этого примера я хочу вычислить количество нулей для всех типов данных, вычислить только среднее значение DoubleType
и вычислить мощность только для StringType
столбцов.
Вот скелетный код, который у меня есть, который реализует foldLeft
, но это может быть неправильный путь.
def ourMethod(df: DataFrame): DataFrame = {
val columns = df.schema.fields
val initDf = spark.emptyDataFrame
columns.foldLeft(...)((tempDf, column) => {
column match {
case StructField(name, dataType, _, _) => {
dataType match {
case StringType => ... //something like df.select("column").approx_count_distinct(), though writes in driver.
case DoubleType => ... //something like df.agg(avg(column))
}
}
}
})
}
Ожидаемый результат выглядит следующим образом:
---------- --------- ------- -------------
| col_name | is_null | mean | cardinality |
---------- --------- ------- -------------
| id | 0 | null | 3 |
| lat | 1 | 10.65 | null |
| long | 1 | 35.1 | null |
---------- --------- ------- -------------
Ответ №1:
Не уверен foldLeft
, помогает ли здесь, но это определенно выполнимо. Данный фрейм данных
val df =
Seq(("ID1", Some(10.2), Some(20.1)),
("ID2", Some(11.1), Some(50.1)),
("ID3", None, None))
.toDF("id", "lat", "lon")
мы можем использовать несколько подходов.
- Программно создайте агрегатные функции. Довольно просто
val aggs = df.schema.fields.flatMap {
case StructField(name, DoubleType, _, _) =>
Seq(max(col(name).isNull) as s"${name}_is_null",
mean(col(name)) as s"${name}_mean")
case StructField(name, StringType, _, _) =>
Seq(max(col(name).isNull) as s"${name}_is_null",
max(length(col(name))) as s"${name}_cardinality")
}
df.agg(aggs.head, aggs.tail: _*).show()
Однако вывод будет в одной строке, а не точно таким, какой был задан. Конечно, эта единственная строка может быть, например, собрана в драйвере и изменена или преобразована в нужный формат. Это необработанный вывод:
---------- -------------- ----------- ------------------ ----------- --------
|id_is_null|id_cardinality|lat_is_null| lat_mean|lon_is_null|lon_mean|
---------- -------------- ----------- ------------------ ----------- --------
| false| 3| true|10.649999999999999| true| 35.1|
---------- -------------- ----------- ------------------ ----------- --------
- Преобразование строк в формат, в котором имя строки является столбцом, который можно использовать для группировки, а возможные значения заключены в поля с возможностью обнуления. Это работает, поскольку
null
значения не включаются в агрегированные данные
case class FlatRow(name: String, d: Option[Double], s: Option[String])
df.flatMap { row: Row =>
row.schema.fields.zipWithIndex.map {
case (StructField(name, DoubleType, _, _), index) =>
FlatRow(name,
if (row.isNullAt(index)) None
else Some(row.getDouble(index)),
None)
case (StructField(name, StringType, _, _), index) =>
FlatRow(name,
None,
if (row.isNullAt(index)) None
else Some(row.getString(index)))
}
}
.groupBy($"name")
.agg(max($"d".isNull amp;amp; $"s".isNull) as "is_null",
mean($"d") as "mean",
max(length($"s")) as "cardinality")
.show()
Немного больше кода, но он выводит запрошенный формат:
---- ------- ------------------ -----------
|name|is_null| mean|cardinality|
---- ------- ------------------ -----------
| lat| true|10.649999999999999| null|
| lon| true| 35.1| null|
| id| false| null| 3|
---- ------- ------------------ -----------
Комментарии:
1. Спасибо за решение. Второй метод особенно полезен. В качестве продолжения, возможно ли иметь пустой регистр по умолчанию, который игнорирует столбец (т.е. Не будет добавлен в выходной фрейм данных), если тип данных не Double или String ?
2. @igeass Да, вероятно, проще всего использовать
collect
, поэтому небольшое изменениеrow.schema.fields.zipWithIndex.map
->row.schema.fields.zipWithIndex.collect
должно сработать