scala.collection.mutable.WrappedArray $ ofRef не может быть преобразован в целое число

#apache-spark #apache-spark-sql #spark-dataframe

#apache-spark #apache-spark-sql

Вопрос:

Я довольно новичок в Spark и Scala. Я пытаюсь вызвать функцию как Spark UDF, но я сталкиваюсь с этой ошибкой, которую, похоже, не могу устранить.

Я понимаю, что в Scala Array и Seq не совпадают. WrappedArray является подтипом Seq, и между WrappedArray и Array есть неявные преобразования, но я не уверен, почему этого не происходит в случае UDF.

Я очень ценю любые указания, которые помогут мне понять и решить эту проблему.

Вот фрагмент кода

 def filterMapKeysWithSet(m: Map[Int, Int], a: Array[Int]): Map[Int, Int] = {
val seqToArray = a.toArray
val s = seqToArray.toSet
m filterKeys s
}

val myUDF = udf((m: Map[Int, Int], a: Array[Int]) => filterMapKeysWithSet(m, a))

case class myType(id: Int, m: Map[Int, Int])
val mapRDD = Seq(myType(1, Map(1 -> 100, 2 -> 200)), myType(2, Map(1 -> 100, 2 -> 200)), myType(3, Map(3 -> 300, 4 -> 400)))
val mapDF = mapRDD.toDF

mapDF: org.apache.spark.sql.DataFrame = [id: int, m: map<int,int>]
root
 |-- id: integer (nullable = false)
 |-- m: map (nullable = true)
 |    |-- key: integer
 |    |-- value: integer (valueContainsNull = false)

case class myType2(id: Int, a: Array[Int])
val idRDD = Seq(myType2(1, Array(1,2,100,200)), myType2(2, Array(100,200)), myType2(3, Array(1,2)) )
val idDF = idRDD.toDF

idDF: org.apache.spark.sql.DataFrame = [id: int, a: array<int>]
root
 |-- id: integer (nullable = false)
 |-- a: array (nullable = true)
 |    |-- element: integer (containsNull = false)

import sqlContext.implicits._
/* Hive context is exposed as sqlContext */

val j = mapDF.join(idDF, idDF("id") === mapDF("id")).drop(idDF("id"))
val k = j.withColumn("filteredMap",myUDF(j("m"), j("a")))
k.show
  

Глядя на фрейм данных «j» и «k», столбцы map и array имеют правильные типы данных.

 j: org.apache.spark.sql.DataFrame = [id: int, m: map<int,int>, a: array<int>]
root
 |-- id: integer (nullable = false)
 |-- m: map (nullable = true)
 |    |-- key: integer
 |    |-- value: integer (valueContainsNull = false)
 |-- a: array (nullable = true)
 |    |-- element: integer (containsNull = false)

k: org.apache.spark.sql.DataFrame = [id: int, m: map<int,int>, a: array<int>, filteredMap: map<int,int>]
root
 |-- id: integer (nullable = false)
 |-- m: map (nullable = true)
 |    |-- key: integer
 |    |-- value: integer (valueContainsNull = false)
 |-- a: array (nullable = true)
 |    |-- element: integer (containsNull = false)
 |-- filteredMap: map (nullable = true)
 |    |-- key: integer
 |    |-- value: integer (valueContainsNull = false)
  

Однако действие над фреймом данных «k», которое вызывает UDF, завершается неудачей со следующей ошибкой —

 org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 1.0 failed 4 times, most recent failure: Lost task 0.3 in stage 1.0 (TID 6, ip-100-74-42-194.ec2.internal): java.lang.ClassCastException: scala.collection.mutable.WrappedArray$ofRef cannot be cast to [I
    at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$anonfun$1.apply(<console>:60)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.apply(Unknown Source)
    at org.apache.spark.sql.execution.Project$$anonfun$1$$anonfun$apply$1.apply(basicOperators.scala:51)
    at org.apache.spark.sql.execution.Project$$anonfun$1$$anonfun$apply$1.apply(basicOperators.scala:49)
    at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
    at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
    at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
    at scala.collection.Iterator$$anon$10.next(Iterator.scala:312)
    at scala.collection.Iterator$class.foreach(Iterator.scala:727)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
    at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48)
    at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103)
    at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:47)
    at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:273)
    at scala.collection.AbstractIterator.to(Iterator.scala:1157)
    at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:265)
    at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1157)
    at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:252)
    at scala.collection.AbstractIterator.toArray(Iterator.scala:1157)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$5.apply(SparkPlan.scala:212)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$5.apply(SparkPlan.scala:212)
    at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1865)
    at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1865)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
    at org.apache.spark.scheduler.Task.run(Task.scala:89)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
  

Комментарии:

1. Попробуйте сначала распечатать схему для обоих фреймов данных, т. е. mapDF и idDF, и вставьте ее сюда! Это дало бы нам некоторое представление!

2. Добавлена схема для фрейма данных. Спасибо, что изучили это.

3. просто еще одна вещь, добавьте также j-схему!

4. добавлена схема для j

5. Странно, что там написано «……..не может быть преобразован в [I».

Ответ №1:

Изменение типа данных с Array[Int] на Seq[Int] в функции filterMapKeysWithSet, похоже, решает вышеуказанную проблему.

 def filterMapKeysWithSet(m: Map[Int, Int], a: Seq[Int]): Map[Int, Int] = {

    val seqToArray = a.toArray

    val s = seqToArray.toSet

    m filterKeys s

  }

val myUDF = udf((m: Map[Int, Int], a: Seq[Int]) => filterMapKeysWithSet(m, a))

k: org.apache.spark.sql.DataFrame = [id: int, m: map<int,int>, a: array<int>, filteredMap: map<int,int>]
root
 |-- id: integer (nullable = false)
 |-- m: map (nullable = true)
 |    |-- key: integer
 |    |-- value: integer (valueContainsNull = false)
 |-- a: array (nullable = true)
 |    |-- element: integer (containsNull = false)
 |-- filteredMap: map (nullable = true)
 |    |-- key: integer
 |    |-- value: integer (valueContainsNull = false)

 --- -------------------- ---------------- -------------------- 
| id|                   m|               a|         filteredMap|
 --- -------------------- ---------------- -------------------- 
|  1|Map(1 -> 100, 2 -...|[1, 2, 100, 200]|Map(1 -> 100, 2 -...|
|  2|Map(1 -> 100, 2 -...|      [100, 200]|               Map()|
|  3|Map(3 -> 300, 4 -...|          [1, 2]|               Map()|
 --- -------------------- ---------------- -------------------- 
  

Таким образом, похоже, что ArrayType во фрейме данных «idDF» на самом деле является WrappedArray, а не массивом — поэтому вызов функции «filterMapKeysWithSet» завершился неудачно, поскольку ожидался массив, но вместо этого был получен WrappedArray / Seq (который неявно преобразуется в массив в Scala 2.8 и выше).

Комментарии:

1. Есть ли какой-либо способ явно преобразовать тип массива в idDF фрейма данных перед передачей функции? Все функции массива в spark 2.6.1, такие как collect_list(), collect_set(), создают WrappedArray.

2. правильно. Если в datarame есть столбцы типа массива (например: Array<String> ) , следует использовать последовательность в функции udf в качестве типа параметра (например, Seq[String] )

3. спасибо, сэр.. у меня тоже сработало (по крайней мере, на данный момент :))

4. Большое вам спасибо!