#scala #apache-spark
#scala #apache-spark
Вопрос:
Мне нужно использовать Spark для обработки данных в большом наборе текстовых файлов на основе запроса другого индекса. Я могу сделать это для небольших случаев (путем преобразования RDD в массив / см. Ниже), но у меня возникают трудности с правильной настройкой для работы с большими объемами данных.
У меня есть это:
val rootPath = "..."
val ndxRDD = sc.textFile(rootPath "index/2016-09-01*")
def ndxToDoc(articleName: String): String = { sc.textFile(rootPath articleName).first(); }
// works
val artcilesArr = ndxRDD.collect().map(ndxToDoc);
val articlesRDD = sc.parallelize(articlesArr)
// does not work
// val articlesRDD = ndxRDD.map(ndxToDoc)
articlesRDD.count()
Я полагаю, проблема в том, что я пытаюсь прочитать файл внутри rdd. Как мне заставить вышеуказанное работать без промежуточного collect()
— map -> textFile()
— parallelize()
set?
Заранее спасибо!
Комментарии:
1. Просто чтобы убедиться, что я понимаю ваш вариант использования — у вас есть текстовый файл с именами миллиардов других файлов, которые вы хотите загрузить?? Также какую версию Spark вы используете?
2. У меня есть миллиарды текстовых файлов (по сути, индекс), которые имеют имена миллиардов других файлов. Я использую Spark 1.6, но при необходимости могу легко перейти на Spark 2.0.
Ответ №1:
Я думаю, что это оптимальный подход для такого рода задач. Это ваш вариант использования, который этого хочет!
Вы должны собрать его в виде списка, в противном случае вам придется создать RDD внутри RDD, что невозможно в текущей реализации Spark.
Для получения дополнительной информации, почему мы не можем создать RDD внутри RDD, смотрите здесь :
Следовательно, это оптимальный подход, хотя я могу предложить одно — вы можете использовать оперативную память для хранения больших объектов в памяти