#apache-spark #apache-spark-sql #spark-dataframe
#apache-spark #apache-spark-sql
Вопрос:
Я довольно новичок в Spark и Scala. Я пытаюсь вызвать функцию как Spark UDF, но я сталкиваюсь с этой ошибкой, которую, похоже, не могу устранить.
Я понимаю, что в Scala Array и Seq не совпадают. WrappedArray является подтипом Seq, и между WrappedArray и Array есть неявные преобразования, но я не уверен, почему этого не происходит в случае UDF.
Я очень ценю любые указания, которые помогут мне понять и решить эту проблему.
Вот фрагмент кода
def filterMapKeysWithSet(m: Map[Int, Int], a: Array[Int]): Map[Int, Int] = {
val seqToArray = a.toArray
val s = seqToArray.toSet
m filterKeys s
}
val myUDF = udf((m: Map[Int, Int], a: Array[Int]) => filterMapKeysWithSet(m, a))
case class myType(id: Int, m: Map[Int, Int])
val mapRDD = Seq(myType(1, Map(1 -> 100, 2 -> 200)), myType(2, Map(1 -> 100, 2 -> 200)), myType(3, Map(3 -> 300, 4 -> 400)))
val mapDF = mapRDD.toDF
mapDF: org.apache.spark.sql.DataFrame = [id: int, m: map<int,int>]
root
|-- id: integer (nullable = false)
|-- m: map (nullable = true)
| |-- key: integer
| |-- value: integer (valueContainsNull = false)
case class myType2(id: Int, a: Array[Int])
val idRDD = Seq(myType2(1, Array(1,2,100,200)), myType2(2, Array(100,200)), myType2(3, Array(1,2)) )
val idDF = idRDD.toDF
idDF: org.apache.spark.sql.DataFrame = [id: int, a: array<int>]
root
|-- id: integer (nullable = false)
|-- a: array (nullable = true)
| |-- element: integer (containsNull = false)
import sqlContext.implicits._
/* Hive context is exposed as sqlContext */
val j = mapDF.join(idDF, idDF("id") === mapDF("id")).drop(idDF("id"))
val k = j.withColumn("filteredMap",myUDF(j("m"), j("a")))
k.show
Глядя на фрейм данных «j» и «k», столбцы map и array имеют правильные типы данных.
j: org.apache.spark.sql.DataFrame = [id: int, m: map<int,int>, a: array<int>]
root
|-- id: integer (nullable = false)
|-- m: map (nullable = true)
| |-- key: integer
| |-- value: integer (valueContainsNull = false)
|-- a: array (nullable = true)
| |-- element: integer (containsNull = false)
k: org.apache.spark.sql.DataFrame = [id: int, m: map<int,int>, a: array<int>, filteredMap: map<int,int>]
root
|-- id: integer (nullable = false)
|-- m: map (nullable = true)
| |-- key: integer
| |-- value: integer (valueContainsNull = false)
|-- a: array (nullable = true)
| |-- element: integer (containsNull = false)
|-- filteredMap: map (nullable = true)
| |-- key: integer
| |-- value: integer (valueContainsNull = false)
Однако действие над фреймом данных «k», которое вызывает UDF, завершается неудачей со следующей ошибкой —
org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 1.0 failed 4 times, most recent failure: Lost task 0.3 in stage 1.0 (TID 6, ip-100-74-42-194.ec2.internal): java.lang.ClassCastException: scala.collection.mutable.WrappedArray$ofRef cannot be cast to [I
at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$anonfun$1.apply(<console>:60)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.apply(Unknown Source)
at org.apache.spark.sql.execution.Project$$anonfun$1$$anonfun$apply$1.apply(basicOperators.scala:51)
at org.apache.spark.sql.execution.Project$$anonfun$1$$anonfun$apply$1.apply(basicOperators.scala:49)
at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
at scala.collection.Iterator$$anon$10.next(Iterator.scala:312)
at scala.collection.Iterator$class.foreach(Iterator.scala:727)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48)
at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103)
at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:47)
at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:273)
at scala.collection.AbstractIterator.to(Iterator.scala:1157)
at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:265)
at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1157)
at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:252)
at scala.collection.AbstractIterator.toArray(Iterator.scala:1157)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$5.apply(SparkPlan.scala:212)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$5.apply(SparkPlan.scala:212)
at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1865)
at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1865)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
at org.apache.spark.scheduler.Task.run(Task.scala:89)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
Комментарии:
1. Попробуйте сначала распечатать схему для обоих фреймов данных, т. е. mapDF и idDF, и вставьте ее сюда! Это дало бы нам некоторое представление!
2. Добавлена схема для фрейма данных. Спасибо, что изучили это.
3. просто еще одна вещь, добавьте также j-схему!
4. добавлена схема для j
5. Странно, что там написано «……..не может быть преобразован в [I».
Ответ №1:
Изменение типа данных с Array[Int] на Seq[Int] в функции filterMapKeysWithSet, похоже, решает вышеуказанную проблему.
def filterMapKeysWithSet(m: Map[Int, Int], a: Seq[Int]): Map[Int, Int] = {
val seqToArray = a.toArray
val s = seqToArray.toSet
m filterKeys s
}
val myUDF = udf((m: Map[Int, Int], a: Seq[Int]) => filterMapKeysWithSet(m, a))
k: org.apache.spark.sql.DataFrame = [id: int, m: map<int,int>, a: array<int>, filteredMap: map<int,int>]
root
|-- id: integer (nullable = false)
|-- m: map (nullable = true)
| |-- key: integer
| |-- value: integer (valueContainsNull = false)
|-- a: array (nullable = true)
| |-- element: integer (containsNull = false)
|-- filteredMap: map (nullable = true)
| |-- key: integer
| |-- value: integer (valueContainsNull = false)
--- -------------------- ---------------- --------------------
| id| m| a| filteredMap|
--- -------------------- ---------------- --------------------
| 1|Map(1 -> 100, 2 -...|[1, 2, 100, 200]|Map(1 -> 100, 2 -...|
| 2|Map(1 -> 100, 2 -...| [100, 200]| Map()|
| 3|Map(3 -> 300, 4 -...| [1, 2]| Map()|
--- -------------------- ---------------- --------------------
Таким образом, похоже, что ArrayType во фрейме данных «idDF» на самом деле является WrappedArray, а не массивом — поэтому вызов функции «filterMapKeysWithSet» завершился неудачно, поскольку ожидался массив, но вместо этого был получен WrappedArray / Seq (который неявно преобразуется в массив в Scala 2.8 и выше).
Комментарии:
1. Есть ли какой-либо способ явно преобразовать тип массива в idDF фрейма данных перед передачей функции? Все функции массива в spark 2.6.1, такие как collect_list(), collect_set(), создают WrappedArray.
2. правильно. Если в datarame есть столбцы типа массива (например: Array<String> ) , следует использовать последовательность в функции udf в качестве типа параметра (например, Seq[String] )
3. спасибо, сэр.. у меня тоже сработало (по крайней мере, на данный момент :))
4. Большое вам спасибо!