Преобразование запроса spark.sql в запрос spark / scala

#scala #apache-spark #apache-spark-sql

#scala #apache-spark #apache-spark-sql

Вопрос:

Я добавляю столбец в spark dataframe, используя некоторую бизнес-логику, которая возвращает true / false в scala. Реализация выполняется с использованием UDF, а UDF имеет более 10 аргументов, поэтому нам нужно сначала зарегистрировать UDF, прежде чем использовать его. Было выполнено следующее

 spark.udf.register("new_col", new_col)

// writing the UDF
val new_col(String, String, ..., Timestamp) => Boolean = (col1: String, col2: String, ..., col12: Timestamp) => {
     if ( ... ) true
     else false
}
  

Теперь, когда я пытаюсь написать следующее задание spark / Scala, оно не работает

 val result = df.withColumn("new_col", new_col(col1, col2, ..., col12))
  

Я получаю следующую ошибку

 <console>:56: error: overloaded method value udf with alternatives:
  (f: AnyRef,dataType: org.apache.spark.sql.types.DataType)org.apache.spark.sql.expressions.UserDefinedFunction <and>
  (f: org.apache.spark.sql.api.java.UDF10[_, _, _, _, _, _, _, _, _, _, _],returnType: org.apache.spark.sql.types.DataType)org.apache.spark.sql.expressions.UserDefinedFunction <and>
  (f: org.apache.spark.sql.api.java.UDF9[_, _, _, _, _, _, _, _, _, _],returnType: org.apache.spark.sql.types.DataType)org.apache.spark.sql.expressions.UserDefinedFunction <and>
  (f: org.apache.spark.sql.api.java.UDF8[_, _, _, _, _, _, _, _, _],returnType: org.apache.spark.sql.types.DataType)org.apache.spark.sql.expressions.UserDefinedFunction <and>
  (f: org.apache.spark.sql.api.java.UDF7[_, _, _, _, _, _, _, _],returnType: org.apache.spark.sql.types.DataType)org.apache.spark.sql.expressions.UserDefinedFunction <and>
  (f: org.apache.spark.sql.api.java.UDF6[_, _, _, _, _, _, _],returnType: org.apache.spark.sql.types.DataType)org.apache.spark.sql.expressions.UserDefinedFunction <and>
  (f: org.apache.spark.sql.api.java.UDF5[_, _, _, _, _, _],returnType: org.apache.spark.sql.types.DataType)org.apache.spark.sql.expressions.UserDefinedFunction <and>
  (f: org.apache.spark.sql.api.java.UDF4[_, _, _, _, _],returnType: org.apache.spark.sql.types.DataType)org.apache.spark.sql.expressions.UserDefinedFunction <and>
  (f: org.apache.spark.sql.api.java.UDF3[_, _, _, _],returnType: org.apache.spark.sql.types.DataType)org.apache.spark.sql.expressions.UserDefinedFunction <and>
  (f: org.apache.spark.sql.api.java.UDF2[_, _, _],returnType: org.apache.spark.sql.types.DataType)org.apache.spark.sql.expressions.UserDefinedFunction <and>
  (f: org.apache.spark.sql.api.java.UDF1[_, _],returnType: org.apache.spark.sql.types.DataType)org.apache.spark.sql.expressions.UserDefinedFunction <and>
  (f: org.apache.spark.sql.api.java.UDF0[_],returnType: org.apache.spark.sql.types.DataType)org.apache.spark.sql.expressions.UserDefinedFunction <and> ...
  

С другой стороны, если я создаю временное представление и использую spark.sql, он отлично работает следующим образом

 df.createOrReplaceTempView("data")
val result = spark.sql(
    s"""
    SELECT *, new_col(col1, col2, ..., col12) AS new_col FROM data
    """
    )
  

Я что-то упустил? Как можно заставить такой запрос работать в spark / scala?

Комментарии:

1. docs.databricks.com/spark/latest/spark-sql/udf-scala.html

Ответ №1:

Существуют различные способы регистрации UDF для использования в DataFrames и SparkSQL

Для использования в Spark Sql udf должен быть зарегистрирован как

 spark.sqlContext.udf.register("function_name", function)
  

Для использования в DataFrames

 val my_udf = org.apache.spark.sql.functions.udf(function)
  

Поскольку вы используете spark.SQLContext.udf.register, он доступен в Spark SQL.

РЕДАКТИРОВАТЬ: следующий код должен работать, я использовал только 2 бита col, он должен работать до 22 столбцов

 val new_col :(String, String) => Boolean = (col1: String, col2: String) => {
  true
}

val new_col_udf = udf(new_col)
spark.sqlContext.udf.register("new_col", new_col)

var df = Seq((1,2,3,4,5,6,7,8,9,10,11)).toDF()
df.createOrReplaceTempView("data")
val result = spark.sql(
  s"""SELECT *, new_col(_1, _2) AS new_col FROM data"""
)
result.show()
df = df.withColumn("test", new_col_udf($"_1",$"_2") )
df.show()

  

Комментарии:

1. обновлено кодом в разделе редактирования, который имеет udf и работает в df и sql, обратите внимание, что udf имеет ограничение на использование параметров до 22