Оптимизация кода Spark

#scala #apache-spark

#scala #apache-spark

Вопрос:

Я пытаюсь улучшить свой код Spark:

 var lst = disOneRDDM.filter(x=> x._2._1 == 1).keys.collect
val disTwoRDDM = disOneRDDM.map(x=> {
                                    var b:Boolean = false
                                    breakable{
                                    for (str <- x._2._2)
                                       if (lst.contains(str))
                                            {b = true
                                            break}
                                    }
                                    if (b)
                                        (x._1,(Math.min(2,x._2._1),x._2._2))
                                    else
                                        x
                                   }).cache
  

У меня есть RDD вида (String,(Int,List[Строка])). Каждый элемент в List[String] имеет свою собственную запись в этом RDD, где он служит ключом. Ниже показан пример ввода (это disOneRDDM в моем коде):

 ("abc",(10,List("hij","efg","klm")))
("efg",(1,List("jhg","Beethan","abc","ert")))
("Beethan",(0,List("efg","vcx","zse")))
("vcx",(1,List("czx","Beethan","abc")))
("zse",(1,List("efg","Beethan","nbh")))
("hij",(10,List("vcx","klm","zse")))
("jhg",(10,List("ghb","cdz","awq","swq")))
...
  

Мое намерение состоит в том, чтобы найти в каждом из List[String] элементов, который имеет Int значение 1, и изменить его собственное Int на min(2,current_Int_value) . Например, во входном коде запись "abc" имеет список, который содержит "efg" в качестве элемента, имеющего Int значение 1, а также запись, "hij" имеющую "vcx" . Итак, я бы ожидал вывода формы:

 ("abc",(2,List("hij","efg","klm")))
("efg",(1,List("jhg","Beethan","abc","ert")))
("Beethan",(0,List("efg","vcx","zse")))
("vcx",(1,List("czx","Beethan","abc")))
("zse",(1,List("efg","Beethan","nbh")))
("hij",(2,List("vcx","klm","zse")))
("jhg",(10,List("ghb","cdz","awq","swq")))
...
  

Размер RDD огромен, и то, как я это делаю, работает, но очень медленно. В приведенном выше коде я пытаюсь отфильтровать RDD, который имеет Int значение 1, и сформировать список lst , собирая их. Затем для поиска элементов, имеющих Int значение 2, я перебираю записи списка элементов и проверяю, lst содержит ли список эту запись. Если это произойдет, я выхожу из цикла и присваиваю соответствующее Int значение.
Есть ли более быстрый способ сделать это, например, без необходимости собирать огромный список RDD в?

Комментарии:

1. lst — это список, который поступает из RDD, поэтому я предполагаю, что он несколько значительный. Что вы делаете, так это повторяете его для каждой отдельной строки в disOneRDDM. Также он не транслируется, поэтому для каждого отдельного раздела на карте список сериализуется и распространяется. Я бы преобразовал lst в Set и транслировал его.

Ответ №1:

Как прокомментировал @a-spoty-spot, если уникальных значений lst не слишком много, ваш лучший подход — изменить его на Set (который удаляет дубликаты) и использовать broadcast.

В противном случае (если этот список уникальных ключей все еще может быть огромным) — вот решение, которое вообще не используется collect , что означает, что оно может обрабатывать любой размер. Однако, поскольку это увеличивает размер RDD с помощью flatMap и выполняет join (что влечет за собой перетасовку), я не уверен, что это будет намного быстрее, это зависит от специфики ваших данных и вашего кластера.

 // create the lookup "map" (the int values are actually irrelevant, we just need the keys)
val lookup: RDD[(String, Int)] = disOneRDDM.cache().filter(_._2._1 == 1).map(t => (t._1, 1))

val result = disOneRDDM
  .flatMap { // break up each record into individual records for join
    case (k, (i, list)) => list.map(s => (s, (k, i)))
  }
  .leftOuterJoin(lookup).map { // left join with lookup and change int values if we found a match
    case (item, ((k, i), Some(_))) => (k, (Math.min(2, i), item))
    case (item, ((k, i), _)) => (k, (i, item))
  }
  .groupByKey().map { // group by key to merge back to lists, while mapping to the desired structure
    case (k, iter) =>
      val l = iter.toList
      (k, (l.map(_._1).min, l.map(_._2)))
  }

result.foreach(println)
// (Beethan,(0,List(zse, efg, vcx)))
// (jhg,(10,List(cdz, swq, ghb, awq)))
// (hij,(2,List(klm, zse, vcx)))
// (zse,(1,List(Beethan, nbh, efg)))
// (efg,(1,List(Beethan, jhg, abc, ert)))
// (vcx,(1,List(Beethan, czx, abc)))
// (abc,(2,List(klm, hij, efg)))
  

Комментарии:

1. добавлен другой ответ с использованием Dataframes, который я нахожу немного более чистым, но оба допустимы.

2. Просто небольшой вопрос, поможет ли трансляция, если я использую один узел для своих вычислений?

3. Да, я так думаю — когда не используется широковещательная передача, данные сериализуются и отправляются для каждой задачи , поэтому вы все равно можете платить за более высокие накладные расходы, даже если используете один узел.

Ответ №2:

Если вы готовы использовать Dataframes API вместо RDDS — вот еще один вариант, который может немного упростить код (и повысить производительность):

 // UDF to check if string contained in array - will be used for the join
val arrayContains = udf { (a: mutable.WrappedArray[String], s: String) => a.contains(s) }

// create Dataframe from RDD and create the filtered lookupDF
val df = disOneRDDM.map {case (k, (v, l)) => (k, v, l) }.toDF("key", "val", "list").cache()
val lookupDf = df.filter($"val" === 1).select($"key" as "match")

// join, groupBy to remove the duplicates while collecting non-null matches, and perform transformation on "val"
val resultDF = df
.join(lookupDf, arrayContains($"list", $"match"), "leftouter")
.groupBy($"key").agg(
  first("val") as "val",
  first("list") as "list",
  first("match", ignoreNulls = true) as "match")
.selectExpr("key", "IF(match IS NULL OR val < 2, val, 2) as val", "list")

resultDF.show()
//  ------- --- -------------------- 
// |    key|val|                list|
//  ------- --- -------------------- 
// |    zse|  1| [efg, Beethan, nbh]|
// |    efg|  1|[jhg, Beethan, ab...|
// |    hij|  2|     [vcx, klm, zse]|
// |Beethan|  0|     [efg, vcx, zse]|
// |    vcx|  1| [czx, Beethan, abc]|
// |    abc|  2|     [hij, efg, klm]|
// |    jhg| 10|[ghb, cdz, awq, swq]|
//  ------- --- --------------------