Создание фрейма данных pyspark внутри пользовательского пакета?

#python #pyspark #package #databricks

#python #pyspark #пакет #databricks

Вопрос:

Я немного запутался в лучших практиках и в том, как создавать сеансы spark внутри пакета, где пакет можно использовать в блоках данных или локально.

Но допустим, у меня есть функция, которая принимает некоторые списки или данные, и ожидаемый результат — фрейм данных pyspark с данными. Я думаю, что одного сеанса должно быть достаточно, поэтому нужно будет проверить, существует ли сеанс, и если нет, создать его.

Но я просто не уверен, как это сделать в функции?

Это мой код для того, как он не находится внутри ноутбука jupyter, я не знаю, лучший ли это способ в любом случае … иногда я получаю сообщение об ошибке, используя этот метод.

 try:
  conf = pyspark.SparkConf().set('spark.driver.host','127.0.0.1')
  sc = pyspark.SparkContext(master='local', appName='example',conf=conf)
  sqlContext = SQLContext(sc)
  print("Binding")
except ValueError:
    print("Spark session already created")
  

И если мне нужно создать фрейм данных, здесь я использую:

 df = sqlContext.createDataFrame(test_list,schema=cSchema)
  

Я также нашел эту документацию: https://spark.apache.org/docs/latest/sql-getting-started.html
Но, похоже, у меня это не работает, я получаю, что «искра» не определена:

 from pyspark.sql import SparkSession

spark = SparkSession 
        .builder 
        .appName("Python Spark SQL basic example") 
        .config("spark.some.config.option", "some-value") 
        .getOrCreate()

....
spark.createDataFrame(test_list, schema=cSchema)
  

Ответ №1:

это определенно правильный способ:

 from pyspark.sql import SparkSession

spark = SparkSession 
     .builder 
     .appName("Python Spark SQL basic example") 
     .config("spark.some.config.option", "some-value") 
    .getOrCreate()
  

хотя стилистически я бы сделал это так для удобства чтения

 spark = (SparkSession 
         .builder 
         .appName("Python Spark SQL basic example")           
         .config("spark.some.config.option", "some-value") 
         .getOrCreate())
  

Он создаст сеанс или просто предоставит вам существующий. Это не должно быть неопределенным, есть ли у вас ошибка? Какая версия spark у вас есть?

Я использую spark локально, а также в блоках данных, и я постоянно использую этот метод

Чтобы ответить на вопрос о том, как вы должны это делать, я бы просто написал функцию в библиотеке утилит, например:

 def getSparkSession():
    return (SparkSession 
            .builder 
            .appName("Python Spark SQL basic example")           
            .config("spark.some.config.option", "some-value") 
            .getOrCreate())
  

и чем использовать его в своем коде:

 spark = getSparkSession()