Как я могу получить количество строк соответствия строки и добавить его в качестве нового столбца в Scala?

#scala #apache-spark

#scala #apache-spark

Вопрос:

Вот мой фрейм данных:

   val new_df = Seq(("mike","A","B","B","C","A"),
    ("bob","A","A","B","A","C")).toDF("name","math","science","english","history","art")
  

и вот результирующий фрейм данных, который я пытаюсь создать:

   val new_df2 = Seq(("mike","A","B","B","C","A",2,2,1),
    ("bob","A","A","B","A","C",3,1,1)).toDF("name","math","science","english","history","art","A_count","B_count","C_count")
  

Вот снимки до и после в табличном виде:

введите описание изображения здесь

введите описание изображения здесь

Итак, я хочу добавить столбец, который получает количество A (или любое строковое совпадение) каждой строки. Как бы я это сделал? Я знаю, что что-то с использованием withColumn будет работать, но я не уверен, как сопоставить строку по строке.

Большое спасибо и хорошего дня!

Ответ №1:

Это можно решить динамически за один проход путем плоского сопоставления строк, затем столбцов Dataframe , а затем выполнения a .pivot().count() для результата.

 val grades = Array[String]("A", "B", "C")
val cols = new_df.columns
val gcount = new_df.flatMap( row => {
    val name = row.getAs[String]("name")
    cols.flatMap( c => {
        val grade = row.getAs[String](c)
        if (grades.contains(grade)) {
            Some(name, grade)    
        } else {
            None
        }
    })
}).toDF("name", "grade")
  .groupBy("name")
  .pivot("grade")
  .count()
  .withColumnRenamed("A", "A_count")
  .withColumnRenamed("B", "B_count")
  .withColumnRenamed("C", "C_count")

new_df.join(gcount, "name").show()
  

Это приводит к:

  ---- ---- ------- ------- ------- --- ------- ------- ------- 
|name|math|science|english|history|art|A_count|B_count|C_count|
 ---- ---- ------- ------- ------- --- ------- ------- ------- 
|mike|   A|      B|      B|      C|  A|      2|      2|      1|
| bob|   A|      A|      B|      A|  C|      3|      1|      1|
 ---- ---- ------- ------- ------- --- ------- ------- ------- 
  

— Редактировать

Я могу пройти каждый из основных шагов и объяснить, что они делают:

 val gcount = new_df.flatMap( row => {
    val name = row.getAs[String]("name")
    cols.flatMap( c => {
        val grade = row.getAs[String](c)
        if (grades.contains(grade)) {
            Some(name, grade)    
        } else {
            None
        }
    })
}).toDF("name", "grade")
  

A .flatmap() похоже на a .map() , за исключением того, что вместо соотношения входных записей к выходным записям 1: 1, это соотношение 1: n . Другими словами, он может выдавать 0 или более записей для каждой обрабатываемой записи. В этом случае мы перебираем каждую строку, а затем перебираем каждый столбец, чтобы создать запись для каждой оценки для каждого человека (при условии, что значение ячейки существует в нашем исходном массиве интересных оценок). .toDF() Метод просто преобразует наш результат Dataset[(String, String)] обратно в Dataframe . Этот блок кода приводит к Dataframe тому, что выглядит следующим образом:

  ---- ----- 
|name|grade|
 ---- ----- 
|mike|    A|
|mike|    B|
|mike|    B|
|mike|    C|
|mike|    A|
| bob|    A|
| bob|    A|
| bob|    B|
| bob|    A|
| bob|    C|
 ---- ----- 
  

С помощью этого промежуточного Dataframe звена мы можем сделать

 groupBy("name").pivot("grade").count()
  

groupBy() Создает группы записей по значению, содержащемуся в предоставленном столбце. А затем .pivot() примет различные значения в предоставленном столбце и создаст новый столбец для каждого из них. Наконец, .count() метод определяет, как агрегировать значения различных групп столбцов.

.withColumnRenamed() Методы были там только для того, чтобы сделать окончательный Dataframe «вид» таким же, как тот, который вы запросили, фактически переименовав столбцы (поскольку фактические предыдущие значения оценок стали именами столбцов). Это было бы лучше решить, изменив Some(name, grade) на Some(name, grade "_count") , чтобы избежать переименования статического столбца.

Что касается ошибки CSV, которую вы получаете, мне нужно было бы увидеть фактический код и заголовок CSV, чтобы понять, что может быть причиной этого.

— Альтернативное решение

Я также разработал хакерское альтернативное решение, которое требует, чтобы столбцы в вашем исходном Dataframe коде были исправлены, и, возможно, это не самая эффективная вещь. Это в основном не связано с лучшим решением выше, но я предоставляю его на случай, если это полезно:

 var df = new_df
val grades = Array[String]("A", "B", "C")

grades.foreach(g => {
    df = df.withColumn(g  "_count", ($"math" === lit(g)).cast("Int")   ($"science" === lit(g)).cast("Int")   ($"english" === lit(g)).cast("Int")   ($"history" === lit(g)).cast("Int")   ($"art" === lit(g)).cast("Int"))
})
  

Опять же, это хакерское решение, но оно может работать для небольшого количества «оценок», фиксированного набора столбцов, и если вы не заботитесь об эффективности.

Комментарии:

1. Есть ли у вас пошаговое решение, которое было бы проще понять и разбить на более мелкие шаги? Ваш код отлично работает с этим образцом фрейма данных, но когда я применяю его к своему огромному фрейму данных, я получаю ошибки, что заголовок CSV не соответствует схеме. И да, я дважды проверил, что я изменил все имена переменных соответствующим образом и т. Д.

2. Я добавил некоторое объяснение основных шагов после «— Edit».

Ответ №2:

Лучший способ сделать это — использовать Spark SQL. Хотя вы можете решить эту проблему с помощью flatMap, это намного медленнее, и лучше попробовать сделать все это просто в строке! Я написал это, чтобы вы могли легко расширить до большего количества классов, и там немного меньше проблем.

 import org.apache.spark.sql.Column

def colIsGrade(col:Column, grade:String) = when(col === lit(grade), lit(1)).otherwise(lit(0))

def countOccurenceOf(grade:String) = (List($"math", $"science", $"english", $"history", $"art").foldLeft(lit(0)) {
  case (count, subject) => colIsGrade(subject, grade)   count
}).as(s"${grade}_count")

val grades = List("A","B","C","D","E","F")
val gradesColumnStatement = grades.map(countOccurenceOf)

new_df.select(col("*")  : gradesColumnStatement :_*)