#scala #apache-spark
#scala #apache-spark
Вопрос:
Цель состоит в том, чтобы прочитать список известных файлов из Amazon s3 и создать один файл в s3 по некоторому пути вывода. Каждый файл разделен табуляцией. Я должен извлечь первый элемент из каждой строки и присвоить ему числовое значение в порядке возрастания. Числовое значение и элемент должны быть разделены табуляцией в новом файле, который будет создан. Я использую spark с scala для выполнения операций с RDDS.
Ожидаемый результат
1 qwerty
2 asdf
…
…
67892341 ghjk
Текущий результат
1 qwerty
2 asdf
…
…
456721 tyui
1 sdfg
2 swerr
…
…
263523 gkfk
…
…
512346 ghjk
Итак, в основном, поскольку вычисления происходят в распределенном кластере, глобальная переменная counter
инициируется на каждой машине. Как я могу переписать код, чтобы получить желаемый результат. Ниже приведен фрагмент кода.
def getReqCol() = {
val myRDD = sc.textFile("s3://mybucket/fileFormatregex")
var counter = 0
val mbLuidCol = myRDD.map(x => x.split("t")).map(col =>col(0)).map(row => {
def inc(acc : Int) = {
counter = acc 1
}
inc(counter)
counter "t" row
})
row.repartition(1).saveAsTextFile("s3://mybucket/outputPath")
}
Комментарии:
1. Вы пытаетесь проиндексировать каждую строку в таблице?
Ответ №1:
Похоже, все, что вам нужно, это RDD.zipWithIndex()
:
val myRDD =
sc
.textFile("s3://mybucket/fileFormatregex")
.map(col => col(0))
.zipWithIndex()
.map(_.swap)
.sortByKey(true)
.repartition(1)
.saveAsTextFile("s3://mybucket/outputPath")
Комментарии:
1. zipWithIndex().map(строка => строка.swap).sortByKey(true).map(строка => строка._1 » t» строка._2). Я смог получить требуемый результат после добавления приведенного выше фрагмента кода. Спасибо за помощь.
2. @zizouraj Потрясающе. Рад, что это помогло.