#scala #apache-spark #user-defined-functions
Вопрос:
Я написал UDF в Spark (3.0.0), чтобы сделать хэш MD5 столбцов, который выглядит следующим образом:
def Md5Hash(text: String): String = {
java.security.MessageDigest.getInstance("MD5")
.digest(text.getBytes())
.map(0xFF amp; _)
.map("x".format(_))
.foldLeft("") { _ _ }
}
val md5Hash: UserDefinedFunction = udf(Md5Hash(_))
Эта функция отлично работала для меня в течение нескольких месяцев, но теперь она выходит из строя во время выполнения:
org.apache.spark.SparkException: Failed to execute user defined function(UDFs$$Lambda$3876/1265187815: (string) => string)
....
Caused by: java.lang.ArrayIndexOutOfBoundsException
at sun.security.provider.DigestBase.engineUpdate(DigestBase.java:116)
at sun.security.provider.MD5.implDigest(MD5.java:109)
at sun.security.provider.DigestBase.engineDigest(DigestBase.java:207)
at sun.security.provider.DigestBase.engineDigest(DigestBase.java:186)
at java.security.MessageDigest$Delegate.engineDigest(MessageDigest.java:592)
at java.security.MessageDigest.digest(MessageDigest.java:365)
at java.security.MessageDigest.digest(MessageDigest.java:411)
Он все еще работает с некоторыми небольшими наборами данных, но у меня есть еще один набор данных большего размера (10 мс строк, поэтому не очень большой), который здесь не работает. Я не смог найти никаких признаков того, что данные, которые я пытаюсь хэшировать, в любом случае являются странными-все входные значения являются ненулевыми строками ASCII. Что может вызвать эту ошибку, если раньше она работала нормально? Я работаю в AWS EMR 6.1.0 с Spark 3.0.0.
Комментарии:
1. можете ли вы опубликовать полный код? вы добавили нулевую проверку ?
2. Единственный другой соответствующий код-это когда я вызываю свой UDF-например,
val withHash = spark.read.parquet("s3://path/to/input.parquet").withColumn("hashed", $"message").cache
. Я подтвердил, что все мои входные данные не являются нулевыми.