Как передать DataSet (ы) в функцию, которая принимает DataFrame (ы) в качестве аргументов в Apache Spark с использованием Scala?

#scala #apache-spark #apache-spark-sql #apache-spark-dataset

#scala #apache-spark #apache-spark-sql #apache-spark-dataset

Вопрос:

У меня есть библиотека в Scala для Spark, которая содержит много функций. Одним из примеров является следующая функция для объединения двух фреймов данных, которые имеют разные столбцы:

 def appendDF(df2: DataFrame): DataFrame = {

  val cols1 = df.columns.toSeq
  val cols2 = df2.columns.toSeq

  def expr(sourceCols: Seq[String], targetCols: Seq[String]): Seq[Column] = {
    targetCols.map({
      case x if sourceCols.contains(x) => col(x)
      case y                           => lit(null).as(y)
    })
  }

  // both df's need to pass through `expr` to guarantee the same order, as needed for correct unions.
  df.select(expr(cols1, cols1): _*).union(df2.select(expr(cols2, cols1): _*))

}
  

Я хотел бы использовать эту функцию (и многие другие) для Dataset[CleanRow] , а не для фреймов данных. CleanRow здесь есть простой класс, который определяет имена и типы столбцов.
Мое обоснованное предположение состоит в том, чтобы преобразовать Dataset в Dataframe с помощью .toDF() метода. Тем не менее, я хотел бы знать, есть ли лучшие способы сделать это.

Насколько я понимаю, между Dataset и Dataframe не должно быть много различий, поскольку Dataset — это просто Dataframe [Строка]. Кроме того, я думаю, что из Spark 2.x API для DF и DS были унифицированы, поэтому я подумал, что я мог бы передать любой из них взаимозаменяемо, но это не так.

Комментарии:

1. Если подпись метода не может быть изменена (например, для принятия универсального типа), я думаю, вам нужно это сделать Dataset.toDF() , в противном случае, если вы можете изменить подпись, можете ли вы сделать ее def appendDF(ds: DataSet[A]) , которая может принимать Dataset[Row] и Dataset[T] ?

2. Понятно. Так что это единственный вариант, исключающий изменение сигнатуры метода. Считается ли это хорошей практикой или нет (например: для кода производственного уровня)? Кроме того, о вашем предложении по изменению подписи, если я изменю его на Dataset [A], тогда он может также использоваться в качестве аргумента somedata.toDF() , верно? Это просто из любопытства.

3. Да, я отправил ответ.

Ответ №1:

Возможно ли изменение подписи:

 import spark.implicits._
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.Dataset

def f[T](d: Dataset[T]): Dataset[T] = {d}

// You are able to pass a dataframe:
f(Seq(0,1).toDF()).show
// res1: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [value: int]

// You are also able to pass a dataset:
f(spark.createDataset(Seq(0,1)))
// res2: org.apache.spark.sql.Dataset[Int] = [value: int]