#scala #apache-spark #spark-streaming #spark-dataframe
#scala #apache-spark #spark-streaming #apache-spark-sql
Вопрос:
У меня есть массив A
размером 200. A [i] = 1,000,000,000 означает, что мне нужно записать в файл (ы) 1 миллиард записей значения i
. Например, A = [2, 3, 1, …], выходные файлы должны быть такими
0
0
1
1
1
2
2
...
Учитывая такой массив A, как я могу выводить в файлы ( part-r-00000
, part-r-00001
part-r-00002
и т.д.) С помощью Spark. Я использую Spark 2.0.1 со Scala.
Спасибо!
Комментарии:
1. Что вы хотите записать в каждый файл? Является ли part-r-00000 для [0] заполненным 0s, part-r-00001 для [1] заполнен 1s, и так далее, вплоть до part-r-00199 для [199] заполнен 199s?
2. Нет, я хочу позволить Spark обрабатывать автоматическое разделение (новый файл), когда первый файл больше не может хранить возрасты.
3. Мне любопытно узнать о причине расширения данных при записи на диск. Зачем вам нужно
[2,3,1] = (0,0,1,1,1,2)
вместо[2,3,1] = [(0,2), (1,3),(2,1)]
??4. @LuongBaLinh Я думаю, ты идешь не в том направлении. На этом шаге будут разнесены тома данных без добавления какого-либо значения. Я бы пересмотрел подход.
5. Смотрите также: mywiki. wooledge.org/XyProblem
Ответ №1:
Я бы, вероятно, подошел к этому с помощью автоматического метода saveAsTextFile(), который по умолчанию выполняет то, что вы хотите, разбивая на разные файлы, по одному файлу на RDD.
Максимальный размер файлов зависит от используемой файловой системы, поэтому, хотя и не на 100%, я сомневаюсь, что существует автоматический способ сделать это.
Основываясь на коде из этого примера, я бы вычислил NUM_PARTITIONS перед вызовом .repartition() на основе количества записей и того, что вы знаете о файловой системе, если вы можете получить эту информацию из системных вызовов, или вы хотите использовать некоторые значения по умолчанию.
Комментарии:
1. Проблема в том, что у меня есть только массив A, а не входной файл.
2. Я ссылался на этот пример для бита вывода (из RDDS в files), игнорируйте часть inputFile; должно быть просто сопоставить массив A с RDDS.
3. Учитывая = [2, 3, 1, …], как я могу сопоставить с (0, 0, 1, 1, 1, 2, 2, …)? Выходная карта будет слишком большой, чтобы ее можно было сохранить в памяти.
4. Это не проблема, ваш распределенный кластер spark позаботится об этом, это прозрачно для вашего кода. Когда RDDS не помещаются в память (это объединенная оперативная память вашего кластера), они будут разделены на диск или даже пересчитаны (вы можете это настроить).
5. Извините, все еще не понимаю. Не могли бы вы написать несколько строк кода, чтобы «сопоставить массив A с RDDS». Спасибо.