#python #pyspark #hdfs
Вопрос:
Я использую 3 компьютера для выполнения pyspark
задания. 1 компьютер является ведущим узлом, а компьютер A, B является подчиненным узлом.
У меня есть такой список, и я хочу сохранить его в виде текстового файла в HDFS:
data = [['1', '2'], ['3']]
Вот как я сохраняю список в виде текстового файла в HDFS:
def save_file_hdfs(data, session, path):
"""
Use this to save the file to HDFS.
The saved file will be named "part-00000"
"""
# First need to convert the list to parallel RDD
rdd_list = session.sparkContext.parallelize(data)
# Use the map function to write one element per line and write all elements to a single file (coalesce)
rdd_list.coalesce(1).map(lambda row: str(row)).saveAsTextFile(path)
Основной код:
import pyspark
from pyspark.sql import SparkSession
output_path = "hdfs:///user/output"
spark = SparkSession.builder.appName("build_graph").getOrCreate()
save_file_hdfs(data, spark, output_path)
Выполнить команду:
spark-submit
--master yarn
--deploy-mode cluster
example.py
Затем я получил эту ошибку из stdout
файла компьютера A, компьютер B работает нормально:
Traceback (most recent call last):
File "connected.py", line 158, in <module>
save_file_hdfs(data, spark, output_path)
File "example.py", line 135, in save_file_hdfs
rdd_list.coalesce(1).map(lambda row: str(row)).saveAsTextFile(path)
File "/tmp/hadoop-tnhan/nm-local-dir/usercache/tnhan/appcache/application_1621226092496_0024/container_1621226092496_0024_02_000001/pyspark.zip/pyspark/rdd.py", line 1656, in saveAsTextFile
File "/tmp/hadoop-tnhan/nm-local-dir/usercache/tnhan/appcache/application_1621226092496_0024/container_1621226092496_0024_02_000001/py4j-0.10.9-src.zip/py4j/java_gateway.py", line 1304, in __call__
File "/tmp/hadoop-tnhan/nm-local-dir/usercache/tnhan/appcache/application_1621226092496_0024/container_1621226092496_0024_02_000001/pyspark.zip/pyspark/sql/utils.py", line 128, in deco
File "/tmp/hadoop-tnhan/nm-local-dir/usercache/tnhan/appcache/application_1621226092496_0024/container_1621226092496_0024_02_000001/py4j-0.10.9-src.zip/py4j/protocol.py", line 326, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling o111.saveAsTextFile.
: org.apache.hadoop.mapred.FileAlreadyExistsException: Output directory hdfs://tnhanComputer:9000/user/output already exists
Если я запущу его без настройки master yarn deploy-mode cluster
, он будет работать нормально.
Вы знаете, почему я получаю эту ошибку?
Ответ №1:
Ошибка довольно очевидна, путь уже существует FileAlreadyExistsException: Output directory hdfs://tnhanComputer:9000/user/output already exists
Комментарии:
1. Я верю, что он существует, потому что компьютер B создал его, потому что нет ошибки с компьютера B. Я думал, что два компьютера просто помогают распространять данные и, в конце концов, они будут сохранены в одном файле?
2. Нет, я думаю, что он существует, потому что вы создали его при предыдущем запуске (с клиентом режима развертывания). Попробуйте другой путь
3. Я уже удалил все папки в HDFS, прежде чем запускать его
deploy-mode cluster
. Если это правда, то почему компьютер B может запустить его без каких-либо ошибок?4. Ах, я думаю, я понимаю, что происходит: итак, вы передали один путь к своей функции, каждый исполнитель будет использовать его в качестве пути к файлу, поэтому, когда A работает, B не будет и наоборот. Возможно, вам захочется добавить несколько случайных чисел после пути к файлу, чтобы rdd сохранялся по-другому. В фрейм данных это встроено, но поскольку вы сохраняете данные пользовательским способом, вам придется сделать это вручную.
5. Вы можете просто сгенерировать несколько случайных чисел или случайным
uuid
образом добавить их в свой путь к файлу в своей функции