#apache-spark #pyspark #delta-lake
Вопрос:
При использовании com.crealytics:spark-excel_2.12:0.14.0 без дельты:
spark = SparkSession.builder.appName("Word Count")
.config("spark.jars.packages", "com.crealytics:spark-excel_2.12:0.14.0")
.getOrCreate()
df = spark.read.format("com.crealytics.spark.excel")
.option("header", "true")
.load(path2)
Это работает, и я прекрасно могу читать файлы Excel. Но создание сеанса с помощью configure_spark_with_delta_pip:
builder = SparkSession.builder.appName("transaction")
.config("spark.jars.packages", "com.crealytics:spark-excel_2.12:0.14.0")
.config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
.config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")
spark = configure_spark_with_delta_pip(builder).getOrCreate()
Выдает мне следующую ошибку:
Py4JJavaError: Произошла ошибка при вызове o139.load. : java.lang.Исключение ClassNotFoundException: Не удалось найти источник данных: com.crealytics.spark.excel. Пожалуйста, найдите пакеты по адресу http://spark.apache.org/third-party-projects.html в орг.apache.spark.sql.выполнение.источники данных.Источник данных$.Посмотрите на источник данных(DataSource.scala:692) в org.apache.spark.sql.выполнение.источники данных.Источник данных$.lookupDataSourceV2(источник данных.в Scala:746) в орг.Апачи.Искра.язык SQL.DataFrameReader.нагрузка(DataFrameReader.в Scala:265) в орг.Апачи.Искра.язык SQL.DataFrameReader.нагрузка(DataFrameReader.в Scala:239) в Java.база с JDK.внутренние.отражения.NativeMethodAccessorImpl.invoke0(родной метод) на языке Java.база с JDK.внутренние.отражения.NativeMethodAccessorImpl.командлет Invoke(NativeMethodAccessorImpl.Ява:62) на языке Java.база/jdk.внутренний.отражение.Делегирование methodaccessorimpl.invoke(делегирование methodaccessorimpl.java:43) в java.base/java.lang.reflect.Метод.вызов(Метод.java:566) в py4j.отражение.Методинвокер.вызов(методинвокер.java:244) в py4j.отражение.ReflectionEngine.вызов(ReflectionEngine.java:357) в py4j.Шлюз.вызов(Gateway.java:282) в py4j.командах.AbstractCommand.Вызов метода(AbstractCommand.java:132) в py4j.команды.Вызов команды.выполнить(CallCommand.java:79) в py4j.GatewayConnection.run(GatewayConnection.java:238) в java.base/java.lang.Thread.run(Thread.java:829) Вызвано: java.lang.Исключение ClassNotFoundException: com.crealytics.spark.excel.Источник по умолчанию в java.base/java.net.URLClassLoader.FindClass(URLClassLoader.java:471) в java.base/java.lang.Загрузчик классов.Класс загрузки(загрузчик классов.java:589) в java.base/java.lang.Загрузчик классов. Класс загрузки(загрузчик классов.java:522) в org.apache.spark.sql.выполнение.источники данных.Источник данных$.$anonfun$lookupDataSource$5(источник данных. scala:666) в scala.util.Попробуйте$.применить(Попробуйте. scala:213) в org.apache.spark.sql.выполнение.источники данных.Источник данных$.$anonfun$lookupDataSource$4(источник данных.scala:666) в scala.util.Неудача.Орельсе(попробуйте.scala:224) в орг.apache.spark.sql.выполнение.источники данных.Источник данных$.lookupDataSource(источник данных. scala:666) … еще 14
Почему? И как мне этого избежать?
Ответ №1:
Вы получаете эту ошибку, потому configure_spark_with_delta_pip
что перезаписывает/заменяет ваше свойство spark.jars.packages
конфигурации соответствующим пакетом delta lake, который необходимо импортировать. Таким образом, ваш пакет com.crealytics:spark-excel_2.12:0.14.0
может быть недоступен/импортирован. Смотрите фрагмент исходного кода здесь
scala_version = "2.12" maven_artifact = f"io.delta:delta-core_{scala_version}:{delta_version}" return spark_session_builder.config("spark.jars.packages", maven_artifact)
К сожалению, в настоящее время Builder
это не позволяет нам извлекать существующие свойства конфигурации или SparkConf
объект для динамической настройки этих свойств перед вызовом getOrCreate
для создания или запуска сеанса.
Подход 1
Чтобы решить эту проблему, вы можете самостоятельно получить соответствующий пакет delta, аналогично тому, как configure_spark_with_delta_pip
это делается, например.
import importlib_metadata
delta_version = importlib_metadata.version("delta_spark")
scala_version = "2.12"
delta_package = f"io.delta:delta-core_{scala_version}:{delta_version}"
builder = SparkSession.builder.appName("transaction")
.config("spark.jars.packages", f"com.crealytics:spark-excel_2.12:0.14.0,{delta_package}")
.config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
.config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")
spark = configure_spark_with_delta_pip(builder).getOrCreate()
Подход 2
Чтобы решить эту проблему, вы можете создать сеанс spark после применения пакета delta с configure_spark_with_delta_pip
. После этого вы можете запустить повторную инициализацию сеанса spark с обновленными свойствами конфигурации.
напр..
builder = SparkSession.builder.appName("transaction")
.config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
.config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")
spark = configure_spark_with_delta_pip(builder).getOrCreate()
builder = SparkSession.builder.appName("transaction")
.config("spark.jars.packages", "com.crealytics:spark-excel_2.12:0.14.0")
spark = builder.getOrCreate()
Поскольку оба сеанса spark имеют одинаковую appName
getOrCreate
конфигурацию, будет получен существующий сеанс spark, но также будет применена новая конфигурация. Это поведение задокументировано здесь как
В случае возврата существующего сеанса SparkSession параметры конфигурации, указанные в этом построителе, будут применены к существующему сеансу SparkSession.
>>> s2 = SparkSession.builder.config("k2", "v2").getOrCreate() >>> s1.conf.get("k1") == s2.conf.get("k1") True >>> s1.conf.get("k2") == s2.conf.get("k2") True
Дайте мне знать, если это сработает для вас.