Как я могу заставить Spark видеть код в другом модуле?

#python #apache-spark #pyspark

#python #apache-spark #pyspark

Вопрос:

У меня есть сложная функция, которую я запускаю над набором данных в spark с помощью функции map . Он находится в другом модуле python. При вызове map узлы-исполнители не имеют этого кода, а затем функция map завершается с ошибкой.

 s_cobDates = getCobDates() #returns a list of dates
sb_dataset = sc.broadcast(dataset) #fyi - it is not trivial to slice this into chunks per date

def sparkInnerLoop(n_cobDate):
   n_dataset = sb_dataset.value
   import someOtherModule
   return someOtherModule.myComplicatedCalc(n_dataset)

results = s_cobDates.map(sparkInnerLoop).collect()
  

Затем происходит сбой Spark, поскольку он не может импортировать myOtherModule.

До сих пор я обходил это, создавая пакет python, содержащий someOtherModule, и развертывая его в кластере перед моими заданиями spark, но это не способствует быстрому прототипированию.

Как я могу заставить spark отправлять полный код узлам-исполнителям, не вставляя весь код в «sparkInnerLoop»? Этот код используется в другом месте моего решения, и я не хочу дублирования кода.

Я использую кластер из восьми узлов в автономном режиме, версия 1.6.2, и драйвер запущен на моей рабочей станции в pycharm.

Ответ №1:

Ну, приведенный выше ответ работает, он падает, если ваши модули являются частью пакета. Вместо этого можно заархивировать ваши модули, а затем добавить zip-файл в контекст spark, и тогда у них будет правильное имя пакета.

 def ziplib():
    libpath = os.path.dirname(__file__)  # this should point to your packages directory
    zippath = r'c:Tempmylib-'   randstr.randstr(6)   '.zip'
    zippath = os.path.abspath(zippath)
    zf = zipfile.PyZipFile(zippath, mode='w')
    try:
        zf.debug = 3  # making it verbose, good for debugging
        zf.writepy(libpath)
        return zippath  # return path to generated zip archive
    finally:
        zf.close()

sc = SparkContext(conf=conf)

zip_path = ziplib()  # generate zip archive containing your lib
zip_path = pathlib.Path(zip_path).as_uri()
sc.addPyFile(zip_path)  # add the entire archive to SparkContext