Функция инъекции SparkSessionExtensions в среде Databricks

#apache-spark #apache-spark-sql #databricks #databricks-community-edition

Вопрос:

Функция инъекции SparkSessionExtensions работает локально, но я не могу заставить ее работать в среде Databricks.

Проект итачи определяет выражения катализатора, подобные age тем, которые я могу успешно использовать локально с помощью spark-sql :

 bin/spark-sql --packages com.github.yaooqinn:itachi_2.12:0.1.0 --conf spark.sql.extensions=org.apache.spark.sql.extra.PostgreSQLExtensions
 
 spark-sql> select age(timestamp '2000', timestamp'1990');
10 years
 

У меня возникли проблемы с тем, чтобы заставить это работать в среде Databricks.

Я запустил кластер сообщества Databricks с spark.sql.extensions=org.apache.spark.sql.extra.PostgreSQLExtensions установленным параметром конфигурации.

создание кластера

Затем я присоединил библиотеку.

прикрепить lib

array_append Функция, определенная в итачи, недоступна, как я и ожидал:

не удается запустить функцию

Подтвердите правильность настройки параметра конфигурации:

введите описание изображения здесь

У spark-алхимии есть еще один подход, который работает в среде Databricks. Нужно ли нам возиться с внутренними устройствами Spark, чтобы это работало в среде Databricks? Или есть способ начать injectFunction работать в базах данных?

Ответ №1:

Это spark.sql.extensions прекрасно работает с полными базами данных (пока это не слишком глубоко войдет во внутренние части Spark — иногда возникают несовместимости), но не в Community Edition. Проблема в том, что spark.sql.extensions они вызываются во время инициализации сеанса, а библиотека, указанная в пользовательском интерфейсе, устанавливается впоследствии, поэтому это происходит после/параллельно с инициализацией. На полных базах данных, которые работают с использованием сценария инициализации для установки библиотеки перед запуском кластера, но эта функция недоступна в Community Edition.

Обходным путем было бы явное регистрация функций, как это:

 %scala
import org.apache.spark.sql.catalyst.expressions.postgresql.{Age, ArrayAppend, ArrayLength, IntervalJustifyLike, Scale, SplitPart, StringToArray, UnNest}
import org.apache.spark.sql.extra.FunctionAliases

spark.sessionState.functionRegistry.registerFunction(Age.fd._1, Age.fd._2, Age.fd._3)
spark.sessionState.functionRegistry.registerFunction(FunctionAliases.array_cat._1, FunctionAliases.array_cat._2, FunctionAliases.array_cat._3)
spark.sessionState.functionRegistry.registerFunction(ArrayAppend.fd._1, ArrayAppend.fd._2, ArrayAppend.fd._3)
spark.sessionState.functionRegistry.registerFunction(ArrayLength.fd._1, ArrayLength.fd._2, ArrayLength.fd._3)
spark.sessionState.functionRegistry.registerFunction(IntervalJustifyLike.justifyDays._1, IntervalJustifyLike.justifyDays._2, IntervalJustifyLike.justifyDays._3)
spark.sessionState.functionRegistry.registerFunction(IntervalJustifyLike.justifyHours._1, IntervalJustifyLike.justifyHours._2, IntervalJustifyLike.justifyHours._3)
spark.sessionState.functionRegistry.registerFunction(IntervalJustifyLike.justifyInterval._1, IntervalJustifyLike.justifyInterval._2, IntervalJustifyLike.justifyInterval._3)
spark.sessionState.functionRegistry.registerFunction(Scale.fd._1, Scale.fd._2, Scale.fd._3)
spark.sessionState.functionRegistry.registerFunction(SplitPart.fd._1, SplitPart.fd._2, SplitPart.fd._3)
spark.sessionState.functionRegistry.registerFunction(StringToArray.fd._1, StringToArray.fd._2, StringToArray.fd._3)
spark.sessionState.functionRegistry.registerFunction(UnNest.fd._1, UnNest.fd._2, UnNest.fd._3)
 

После этого это работает:

введите описание изображения здесь

Это не так удобно, как расширения, но это ограничение CE.

Комментарии:

1. Итак, чтобы заставить это работать с полными базами данных, мне нужно установить библиотеку с помощью сценария инициализации (а не устанавливать библиотеку после запуска кластера)? Не могли бы вы предоставить фрагмент кода? Спасибо!

2. это просто сценарий оболочки, который либо копирует файл из DBFS (as /dbfs/some_location ), либо загружает файл из Интернета и помещает его в /databricks/jars . Смотрите примеры здесь: docs.databricks.com/clusters/…