#scala #apache-spark #apache-spark-sql #apache-spark-dataset
#scala #apache-spark #apache-spark-sql #apache-spark-dataset
Вопрос:
У меня есть библиотека в Scala для Spark, которая содержит много функций. Одним из примеров является следующая функция для объединения двух фреймов данных, которые имеют разные столбцы:
def appendDF(df2: DataFrame): DataFrame = {
val cols1 = df.columns.toSeq
val cols2 = df2.columns.toSeq
def expr(sourceCols: Seq[String], targetCols: Seq[String]): Seq[Column] = {
targetCols.map({
case x if sourceCols.contains(x) => col(x)
case y => lit(null).as(y)
})
}
// both df's need to pass through `expr` to guarantee the same order, as needed for correct unions.
df.select(expr(cols1, cols1): _*).union(df2.select(expr(cols2, cols1): _*))
}
Я хотел бы использовать эту функцию (и многие другие) для Dataset[CleanRow]
, а не для фреймов данных. CleanRow
здесь есть простой класс, который определяет имена и типы столбцов.
Мое обоснованное предположение состоит в том, чтобы преобразовать Dataset в Dataframe с помощью .toDF()
метода. Тем не менее, я хотел бы знать, есть ли лучшие способы сделать это.
Насколько я понимаю, между Dataset и Dataframe не должно быть много различий, поскольку Dataset — это просто Dataframe [Строка]. Кроме того, я думаю, что из Spark 2.x API для DF и DS были унифицированы, поэтому я подумал, что я мог бы передать любой из них взаимозаменяемо, но это не так.
Комментарии:
1. Если подпись метода не может быть изменена (например, для принятия универсального типа), я думаю, вам нужно это сделать
Dataset.toDF()
, в противном случае, если вы можете изменить подпись, можете ли вы сделать ееdef appendDF(ds: DataSet[A])
, которая может приниматьDataset[Row]
иDataset[T]
?2. Понятно. Так что это единственный вариант, исключающий изменение сигнатуры метода. Считается ли это хорошей практикой или нет (например: для кода производственного уровня)? Кроме того, о вашем предложении по изменению подписи, если я изменю его на Dataset [A], тогда он может также использоваться в качестве аргумента
somedata.toDF()
, верно? Это просто из любопытства.3. Да, я отправил ответ.
Ответ №1:
Возможно ли изменение подписи:
import spark.implicits._
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.Dataset
def f[T](d: Dataset[T]): Dataset[T] = {d}
// You are able to pass a dataframe:
f(Seq(0,1).toDF()).show
// res1: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [value: int]
// You are also able to pass a dataset:
f(spark.createDataset(Seq(0,1)))
// res2: org.apache.spark.sql.Dataset[Int] = [value: int]