Загрузка файлов из хранилища Google с помощью Spark (Python) и Dataproc

#python #apache-spark #google-cloud-storage #google-cloud-dataproc

#python #apache-spark #google-облачное хранилище #google-cloud-dataproc

Вопрос:

У меня есть приложение, которое распараллеливает выполнение объектов Python, которые обрабатывают данные, которые будут загружены из хранилища Google (моя корзина проекта). Кластер создается с использованием Google Dataproc. Проблема в том, что данные никогда не загружаются! Я написал тестовую программу, чтобы попытаться понять проблему. Я написал следующую функцию для копирования файлов из корзины и проверки, работает ли создание файлов на workers:

 from subprocess import call
from os.path import join

def copyDataFromBucket(filename,remoteFolder,localFolder):
  call(["gsutil","-m","cp",join(remoteFolder,filename),localFolder]

def execTouch(filename,localFolder):
  call(["touch",join(localFolder,"touched_" filename)])
  

Я протестировал эту функцию, вызвав ее из оболочки Python, и она работает. Но когда я запускаю следующий код с помощью spark-submit, файлы не загружаются (но ошибка не возникает):

 # ...
filesRDD = sc.parallelize(fileList)
filesRDD.foreach(lambda myFile: copyDataFromBucket(myFile,remoteBucketFolder,'/tmp/output')
filesRDD.foreach(lambda myFile: execTouch(myFile,'/tmp/output')
# ...
  

Функция execTouch работает (я вижу файлы на каждом рабочем компьютере), но функция copyDataFromBucket ничего не делает.

Итак, что я делаю не так?

Комментарии:

1. Одна точность: я использую пакет Anaconda2 для запуска своего приложения, но мне пришлось установить переменную CLOUDSDK_PYTHON в /usr/bin/python для работы gsutil

2. если вы должны были работать gsutil -m cp ... с bash или в вашей оболочке, работает ли это в настоящее время?

3. Да, это работает нормально, как на главном, так и на каждом из рабочих.

Ответ №1:

Проблема явно заключалась в контексте Spark. Замена вызова «gsutil» вызовом «hadoop fs» решает проблему:

 from subprocess import call
from os.path import join

def copyDataFromBucket(filename,remoteFolder,localFolder):
  call(["hadoop","fs","-copyToLocal",join(remoteFolder,filename),localFolder]
  

Я также провел тест для отправки данных в корзину. Нужно только заменить «-copyToLocal» на «-copyFromLocal»

Комментарии:

1. Игра меняется. Спасибо!