#python #apache-spark #pyspark
#python #apache-spark #pyspark
Вопрос:
У меня есть сложная функция, которую я запускаю над набором данных в spark с помощью функции map . Он находится в другом модуле python. При вызове map узлы-исполнители не имеют этого кода, а затем функция map завершается с ошибкой.
s_cobDates = getCobDates() #returns a list of dates
sb_dataset = sc.broadcast(dataset) #fyi - it is not trivial to slice this into chunks per date
def sparkInnerLoop(n_cobDate):
n_dataset = sb_dataset.value
import someOtherModule
return someOtherModule.myComplicatedCalc(n_dataset)
results = s_cobDates.map(sparkInnerLoop).collect()
Затем происходит сбой Spark, поскольку он не может импортировать myOtherModule.
До сих пор я обходил это, создавая пакет python, содержащий someOtherModule, и развертывая его в кластере перед моими заданиями spark, но это не способствует быстрому прототипированию.
Как я могу заставить spark отправлять полный код узлам-исполнителям, не вставляя весь код в «sparkInnerLoop»? Этот код используется в другом месте моего решения, и я не хочу дублирования кода.
Я использую кластер из восьми узлов в автономном режиме, версия 1.6.2, и драйвер запущен на моей рабочей станции в pycharm.
Ответ №1:
Ну, приведенный выше ответ работает, он падает, если ваши модули являются частью пакета. Вместо этого можно заархивировать ваши модули, а затем добавить zip-файл в контекст spark, и тогда у них будет правильное имя пакета.
def ziplib():
libpath = os.path.dirname(__file__) # this should point to your packages directory
zippath = r'c:Tempmylib-' randstr.randstr(6) '.zip'
zippath = os.path.abspath(zippath)
zf = zipfile.PyZipFile(zippath, mode='w')
try:
zf.debug = 3 # making it verbose, good for debugging
zf.writepy(libpath)
return zippath # return path to generated zip archive
finally:
zf.close()
sc = SparkContext(conf=conf)
zip_path = ziplib() # generate zip archive containing your lib
zip_path = pathlib.Path(zip_path).as_uri()
sc.addPyFile(zip_path) # add the entire archive to SparkContext