#scala #apache-spark
#scala #apache-spark
Вопрос:
Я новичок в Scala, и у меня есть несколько вопросов о том, как это работает. Я хочу сделать следующее: учитывая список значений, я хочу параллельно создать некоторую имитацию словаря, что-то вроде этого : (1,2,3,4) -> ((1,1), (2,2), (3,3), (4,4) )
. Я знаю, что если мы имеем дело с распараллеленными коллекциями, мы должны использовать аккумуляторы. Итак, вот моя попытка:
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.util.AccumulatorV2
import scala.collection.mutable.ListBuffer
class DictAccumulatorV2 extends AccumulatorV2[Int, ListBuffer[(Int, Int)]] {
private var dict:ListBuffer[(Int, Int)]= new ListBuffer[(Int, Int)]
def reset(): Unit = {
dict.clear()
}
def add(v: Int): Unit = {
dict.append((v, v))
}
def value():ListBuffer[(Int, Int)] = {
return dict
}
def isZero(): Boolean = {
return dict.isEmpty
}
def copy() : AccumulatorV2[Int, ListBuffer[(Int, Int)]] = {
// I do not understand how to code it correctly
return new DictAccumulatorV2
}
def merge(other:AccumulatorV2[Int, ListBuffer[(Int, Int)]]): Unit = {
// I do not understand how to code it correctly without reinitializing dict from val to var
dict = dict other.value
}
}
object FirstSparkApplication {
def main(args: Array[String]): Unit = {
val conf = new SparkConf().setAppName("MyFirstApp").setMaster("local")
val sc = new SparkContext(conf)
val accum = new DictAccumulatorV2()
sc.register(accum, "mydictacc")
val data = Array(1, 2, 3, 4, 5)
val distData = sc.parallelize(data)
var res = distData.map(x => accum.add(x))
res.count()
println(accum)
}
}
Поэтому мне интересно, правильно ли я это делаю или есть какие-либо ошибки.
В общем, у меня тоже есть вопросы о том, как sc.parallelize
это работает. Действительно ли он распараллеливает задание на моей машине или это просто вымышленная строка кода? Что я должен поместить вместо "local"
in setMaster
? Как я могу увидеть, на каких узлах выполняется задача? Выполняется ли задача на всех узлах одновременно или существует какая-то последовательность?
Ответ №1:
(1,2,3,4) -> ((1,1), (2,2), (3,3), (4,4) )
Вы можете сделать это в Scala, выполнив
val list = List(1,2,3,4)
val dict = list.map(i => (i,i))
Искровые аккумуляторы используются в качестве средства связи между исполнителем Spark и драйвером.
Если вы хотите выполнить вышеуказанное параллельно, то вы должны создать RDD из этого списка и применить к нему преобразование карты, как показано выше.
В оболочке spark это будет выглядеть так
val list = List(1,2,3,4)
val listRDD = sc.parallelize(list)
val dictRDD = listRDD.map(i => (i,i))
как работает sc.parallelize
Он создает распределенный набор данных (RDD в терминах spark), используя коллекцию, которую вы передаете функции. Дополнительная информация.
Это действительно распараллеливает вашу работу. Если вы отправляете свое задание spark в кластер, вы должны иметь возможность видеть идентификатор приложения YARN или URL-адрес после выполнения команды spark-submit.Вы можете посетить URL-адрес приложения YARN и посмотреть, сколько исполнителей обрабатывают этот распределенный набор данных и в какой последовательности они выполняются.
Что я должен поместить вместо «local» в setMaster
Из документации Spark — основной URL для подключения, например, «local» для локального запуска с одним потоком, «local [4]» для локального запуска с 4 ядрами или «spark: // master: 7077» для запуска в автономном кластере Spark.