#pandas #apache-spark #pyspark #user-defined-functions
#pandas #apache-spark #pyspark #определяемые пользователем функции
Вопрос:
Я пытаюсь переписать UDF в pandas UDF.
Однако, когда дело доходит до столбца с ArrayType внутри. Я изо всех сил пытаюсь найти правильное решение.
У меня есть фрейм данных, как показано ниже:
----------- --------------------
| genre| ids|
----------- --------------------
| Crime|[6, 22, 42, 47, 5...|
| Romance|[3, 7, 11, 15, 17...|
| Thriller|[6, 10, 16, 18, 2...|
| Adventure|[2, 8, 10, 15, 29...|
| Children|[1, 2, 8, 13, 34,...|
| Drama|[4, 11, 14, 16, 1...|
| War|[41, 110, 151, 15...|
|Documentary|[37, 77, 99, 108,...|
| Fantasy|[2, 56, 60, 126, ...|
| Mystery|[59, 113, 123, 16...|
----------- --------------------
Следующий UDF работает хорошо:
pairs_udf = udf(lambda x: itertools.combinations(x, 2), transformer.schema)
df = df.select("genre", pairs_udf("ids").alias("ids"))
Вывод выглядит так:
----------- --------------------
| genre| ids|
----------- --------------------
| Crime|[[6, 22], [6, 42]...|
| Romance|[[3, 7], [3, 11],...|
| Thriller|[[6, 10], [6, 16]...|
| Adventure|[[2, 8], [2, 10],...|
| Children|[[1, 2], [1, 8], ...|
| Drama|[[4, 11], [4, 14]...|
| War|[[41, 110], [41, ...|
|Documentary|[[37, 77], [37, 9...|
| Fantasy|[[2, 56], [2, 60]...|
| Mystery|[[59, 113], [59, ...|
----------- --------------------
Однако, что было бы эквивалентно при записи функции в pandas udf
.
PS: Я понимаю, что в качестве альтернативы я могу использовать перекрестное объединение для достижения тех же результатов.
Но мне больше интересно, как pandas udf обрабатывает столбец с помощью ArrayType.
Комментарии:
1. может быть, что-то вроде
lambda row: row.apply(lambda x: itertools.combinations(x, 2))
2. спасибо, @mck, это была одна из моих попыток. теперь я думаю, что проблема, с которой я столкнулся, больше связана
java.lang.UnsupportedOperationException: sun.misc.Unsafe or java.nio.DirectByteBuffer.<init>(long, int) not available
. после нескольких поисков в Google, похоже, это связано с поддержкой java 11 и spark-arrow. который может относиться к отдельному вопросу.
Ответ №1:
Я собираюсь поделиться своими выводами здесь:
есть 3 аспекта, чтобы заставить pandas udf работать для вашего проекта:
1. pandas UDF, или, точнее, Apache Arrow, не поддерживает сложные типы, как это делает обычный udf.(по состоянию pyspark 3.0.1
на , pyarrow 2.0.0
)
например:
ArrayType(StringType())
поддерживается pandas udf.ArrayType(StructType([...]))
не поддерживаются. вы можете узнать больше: https://spark.apache.org/docs/latest/sql-pyspark-pandas-with-arrow.html#supported-sql-types
2. если вы используете Java 11, которая используется по умолчанию в (py) Spark 3. вам необходимо добавить следующее как часть вашей конфигурации spark:
spark.driver.extraJavaOptions='-Dio.netty.tryReflectionSetAccessible=true'
spark.executor.extraJavaOptions='-Dio.netty.tryReflectionSetAccessible=true'
это решит java.lang.UnsupportedOperationException
проблему, упомянутую выше.
3. убедитесь, что путь python вашей виртуальной среды добавлен в ваш pyspark_python
т.е. environ['PYSPARK_PYTHON']='./your/virtual/enviroment/path'