#apache-spark #apache-spark-sql #databricks #databricks-community-edition
Вопрос:
Функция инъекции SparkSessionExtensions работает локально, но я не могу заставить ее работать в среде Databricks.
Проект итачи определяет выражения катализатора, подобные age
тем, которые я могу успешно использовать локально с помощью spark-sql
:
bin/spark-sql --packages com.github.yaooqinn:itachi_2.12:0.1.0 --conf spark.sql.extensions=org.apache.spark.sql.extra.PostgreSQLExtensions
spark-sql> select age(timestamp '2000', timestamp'1990');
10 years
У меня возникли проблемы с тем, чтобы заставить это работать в среде Databricks.
Я запустил кластер сообщества Databricks с spark.sql.extensions=org.apache.spark.sql.extra.PostgreSQLExtensions
установленным параметром конфигурации.
Затем я присоединил библиотеку.
array_append
Функция, определенная в итачи, недоступна, как я и ожидал:
Подтвердите правильность настройки параметра конфигурации:
У spark-алхимии есть еще один подход, который работает в среде Databricks. Нужно ли нам возиться с внутренними устройствами Spark, чтобы это работало в среде Databricks? Или есть способ начать injectFunction
работать в базах данных?
Ответ №1:
Это spark.sql.extensions
прекрасно работает с полными базами данных (пока это не слишком глубоко войдет во внутренние части Spark — иногда возникают несовместимости), но не в Community Edition. Проблема в том, что spark.sql.extensions
они вызываются во время инициализации сеанса, а библиотека, указанная в пользовательском интерфейсе, устанавливается впоследствии, поэтому это происходит после/параллельно с инициализацией. На полных базах данных, которые работают с использованием сценария инициализации для установки библиотеки перед запуском кластера, но эта функция недоступна в Community Edition.
Обходным путем было бы явное регистрация функций, как это:
%scala
import org.apache.spark.sql.catalyst.expressions.postgresql.{Age, ArrayAppend, ArrayLength, IntervalJustifyLike, Scale, SplitPart, StringToArray, UnNest}
import org.apache.spark.sql.extra.FunctionAliases
spark.sessionState.functionRegistry.registerFunction(Age.fd._1, Age.fd._2, Age.fd._3)
spark.sessionState.functionRegistry.registerFunction(FunctionAliases.array_cat._1, FunctionAliases.array_cat._2, FunctionAliases.array_cat._3)
spark.sessionState.functionRegistry.registerFunction(ArrayAppend.fd._1, ArrayAppend.fd._2, ArrayAppend.fd._3)
spark.sessionState.functionRegistry.registerFunction(ArrayLength.fd._1, ArrayLength.fd._2, ArrayLength.fd._3)
spark.sessionState.functionRegistry.registerFunction(IntervalJustifyLike.justifyDays._1, IntervalJustifyLike.justifyDays._2, IntervalJustifyLike.justifyDays._3)
spark.sessionState.functionRegistry.registerFunction(IntervalJustifyLike.justifyHours._1, IntervalJustifyLike.justifyHours._2, IntervalJustifyLike.justifyHours._3)
spark.sessionState.functionRegistry.registerFunction(IntervalJustifyLike.justifyInterval._1, IntervalJustifyLike.justifyInterval._2, IntervalJustifyLike.justifyInterval._3)
spark.sessionState.functionRegistry.registerFunction(Scale.fd._1, Scale.fd._2, Scale.fd._3)
spark.sessionState.functionRegistry.registerFunction(SplitPart.fd._1, SplitPart.fd._2, SplitPart.fd._3)
spark.sessionState.functionRegistry.registerFunction(StringToArray.fd._1, StringToArray.fd._2, StringToArray.fd._3)
spark.sessionState.functionRegistry.registerFunction(UnNest.fd._1, UnNest.fd._2, UnNest.fd._3)
После этого это работает:
Это не так удобно, как расширения, но это ограничение CE.
Комментарии:
1. Итак, чтобы заставить это работать с полными базами данных, мне нужно установить библиотеку с помощью сценария инициализации (а не устанавливать библиотеку после запуска кластера)? Не могли бы вы предоставить фрагмент кода? Спасибо!
2. это просто сценарий оболочки, который либо копирует файл из DBFS (as
/dbfs/some_location
), либо загружает файл из Интернета и помещает его в/databricks/jars
. Смотрите примеры здесь: docs.databricks.com/clusters/…