Сгладить последовательность карт для отображения, используя полиморфизм типов в Scala, Spark UDF

#scala #apache-spark #generics

#scala #apache-spark #обобщения

Вопрос:

У меня есть следующая функция, которая сглаживает последовательность отображений строки до double. Как я могу преобразовать тип string в double generic?

 val flattenSeqOfMaps = udf { values: Seq[Map[String, Double]] => values.flatten.toMap }
flattenSeqOfMaps: org.apache.spark.sql.expressions.UserDefinedFunction = UserDefinedFunction(<function1>,MapType(StringType,DoubleType,false),Some(List(ArrayType(MapType(StringType,DoubleType,false),true))))
  

Мне нужно что-то вроде,

 val flattenSeqOfMaps[S,D] = udf { values: Seq[Map[S, D]] => values.flatten.toMap }
  

Спасибо.

Редактировать 1: Я использую spark 2.3. Я знаю о функциях более высокого порядка в spark 2.4

Правка 2: я подошел немного ближе. Что мне нужно вместо f _ in val flattenSeqOfMaps = udf { f _} . Пожалуйста, сравните joinMap подпись типа и flattenSeqOfMaps подпись типа ниже

 scala> val joinMap = udf { values: Seq[Map[String, Double]] => values.flatten.toMap }
joinMap: org.apache.spark.sql.expressions.UserDefinedFunction = UserDefinedFunction(<function1>,MapType(StringType,DoubleType,false),Some(List(ArrayType(MapType(StringType,DoubleType,false),true))))

scala> def f[S,D](values: Seq[Map[S, D]]): Map[S,D] = { values.flatten.toMap}
f: [S, D](values: Seq[Map[S,D]])Map[S,D]

scala> val flattenSeqOfMaps = udf { f _}
flattenSeqOfMaps: org.apache.spark.sql.expressions.UserDefinedFunction = UserDefinedFunction(<function1>,MapType(NullType,NullType,true),Some(List(ArrayType(MapType(NullType,NullType,true),true))))
  

Правка 3: следующий код сработал для меня.

 scala> val flattenSeqOfMaps = udf { f[String,Double] _}
flattenSeqOfMaps: org.apache.spark.sql.expressions.UserDefinedFunction = UserDefinedFunction(<function1>,MapType(StringType,DoubleType,false),Some(List(ArrayType(MapType(StringType,DoubleType,false),true))))
  

Ответ №1:

Хотя вы могли бы определить свою функцию как

 import scala.reflect.runtime.universe.TypeTag

def flattenSeqOfMaps[S : TypeTag, D: TypeTag] = udf { 
  values: Seq[Map[S, D]] => values.flatten.toMap
}
  

а затем используйте конкретные экземпляры:

 val df = Seq(Seq(Map("a" -> 1), Map("b" -> 1))).toDF("val")

val flattenSeqOfMapsStringInt = flattenSeqOfMaps[String, Int]

df.select($"val", flattenSeqOfMapsStringInt($"val") as "val").show
  
  -------------------- ---------------- 
|                 val|             val|
 -------------------- ---------------- 
|[[a -> 1], [b -> 1]]|[a -> 1, b -> 1]|
 -------------------- ----------------|
  

также возможно использовать встроенные функции без какой-либо необходимости в явных обобщениях:

 import org.apache.spark.sql.functions.{expr, flatten, map_from_arrays}

def flattenSeqOfMaps_(col: String) = {
  val keys = flatten(expr(s"transform(`$col`, x -> map_keys(x))"))
  val values = flatten(expr(s"transform(`$col`, x -> map_values(x))"))
  map_from_arrays(keys, values)
}

df.select($"val", flattenSeqOfMaps_("val") as "val").show
  
  -------------------- ---------------- 
|                 val|             val|
 -------------------- ---------------- 
|[[a -> 1], [b -> 1]]|[a -> 1, b -> 1]|
 -------------------- ---------------- 
  

Комментарии:

1. Моя ошибка. Я пропустил упоминание версии spark 2.3. Редактирую вопрос.

Ответ №2:

Следующий код сработал для меня.

 scala> def f[S,D](values: Seq[Map[S, D]]): Map[S,D] = { values.flatten.toMap}
f: [S, D](values: Seq[Map[S,D]])Map[S,D]

scala> val flattenSeqOfMaps = udf { f[String,Double] _}
flattenSeqOfMaps: org.apache.spark.sql.expressions.UserDefinedFunction = UserDefinedFunction(<function1>,MapType(StringType,DoubleType,false),Some(List(ArrayType(MapType(StringType,DoubleType,false),true))))