#scala #apache-spark
#scala #apache-spark
Вопрос:
Я пытаюсь улучшить свой код Spark:
var lst = disOneRDDM.filter(x=> x._2._1 == 1).keys.collect
val disTwoRDDM = disOneRDDM.map(x=> {
var b:Boolean = false
breakable{
for (str <- x._2._2)
if (lst.contains(str))
{b = true
break}
}
if (b)
(x._1,(Math.min(2,x._2._1),x._2._2))
else
x
}).cache
У меня есть RDD вида (String,(Int,List[Строка])). Каждый элемент в List[String]
имеет свою собственную запись в этом RDD, где он служит ключом. Ниже показан пример ввода (это disOneRDDM
в моем коде):
("abc",(10,List("hij","efg","klm")))
("efg",(1,List("jhg","Beethan","abc","ert")))
("Beethan",(0,List("efg","vcx","zse")))
("vcx",(1,List("czx","Beethan","abc")))
("zse",(1,List("efg","Beethan","nbh")))
("hij",(10,List("vcx","klm","zse")))
("jhg",(10,List("ghb","cdz","awq","swq")))
...
Мое намерение состоит в том, чтобы найти в каждом из List[String]
элементов, который имеет Int
значение 1, и изменить его собственное Int
на min(2,current_Int_value)
. Например, во входном коде запись "abc"
имеет список, который содержит "efg"
в качестве элемента, имеющего Int
значение 1, а также запись, "hij"
имеющую "vcx"
. Итак, я бы ожидал вывода формы:
("abc",(2,List("hij","efg","klm")))
("efg",(1,List("jhg","Beethan","abc","ert")))
("Beethan",(0,List("efg","vcx","zse")))
("vcx",(1,List("czx","Beethan","abc")))
("zse",(1,List("efg","Beethan","nbh")))
("hij",(2,List("vcx","klm","zse")))
("jhg",(10,List("ghb","cdz","awq","swq")))
...
Размер RDD огромен, и то, как я это делаю, работает, но очень медленно. В приведенном выше коде я пытаюсь отфильтровать RDD, который имеет Int
значение 1, и сформировать список lst
, собирая их. Затем для поиска элементов, имеющих Int
значение 2, я перебираю записи списка элементов и проверяю, lst
содержит ли список эту запись. Если это произойдет, я выхожу из цикла и присваиваю соответствующее Int
значение.
Есть ли более быстрый способ сделать это, например, без необходимости собирать огромный список RDD в?
Комментарии:
1. lst — это список, который поступает из RDD, поэтому я предполагаю, что он несколько значительный. Что вы делаете, так это повторяете его для каждой отдельной строки в disOneRDDM. Также он не транслируется, поэтому для каждого отдельного раздела на карте список сериализуется и распространяется. Я бы преобразовал lst в Set и транслировал его.
Ответ №1:
Как прокомментировал @a-spoty-spot, если уникальных значений lst
не слишком много, ваш лучший подход — изменить его на Set
(который удаляет дубликаты) и использовать broadcast.
В противном случае (если этот список уникальных ключей все еще может быть огромным) — вот решение, которое вообще не используется collect
, что означает, что оно может обрабатывать любой размер. Однако, поскольку это увеличивает размер RDD с помощью flatMap
и выполняет join
(что влечет за собой перетасовку), я не уверен, что это будет намного быстрее, это зависит от специфики ваших данных и вашего кластера.
// create the lookup "map" (the int values are actually irrelevant, we just need the keys)
val lookup: RDD[(String, Int)] = disOneRDDM.cache().filter(_._2._1 == 1).map(t => (t._1, 1))
val result = disOneRDDM
.flatMap { // break up each record into individual records for join
case (k, (i, list)) => list.map(s => (s, (k, i)))
}
.leftOuterJoin(lookup).map { // left join with lookup and change int values if we found a match
case (item, ((k, i), Some(_))) => (k, (Math.min(2, i), item))
case (item, ((k, i), _)) => (k, (i, item))
}
.groupByKey().map { // group by key to merge back to lists, while mapping to the desired structure
case (k, iter) =>
val l = iter.toList
(k, (l.map(_._1).min, l.map(_._2)))
}
result.foreach(println)
// (Beethan,(0,List(zse, efg, vcx)))
// (jhg,(10,List(cdz, swq, ghb, awq)))
// (hij,(2,List(klm, zse, vcx)))
// (zse,(1,List(Beethan, nbh, efg)))
// (efg,(1,List(Beethan, jhg, abc, ert)))
// (vcx,(1,List(Beethan, czx, abc)))
// (abc,(2,List(klm, hij, efg)))
Комментарии:
1. добавлен другой ответ с использованием Dataframes, который я нахожу немного более чистым, но оба допустимы.
2. Просто небольшой вопрос, поможет ли трансляция, если я использую один узел для своих вычислений?
3. Да, я так думаю — когда не используется широковещательная передача, данные сериализуются и отправляются для каждой задачи , поэтому вы все равно можете платить за более высокие накладные расходы, даже если используете один узел.
Ответ №2:
Если вы готовы использовать Dataframes API вместо RDDS — вот еще один вариант, который может немного упростить код (и повысить производительность):
// UDF to check if string contained in array - will be used for the join
val arrayContains = udf { (a: mutable.WrappedArray[String], s: String) => a.contains(s) }
// create Dataframe from RDD and create the filtered lookupDF
val df = disOneRDDM.map {case (k, (v, l)) => (k, v, l) }.toDF("key", "val", "list").cache()
val lookupDf = df.filter($"val" === 1).select($"key" as "match")
// join, groupBy to remove the duplicates while collecting non-null matches, and perform transformation on "val"
val resultDF = df
.join(lookupDf, arrayContains($"list", $"match"), "leftouter")
.groupBy($"key").agg(
first("val") as "val",
first("list") as "list",
first("match", ignoreNulls = true) as "match")
.selectExpr("key", "IF(match IS NULL OR val < 2, val, 2) as val", "list")
resultDF.show()
// ------- --- --------------------
// | key|val| list|
// ------- --- --------------------
// | zse| 1| [efg, Beethan, nbh]|
// | efg| 1|[jhg, Beethan, ab...|
// | hij| 2| [vcx, klm, zse]|
// |Beethan| 0| [efg, vcx, zse]|
// | vcx| 1| [czx, Beethan, abc]|
// | abc| 2| [hij, efg, klm]|
// | jhg| 10|[ghb, cdz, awq, swq]|
// ------- --- --------------------