#scala #apache-spark #hadoop #hdfs
#scala #apache-искра #hadoop #hdfs
Вопрос:
Попытка обработать каждый файл JSON из каталога HDFS и записать в каталог HDFS. Здесь я использую Scala parallel collection par. Я пытаюсь использовать приведенный ниже код для чтения файла JSON.
Код:
val lines2 =Source.fromFile("allats_tmp.txt").getLines.toList.filter(x => x.size > 1)
val ooo = lines2.par.foreach(x => {
var path = path of hdfs directory
val readJSON = sc.wholeTextFiles(path)
val dataFrame = spark.read.json(readJSON.toSeq.toDS)
dataFrame.coalesce(1).write.mode("append").json("appendedjsontestpar")
})
Я получаю сообщение об ошибке ниже.
Caused by: org.apache.hadoop.ipc.RemoteException(
org.apache.hadoop.hdfs.server.namenode.LeaseExpiredException):
No lease on /user/ine12363287/appendedjsontestpar/_SUCCESS (inode 2013174239):
File does not exist.
[Lease. Holder: DFSClient_NONMAPREDUCE_-1536968823_1, pendingcreates: 2]
Комментарии:
1. значение lines2 =Source.fromFile(«allats_tmp.txt «).getLines. ToList.filter(x => x.size > 1) val ooo = lines2.par.foreach(x => {var path = путь к каталогу hdfs val readJSON = sc.wholeTextFiles(путь) val DataFrame = spark.read.json(readJSON.toSeq.toDS) DataFrame.coalesce(1).write.mode(«добавить»).json(«appendedjsontestpar») })
Ответ №1:
Вы можете просто прочитать несколько json
файлов, используя json()
функцию, доступную в Spark.
Случай 1: если у вас все json
файлы в одном HDFS
каталоге, просто передайте json
HDFS
путь к файлам в функцию json, как показано ниже.
Синтаксис: spark.read.json("<hdfs_path>")
Случай 2: если ваши json
файлы находятся в разных HDFS
каталогах, вы можете использовать ту же функцию json(paths:String*)
, которая также принимает переменные аргументы.
Синтаксис: spark.read.json("<hdfs_path1>","<hdfs_path2>")