Не удалось найти источник данных: дельта в среде Python

#unit-testing #pyspark #databricks #delta-lake

#модульное тестирование #pyspark #databricks #дельта-озеро

Вопрос:

Следующее: https://docs .delta.io/latest/quick-start.html#python

Я установил delta-spark и запустил:

 from delta import *

builder = pyspark.sql.SparkSession.builder.appName("MyApp") 
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") 
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")

spark = spark = configure_spark_with_delta_pip(builder).getOrCreate()
 

Однако, когда я запускаю:

 data = spark.range(0, 5)
data.write.format("delta").save("/tmp/delta-table")
 

сообщение об ошибке: дельта не распознана

и если я запускаю

 DeltaTable.isDeltaTable(spark, "packages/tests/streaming/data")
 

В нем указано: TypeError: объект ‘JavaPackage’ не вызывается

Казалось, что я могу запускать эти команды локально (например, модульные тесты) без Maven или запускать их в оболочке pyspark? Было бы хорошо просто посмотреть, не отсутствует ли у меня зависимость?

Ответ №1:

Вы можете просто установить delta-spark пакет PyPI с помощью pip install delta-spark (он также будет извлекать pyspark), а затем обратиться к нему.

Или вы можете добавить параметр конфигурации, который будет извлекать пакет Delta. Это .config("spark.jars.packages", "io.delta:delta-core_2.12:<delta-version>") . Для версий Spark 3.1 Delta — 1.0.0 (дополнительную информацию см. В документации по сопоставлению релизов).

У меня есть пример использования дельта-таблиц в модульных тестах (обратите внимание, что оператор import находится в определении функции, поскольку пакет Delta загружается динамически):

 import pyspark
import pyspark.sql
import pytest
import shutil
from pyspark.sql import SparkSession

delta_dir_name = "/tmp/delta-table"

@pytest.fixture
def delta_setup(spark_session):
    data = spark_session.range(0, 5)
    data.write.format("delta").save(delta_dir_name)
    yield data
    shutil.rmtree(delta_dir_name, ignore_errors=True)

def test_delta(spark_session, delta_setup):
    from delta.tables import DeltaTable
    deltaTable = DeltaTable.forPath(spark_session, delta_dir_name)
    hist = deltaTable.history()
    assert hist.count() == 1
 

среда инициализируется с помощью pytest-spark:

 [pytest]
filterwarnings =
  ignore::DeprecationWarning
spark_options =
  spark.sql.extensions: io.delta.sql.DeltaSparkSessionExtension
  spark.sql.catalog.spark_catalog: org.apache.spark.sql.delta.catalog.DeltaCatalog
  spark.jars.packages: io.delta:delta-core_2.12:1.0.0
  spark.sql.catalogImplementation: in-memory
 

Комментарии:

1. Спасибо, Алекс, я обнаружил, что после очистки кэша моей среды pipenv и последующей установки pyspark = 3.1.0 это сработало