#scala #apache-spark #apache-spark-sql #rdd
#scala #apache-spark #apache-spark-sql #rdd
Вопрос:
Я знакомлюсь с Spark и Scala, и моя текущая задача — «суммировать» эти два фрейма данных:
--- -------- -------------------
|cyl|avg(mpg)| var_samp(mpg)|
--- -------- -------------------
| 8| 15.8| 1.0200000000000014|
| 6| 20.9|0.48999999999999966|
| 4| 33.9| 0.0|
--- -------- -------------------
--- ------------------ ------------------
|cyl| avg(mpg)| var_samp(mpg)|
--- ------------------ ------------------
| 8| 13.75| 6.746999999999998|
| 6| 21.4| NaN|
--- ------------------ ------------------
В этом случае «ключ» cyl
и «значения» avg(mpg)
и var_samp(mpg)
.
(Приблизительный) результат для этих двух будет:
--- -------- -------------------
|cyl|avg(mpg)| var_samp(mpg)|
--- -------- -------------------
| 8| 29.55| 7.76712|
| 6| 42.3|0.48999999999999966|
| 4| 33.9| 0.0|
--- -------- -------------------
Обратите внимание, как NaN
считается равным нулю, а также как в некоторых фреймах данных могут отсутствовать «ключи» (во втором отсутствует 4 ключа).
Я подозреваю reduceByKey
, что это правильный путь, но не могу заставить его работать.
Вот мой код на данный момент:
case class Cars(car: String, mpg: String, cyl: String, disp: String, hp: String,
drat: String, wt: String, qsec: String, vs: String, am: String, gear: String, carb: String)
object Bootstrapping extends App {
override def main(args: Array[String]) {
val conf = new SparkConf().setAppName("Spark and SparkSql").setMaster("local")
val sc = new SparkContext(conf)
sc.setLogLevel("WARN")
// Exploring SparkSQL
// Initialize an SQLContext
val sqlContext = new org.apache.spark.sql.SQLContext(sc)
import sqlContext._
import sqlContext.implicits._
// Load a cvs file
val csv = sc.textFile("mtcars.csv")
// Create a Spark DataFrame
val headerAndRows = csv.map(line => line.split(",").map(_.trim))
val header = headerAndRows.first
val mtcdata = headerAndRows.filter(_(0) != header(0))
val mtcars = mtcdata
.map(p => Cars(p(0), p(1), p(2), p(3), p(4), p(5), p(6), p(7), p(8), p(9), p(10), p(11)))
.toDF
// Aggregate data after grouping by columns
import org.apache.spark.sql.functions._
mtcars.sort($"cyl").show()
mtcars.groupBy("cyl").agg(avg("mpg"), var_samp("mpg")).sort($"cyl").show()
//sample 25% of the population without replacement
val sampledData = mtcars.sample(false, 0.25)
//bootstrapping loop
for (a <- 1 to 5) {
//get bootstrap sample
val bootstrapSample = sampledData.sample(true, 1)
//HERE!!! I WANT TO SAVE THE AGGREGATED SUM OF THE FOLLOWING:
bootstrapSample.groupBy("cyl").agg(avg("mpg"), var_samp("mpg"))
}
}
}
Это данные, которые я использую: дорожные тесты автомобилей Motor Trend
Ответ №1:
Одним из подходов было бы union
использовать два фрейма данных, использовать when/otherwise
для перевода NaN
и выполнять groupBy
для агрегирования sum
столбцов, как показано ниже:
import org.apache.spark.sql.functions._
import spark.implicits._
val df1 = Seq(
(8, 15.8, 1.0200000000000014),
(6, 20.9, 0.48999999999999966),
(4, 33.9, 0.0)
).toDF("cyl", "avg_mpg", "var_samp_mpg")
val df2 = Seq(
(8, 13.75, 6.746999999999998),
(6, 21.4, Double.NaN)
).toDF("cyl", "avg_mpg", "var_samp_mpg")
(df1 union df2).
withColumn("var_samp_mpg", when($"var_samp_mpg".isNaN, 0.0).otherwise($"var_samp_mpg")).
groupBy("cyl").agg(sum("avg_mpg"), sum("var_samp_mpg")).
show
// --- ------------ -------------------
// |cyl|sum(avg_mpg)| sum(var_samp_mpg)|
// --- ------------ -------------------
// | 6| 42.3|0.48999999999999966|
// | 4| 33.9| 0.0|
// | 8| 29.55| 7.7669999999999995|
// --- ------------ -------------------