Как вычислить среднее значение выбранных столбцов по строке в spark scala?

#scala #apache-spark

#scala #apache-spark

Вопрос:

Допустим, у меня есть такой фрейм данных, где не каждый столбец имеет числовой тип

   val df_test = Seq((1,2,"A"),(1,2,"B"),(3,4,"C")).toDF("num1","num2","let3")
  

введите описание изображения здесь

как бы мне создать новый столбец с именем «avg_col», который выбирает определенные столбцы по имени (в данном случае num1 и num2) и принимает среднее значение по строкам из них.

Я хотел бы передать столбцы, которые будут выбраны в виде списка.

Спасибо и хорошего дня!

Ответ №1:

Я не уверен, что существует определенная функция для строк, но вы всегда можете это сделать:

 val df_test = Seq((1,2,4,"A"),(1,2,3, "B"),(3,4,5,"C")).toDF("num1","num2","num3", "let3")

//  ---- ---- ---- ---- 
// |num1|num2|num3|let3|
//  ---- ---- ---- ---- 
// |   1|   2|   4|   A|
// |   1|   2|   3|   B|
// |   3|   4|   5|   C|
//  ---- ---- ---- ---- 

// I will assume that you have a list of columns that you want to average.

val cols = List("num1", "num3", "num3")

// Map list of columns to [Column]

val test_cols = cols.map(name => col(name))

val avg_func = test_cols.foldLeft(lit(0)){(x, y) => x y}/test_cols.length

df_test.withColumn("avg_col", avg_func).show(false)

//  ---- ---- ---- ---- ------------------ 
// |num1|num2|num3|let3|avg_col           |
//  ---- ---- ---- ---- ------------------ 
// |1   |2   |4   |A   |3.0               |
// |1   |2   |3   |B   |2.3333333333333335|
// |3   |4   |5   |C   |4.333333333333333 |
//  ---- ---- ---- ---- ------------------ 
  

Комментарии:

1. Хорошо, спасибо, но как мне сделать это, передавая список столбцов, а не жестко кодируя имена столбцов?

2. Я только что обновил ответ, чтобы учесть передачу списка столбцов.

3. По какой-то причине я получаю все нули с новым avg_col. Хм

4. импортированы ли у вас функции lit и col?

5. Я только что обновил код другим столбцом, работает просто отлично. Кстати, я использую spark версии 2.4.3.

Ответ №2:

Функция среднего значения по строке (с обработкой null) выглядит следующим образом:

  • Среднее значение 1,2,3 должно быть 1 2 3/3 =2
  • Среднее значение 0,0,0 должно быть 0 0 0/3 =0
  • Среднее значение 3,3,0 должно быть 3 3 0/3 =2
  • Среднее значение 8, null, 4 должно быть 8 4/2 = 6
  • Среднее значение null, null,4 должно быть 4/1 = 4
  • Среднее значение null, null, null должно быть null

Код-

 import org.apache.spark.sql.functions._
import org.apache.spark.sql._

val df_test = spark.read.option("header",true)
      .csv("src/test/resources/row_wise_average.csv")

//csv contents for testing
//num1,num2,num3,let3
//1,2,4,"A"
//1,2,3,"B"
//3,4,5,"C"
//8,,4,"D"
//,,4,"E"
//,,,"F"
//0,0,0,"G"
//3,3,0,"H"

df_test.show()

 ---- ---- ---- ---- 
|num1|num2|num3|let3|
 ---- ---- ---- ---- 
|   1|   2|   4|   A|
|   1|   2|   3|   B|
|   3|   4|   5|   C|
|   8|null|   4|   D|
|null|null|   4|   E|
|null|null|null|   F|
|   0|   0|   0|   G|
|   3|   3|   0|   H|
 ---- ---- ---- ---- 

def columnAverage(colList: List[String]): Column = {
  var denominator = lit(0)
  var numerator = lit(0)
  colList.foreach{colName =>
    denominator = denominator   when(col(colName).isNull,lit(0)).otherwise(lit(1))
    numerator = numerator   when(col(colName).isNull,lit(0)).otherwise(col(colName))
  }
  when(denominator === lit(0),lit(null)).otherwise(numerator/denominator)
}

df_test.withColumn("avg_col",columnAverage(List("num1", "num2", "num3"))).show(false)

 ---- ---- ---- ---- ------------------ 
|num1|num2|num3|let3|avg_col           |
 ---- ---- ---- ---- ------------------ 
|1   |2   |4   |A   |2.3333333333333335|
|1   |2   |3   |B   |2.0               |
|3   |4   |5   |C   |4.0               |
|8   |null|4   |D   |6.0               |
|null|null|4   |E   |4.0               |
|null|null|null|F   |null              |
|0   |0   |0   |G   |0.0               |
|3   |3   |0   |H   |2.0               |
 ---- ---- ---- ---- ------------------