Как создать список (ключ, значение) из распараллеленного списка в scala spark?

#scala #apache-spark

#scala #apache-spark

Вопрос:

Я новичок в Scala, и у меня есть несколько вопросов о том, как это работает. Я хочу сделать следующее: учитывая список значений, я хочу параллельно создать некоторую имитацию словаря, что-то вроде этого : (1,2,3,4) -> ((1,1), (2,2), (3,3), (4,4) ) . Я знаю, что если мы имеем дело с распараллеленными коллекциями, мы должны использовать аккумуляторы. Итак, вот моя попытка:

 import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.util.AccumulatorV2
import scala.collection.mutable.ListBuffer

class DictAccumulatorV2 extends AccumulatorV2[Int, ListBuffer[(Int, Int)]] {
  private var dict:ListBuffer[(Int, Int)]= new ListBuffer[(Int, Int)]

  def reset(): Unit = {
    dict.clear()
  }

  def add(v: Int): Unit = {
    dict.append((v, v))
  }
  def value():ListBuffer[(Int, Int)] = {
    return dict
  }
  def isZero(): Boolean = {
    return dict.isEmpty
  }
  def copy() : AccumulatorV2[Int, ListBuffer[(Int, Int)]] = {
    // I do not understand how to code it correctly
    return new DictAccumulatorV2
  }
  def merge(other:AccumulatorV2[Int, ListBuffer[(Int, Int)]]): Unit = {
    // I do not understand how to code it correctly without reinitializing dict from val to var
    dict = dict    other.value
  }
}
object FirstSparkApplication {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("MyFirstApp").setMaster("local")
    val sc = new SparkContext(conf)

    val accum = new DictAccumulatorV2()
    sc.register(accum, "mydictacc")
    val data = Array(1, 2, 3, 4, 5)
    val distData = sc.parallelize(data)
    var res = distData.map(x => accum.add(x))
    res.count()
    println(accum)
  }
}
 

Поэтому мне интересно, правильно ли я это делаю или есть какие-либо ошибки.

В общем, у меня тоже есть вопросы о том, как sc.parallelize это работает. Действительно ли он распараллеливает задание на моей машине или это просто вымышленная строка кода? Что я должен поместить вместо "local" in setMaster ? Как я могу увидеть, на каких узлах выполняется задача? Выполняется ли задача на всех узлах одновременно или существует какая-то последовательность?

Ответ №1:

(1,2,3,4) -> ((1,1), (2,2), (3,3), (4,4) )

Вы можете сделать это в Scala, выполнив

 val list = List(1,2,3,4)
val dict = list.map(i => (i,i))
 

Искровые аккумуляторы используются в качестве средства связи между исполнителем Spark и драйвером.

Если вы хотите выполнить вышеуказанное параллельно, то вы должны создать RDD из этого списка и применить к нему преобразование карты, как показано выше.

В оболочке spark это будет выглядеть так

 val list = List(1,2,3,4)
val listRDD = sc.parallelize(list)
val dictRDD = listRDD.map(i => (i,i))
 

как работает sc.parallelize
Он создает распределенный набор данных (RDD в терминах spark), используя коллекцию, которую вы передаете функции. Дополнительная информация.

Это действительно распараллеливает вашу работу. Если вы отправляете свое задание spark в кластер, вы должны иметь возможность видеть идентификатор приложения YARN или URL-адрес после выполнения команды spark-submit.Вы можете посетить URL-адрес приложения YARN и посмотреть, сколько исполнителей обрабатывают этот распределенный набор данных и в какой последовательности они выполняются.

Что я должен поместить вместо «local» в setMaster

Из документации Spark — основной URL для подключения, например, «local» для локального запуска с одним потоком, «local [4]» для локального запуска с 4 ядрами или «spark: // master: 7077» для запуска в автономном кластере Spark.