Искровой массив пуст после итерации

#arrays #scala #apache-spark

#массивы #scala #apache-spark

Вопрос:

В spark 1.6.0 (я не очень знаком с spark и scala), когда я перебираю коллекцию и добавляю элементы в массив, когда итерация завершена, массив кажется пустым.

 var testing = unlabeled.map { line =>
  val parts = line.split(',')
  val text = parts(7).split(' ')
  (line, htf.transform(text))
}

var lowPropQueue = new mutable.ArrayBuffer[(String, org.apache.spark.mllib.linalg.Vector)]
var highPropQueue = new mutable.ArrayBuffer[(String, org.apache.spark.mllib.linalg.Vector)]

for(counter <- 1 to 5){

  logger.info("this is the "   counter   " run -----------------")
  for (i <- testing) {
    val label = model.predict(i._2).toString
    //        logger.info(i._1.split(",")(7))
    //        logger.info(label)
    var probs = model.predictProbabilities(i._2)
    logger.info("prob 0 : "   probs(0))
    logger.info("prob 1 : "   probs(1))
    logger.info("--------------------- ")

    if (probs(0).toDouble <= 0.95 amp;amp; probs(1).toDouble <= 0.95 ) {
      lowPropQueue. =(i)
    } else {
      highPropQueue. =((i._1   ","   label , i._2))
    }

    logger.info("size of high array : "   highPropQueue.length)
    logger.info("size of low array : "   lowPropQueue.length)

  }

  logger.info("passed: "   lowPropQueue.length)
  logger.info("NOT passed: "   highPropQueue.length)

  var xx=  sc.parallelize(highPropQueue).collect()
  var yy = sc.parallelize(lowPropQueue).collect()

  logger.info("passed: "   xx.length)
  logger.info("NOT passed: "   yy.length)
...
}
  

но на основе журналов внутренний цикл, похоже, добавляет элементы в массивы, т.Е.:

16/10/11 11:22:31 ИНФОРМАЦИЯ SelfLearningMNB $: размер большого массива: 500

16/10/11 11:22:31 ИНФОРМАЦИЯ SelfLearningMNB $: размер нижнего массива: 83

16/10/11 11:22:31 INFO SelfLearningMNB $: проблема 0: 0,37094327822665185

16/10/11 11:22:31 INFO SelfLearningMNB $: проблема 1: 0.6290567217733481

16/10/11 11:22:31 INFO SelfLearningMNB $: ———————

16/10/11 11:22:31 ИНФОРМАЦИЯ SelfLearningMNB $: размер большого массива: 500

16/10/11 11:22:31 ИНФОРМАЦИЯ SelfLearningMNB $: размер нижнего массива: 84

16/10/11 11:22:31 INFO SelfLearningMNB $: prob 0: 0.16872929936216619

16/10/11 11:22:31 INFO SelfLearningMNB $: проблема 1: 0.8312707006378338

Но когда внутренний цикл заканчивается, я получаю это:

16/10/11 11:43:53 INFO SelfLearningMNB $: передано: 0

16/10/11 11:43:53 ИНФОРМАЦИЯ SelfLearningMNB $: НЕ передано: 0

Что происходит?

Редактировать

Как вы можете получить данные от исполнителей или сохранить данные от исполнителей в HDFS, чтобы их можно было прочитать с главного узла позже?

Ответ №1:

TL; DR Это не может работать в Spark.

Что происходит?

  • каждый исполнитель получает свою собственную копию lowPropQueue , highPropQueue .
  • во время итерации локальные копии изменяются
  • после итерации локальные копии отбрасываются

К вашему сведению, наивное добавление к ArrayBuffer не является потокобезопасным.

Комментарии:

1. Я думал об этом. Но как можно сохранить данные от исполнителей в «глобальном» массиве?

2. Вы можете попробовать аккумуляторы, но вам понадобится синхронизированный доступ, и, глядя на ваш код, он просто не будет масштабироваться.

3. Я провел некоторый поиск, и этот подход определенно не подходит для Spark. Мне пришлось сопоставить все, но это сработало.