Сочетание дельта-ввода-вывода и чтения excel

#apache-spark #pyspark #delta-lake

Вопрос:

При использовании com.crealytics:spark-excel_2.12:0.14.0 без дельты:

 spark = SparkSession.builder.appName("Word Count")
.config("spark.jars.packages", "com.crealytics:spark-excel_2.12:0.14.0")
.getOrCreate()

df = spark.read.format("com.crealytics.spark.excel")
.option("header", "true")
.load(path2)
 

Это работает, и я прекрасно могу читать файлы Excel. Но создание сеанса с помощью configure_spark_with_delta_pip:

 builder = SparkSession.builder.appName("transaction")
.config("spark.jars.packages", "com.crealytics:spark-excel_2.12:0.14.0")
.config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
.config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")

spark = configure_spark_with_delta_pip(builder).getOrCreate()
 

Выдает мне следующую ошибку:

Py4JJavaError: Произошла ошибка при вызове o139.load. : java.lang.Исключение ClassNotFoundException: Не удалось найти источник данных: com.crealytics.spark.excel. Пожалуйста, найдите пакеты по адресу http://spark.apache.org/third-party-projects.html в орг.apache.spark.sql.выполнение.источники данных.Источник данных$.Посмотрите на источник данных(DataSource.scala:692) в org.apache.spark.sql.выполнение.источники данных.Источник данных$.lookupDataSourceV2(источник данных.в Scala:746) в орг.Апачи.Искра.язык SQL.DataFrameReader.нагрузка(DataFrameReader.в Scala:265) в орг.Апачи.Искра.язык SQL.DataFrameReader.нагрузка(DataFrameReader.в Scala:239) в Java.база с JDK.внутренние.отражения.NativeMethodAccessorImpl.invoke0(родной метод) на языке Java.база с JDK.внутренние.отражения.NativeMethodAccessorImpl.командлет Invoke(NativeMethodAccessorImpl.Ява:62) на языке Java.база/jdk.внутренний.отражение.Делегирование methodaccessorimpl.invoke(делегирование methodaccessorimpl.java:43) в java.base/java.lang.reflect.Метод.вызов(Метод.java:566) в py4j.отражение.Методинвокер.вызов(методинвокер.java:244) в py4j.отражение.ReflectionEngine.вызов(ReflectionEngine.java:357) в py4j.Шлюз.вызов(Gateway.java:282) в py4j.командах.AbstractCommand.Вызов метода(AbstractCommand.java:132) в py4j.команды.Вызов команды.выполнить(CallCommand.java:79) в py4j.GatewayConnection.run(GatewayConnection.java:238) в java.base/java.lang.Thread.run(Thread.java:829) Вызвано: java.lang.Исключение ClassNotFoundException: com.crealytics.spark.excel.Источник по умолчанию в java.base/java.net.URLClassLoader.FindClass(URLClassLoader.java:471) в java.base/java.lang.Загрузчик классов.Класс загрузки(загрузчик классов.java:589) в java.base/java.lang.Загрузчик классов. Класс загрузки(загрузчик классов.java:522) в org.apache.spark.sql.выполнение.источники данных.Источник данных$.$anonfun$lookupDataSource$5(источник данных. scala:666) в scala.util.Попробуйте$.применить(Попробуйте. scala:213) в org.apache.spark.sql.выполнение.источники данных.Источник данных$.$anonfun$lookupDataSource$4(источник данных.scala:666) в scala.util.Неудача.Орельсе(попробуйте.scala:224) в орг.apache.spark.sql.выполнение.источники данных.Источник данных$.lookupDataSource(источник данных. scala:666) … еще 14

Почему? И как мне этого избежать?

Ответ №1:

Вы получаете эту ошибку, потому configure_spark_with_delta_pip что перезаписывает/заменяет ваше свойство spark.jars.packages конфигурации соответствующим пакетом delta lake, который необходимо импортировать. Таким образом, ваш пакет com.crealytics:spark-excel_2.12:0.14.0 может быть недоступен/импортирован. Смотрите фрагмент исходного кода здесь

     scala_version = "2.12"
    maven_artifact = f"io.delta:delta-core_{scala_version}:{delta_version}"

    return spark_session_builder.config("spark.jars.packages", maven_artifact) 
 

К сожалению, в настоящее время Builder это не позволяет нам извлекать существующие свойства конфигурации или SparkConf объект для динамической настройки этих свойств перед вызовом getOrCreate для создания или запуска сеанса.

Подход 1

Чтобы решить эту проблему, вы можете самостоятельно получить соответствующий пакет delta, аналогично тому, как configure_spark_with_delta_pip это делается, например.

 
import importlib_metadata
delta_version = importlib_metadata.version("delta_spark")
scala_version = "2.12"
delta_package = f"io.delta:delta-core_{scala_version}:{delta_version}"

builder = SparkSession.builder.appName("transaction")
.config("spark.jars.packages", f"com.crealytics:spark-excel_2.12:0.14.0,{delta_package}")
.config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
.config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")

spark = configure_spark_with_delta_pip(builder).getOrCreate()


 

Подход 2

Чтобы решить эту проблему, вы можете создать сеанс spark после применения пакета delta с configure_spark_with_delta_pip . После этого вы можете запустить повторную инициализацию сеанса spark с обновленными свойствами конфигурации.

напр..

 builder = SparkSession.builder.appName("transaction")
.config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
.config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")

spark = configure_spark_with_delta_pip(builder).getOrCreate()

builder = SparkSession.builder.appName("transaction")
.config("spark.jars.packages", "com.crealytics:spark-excel_2.12:0.14.0")

spark = builder.getOrCreate()
 

Поскольку оба сеанса spark имеют одинаковую appName getOrCreate конфигурацию, будет получен существующий сеанс spark, но также будет применена новая конфигурация. Это поведение задокументировано здесь как

В случае возврата существующего сеанса SparkSession параметры конфигурации, указанные в этом построителе, будут применены к существующему сеансу SparkSession.

 >>> s2 = SparkSession.builder.config("k2", "v2").getOrCreate()
>>> s1.conf.get("k1") == s2.conf.get("k1") 
True
>>> s1.conf.get("k2") == s2.conf.get("k2") 
True
 

Дайте мне знать, если это сработает для вас.