Spark UDF теперь вызывает исключение ArrayIndexOutOfBoundsException

#scala #apache-spark #user-defined-functions

Вопрос:

Я написал UDF в Spark (3.0.0), чтобы сделать хэш MD5 столбцов, который выглядит следующим образом:

 def Md5Hash(text: String): String = {
  java.security.MessageDigest.getInstance("MD5")
    .digest(text.getBytes())
    .map(0xFF amp; _)
    .map("x".format(_))
    .foldLeft("") { _   _ }
}
val md5Hash: UserDefinedFunction = udf(Md5Hash(_))
 

Эта функция отлично работала для меня в течение нескольких месяцев, но теперь она выходит из строя во время выполнения:

 org.apache.spark.SparkException: Failed to execute user defined function(UDFs$$Lambda$3876/1265187815: (string) => string)
....
Caused by: java.lang.ArrayIndexOutOfBoundsException
        at sun.security.provider.DigestBase.engineUpdate(DigestBase.java:116)
        at sun.security.provider.MD5.implDigest(MD5.java:109)
        at sun.security.provider.DigestBase.engineDigest(DigestBase.java:207)
        at sun.security.provider.DigestBase.engineDigest(DigestBase.java:186)
        at java.security.MessageDigest$Delegate.engineDigest(MessageDigest.java:592)
        at java.security.MessageDigest.digest(MessageDigest.java:365)
        at java.security.MessageDigest.digest(MessageDigest.java:411)
 

Он все еще работает с некоторыми небольшими наборами данных, но у меня есть еще один набор данных большего размера (10 мс строк, поэтому не очень большой), который здесь не работает. Я не смог найти никаких признаков того, что данные, которые я пытаюсь хэшировать, в любом случае являются странными-все входные значения являются ненулевыми строками ASCII. Что может вызвать эту ошибку, если раньше она работала нормально? Я работаю в AWS EMR 6.1.0 с Spark 3.0.0.

Комментарии:

1. можете ли вы опубликовать полный код? вы добавили нулевую проверку ?

2. Единственный другой соответствующий код-это когда я вызываю свой UDF-например, val withHash = spark.read.parquet("s3://path/to/input.parquet").withColumn("hashed", $"message").cache . Я подтвердил, что все мои входные данные не являются нулевыми.