#arrays #scala #apache-spark
#массивы #scala #apache-spark
Вопрос:
В spark 1.6.0 (я не очень знаком с spark и scala), когда я перебираю коллекцию и добавляю элементы в массив, когда итерация завершена, массив кажется пустым.
var testing = unlabeled.map { line =>
val parts = line.split(',')
val text = parts(7).split(' ')
(line, htf.transform(text))
}
var lowPropQueue = new mutable.ArrayBuffer[(String, org.apache.spark.mllib.linalg.Vector)]
var highPropQueue = new mutable.ArrayBuffer[(String, org.apache.spark.mllib.linalg.Vector)]
for(counter <- 1 to 5){
logger.info("this is the " counter " run -----------------")
for (i <- testing) {
val label = model.predict(i._2).toString
// logger.info(i._1.split(",")(7))
// logger.info(label)
var probs = model.predictProbabilities(i._2)
logger.info("prob 0 : " probs(0))
logger.info("prob 1 : " probs(1))
logger.info("--------------------- ")
if (probs(0).toDouble <= 0.95 amp;amp; probs(1).toDouble <= 0.95 ) {
lowPropQueue. =(i)
} else {
highPropQueue. =((i._1 "," label , i._2))
}
logger.info("size of high array : " highPropQueue.length)
logger.info("size of low array : " lowPropQueue.length)
}
logger.info("passed: " lowPropQueue.length)
logger.info("NOT passed: " highPropQueue.length)
var xx= sc.parallelize(highPropQueue).collect()
var yy = sc.parallelize(lowPropQueue).collect()
logger.info("passed: " xx.length)
logger.info("NOT passed: " yy.length)
...
}
но на основе журналов внутренний цикл, похоже, добавляет элементы в массивы, т.Е.:
16/10/11 11:22:31 ИНФОРМАЦИЯ SelfLearningMNB $: размер большого массива: 500
16/10/11 11:22:31 ИНФОРМАЦИЯ SelfLearningMNB $: размер нижнего массива: 83
16/10/11 11:22:31 INFO SelfLearningMNB $: проблема 0: 0,37094327822665185
16/10/11 11:22:31 INFO SelfLearningMNB $: проблема 1: 0.6290567217733481
16/10/11 11:22:31 INFO SelfLearningMNB $: ———————
16/10/11 11:22:31 ИНФОРМАЦИЯ SelfLearningMNB $: размер большого массива: 500
16/10/11 11:22:31 ИНФОРМАЦИЯ SelfLearningMNB $: размер нижнего массива: 84
16/10/11 11:22:31 INFO SelfLearningMNB $: prob 0: 0.16872929936216619
16/10/11 11:22:31 INFO SelfLearningMNB $: проблема 1: 0.8312707006378338
Но когда внутренний цикл заканчивается, я получаю это:
16/10/11 11:43:53 INFO SelfLearningMNB $: передано: 0
16/10/11 11:43:53 ИНФОРМАЦИЯ SelfLearningMNB $: НЕ передано: 0
Что происходит?
Редактировать
Как вы можете получить данные от исполнителей или сохранить данные от исполнителей в HDFS, чтобы их можно было прочитать с главного узла позже?
Ответ №1:
TL; DR Это не может работать в Spark.
Что происходит?
- каждый исполнитель получает свою собственную копию
lowPropQueue
,highPropQueue
. - во время итерации локальные копии изменяются
- после итерации локальные копии отбрасываются
К вашему сведению, наивное добавление к ArrayBuffer
не является потокобезопасным.
Комментарии:
1. Я думал об этом. Но как можно сохранить данные от исполнителей в «глобальном» массиве?
2. Вы можете попробовать аккумуляторы, но вам понадобится синхронизированный доступ, и, глядя на ваш код, он просто не будет масштабироваться.
3. Я провел некоторый поиск, и этот подход определенно не подходит для Spark. Мне пришлось сопоставить все, но это сработало.