Как переписать приведенный ниже код, чтобы я получил ожидаемый результат

#scala #apache-spark

#scala #apache-spark

Вопрос:

Цель состоит в том, чтобы прочитать список известных файлов из Amazon s3 и создать один файл в s3 по некоторому пути вывода. Каждый файл разделен табуляцией. Я должен извлечь первый элемент из каждой строки и присвоить ему числовое значение в порядке возрастания. Числовое значение и элемент должны быть разделены табуляцией в новом файле, который будет создан. Я использую spark с scala для выполнения операций с RDDS.

Ожидаемый результат

1 qwerty
2 asdf


67892341 ghjk

Текущий результат

1 qwerty
2 asdf


456721 tyui
1 sdfg
2 swerr


263523 gkfk


512346 ghjk

Итак, в основном, поскольку вычисления происходят в распределенном кластере, глобальная переменная counter инициируется на каждой машине. Как я могу переписать код, чтобы получить желаемый результат. Ниже приведен фрагмент кода.

     def getReqCol() = {
     val myRDD = sc.textFile("s3://mybucket/fileFormatregex")
     var counter = 0
     val mbLuidCol = myRDD.map(x => x.split("t")).map(col =>col(0)).map(row => {
       def inc(acc : Int) = {
         counter = acc   1
       }
     inc(counter)
     counter   "t"   row
   })
   row.repartition(1).saveAsTextFile("s3://mybucket/outputPath")
 }
 

Комментарии:

1. Вы пытаетесь проиндексировать каждую строку в таблице?

Ответ №1:

Похоже, все, что вам нужно, это RDD.zipWithIndex() :

 val myRDD = 
  sc
   .textFile("s3://mybucket/fileFormatregex")
   .map(col => col(0))
   .zipWithIndex()
   .map(_.swap)
   .sortByKey(true)
   .repartition(1)
   .saveAsTextFile("s3://mybucket/outputPath")
 

Комментарии:

1. zipWithIndex().map(строка => строка.swap).sortByKey(true).map(строка => строка._1 » t» строка._2). Я смог получить требуемый результат после добавления приведенного выше фрагмента кода. Спасибо за помощь.

2. @zizouraj Потрясающе. Рад, что это помогло.