#scala #parallel-collections
#scala #параллельные коллекции
Вопрос:
Я хочу использовать параллельные массивы для задачи, и прежде чем я начну с кодирования, мне было бы интересно узнать, является ли этот небольшой фрагмент потокобезопасным:
import collection.mutable._
var listBuffer = ListBuffer[String]("one","two","three","four","five","six","seven","eight","nine")
var jSyncList = java.util.Collections.synchronizedList(new java.util.ArrayList[String]())
listBuffer.par.foreach { e =>
println("processed :" e)
// using sleep here to simulate a random delay
Thread.sleep((scala.math.random * 1000).toLong)
jSyncList.add(e)
}
jSyncList.toArray.foreach(println)
Существуют ли лучшие способы обработки чего-либо с помощью параллельных коллекций и накопления результатов в другом месте?
Ответ №1:
Опубликованный вами код совершенно безопасен; Однако я не уверен в предпосылке: зачем вам накапливать результаты параллельной коллекции в непараллельной? Одним из основных преимуществ параллельных коллекций является то, что они выглядят как другие коллекции.
Я думаю, что параллельные коллекции также предоставят seq
метод переключения на последовательные. Так что вам, вероятно, следует использовать это!
Комментарии:
1. Где я должен искать, чтобы увидеть некоторые другие примеры накопления в параллельную коллекцию? Помимо использования map.
2. Ну, это трудно сказать, учитывая, что 2.9 (и, следовательно, parallel collections) еще не выпущен . Однако, как я уже сказал, параллельные коллекции практически идентичны последовательным с точки зрения интерфейса, поэтому вы могли бы использовать их таким же образом. Я думаю, важно понимать, какие из операций будут распараллелены. Например,
foldLeft
он не распараллеливается.3. Хм, но это (или что-то подобное) происходит при применении ассоциативной операции. Есть ли что-нибудь подобное в 2.9?
4. Как мы можем выяснить, какие операции не поддаются распараллеливанию?
5. @ziggystar — это неверно, если только элемент, свернутый в , не является идентификатором для ассоциативной операции (т. Е. Что сгиб фактически является
sum
над моноидом — a la Scalaz). В противном случае, когда вы разделили коллекцию; что складывается в левую часть всех фрагментов?
Ответ №2:
Чтобы этот шаблон был безопасным:
listBuffer.par.foreach { e => f(e) }
f
должен быть способен безопасно выполняться одновременно. Я думаю, что применяются те же правила, которые вам нужны для безопасной многопоточности (доступ к общему состоянию должен быть потокобезопасным, порядок f
вызовов для разных e
не будет детерминированным, и вы можете столкнуться с взаимоблокировками, когда начнете синхронизировать свои инструкции в f
).
Кроме того, мне не ясно, какие гарантии дают вам параллельные коллекции в отношении изменения базовой коллекции во время обработки, поэтому изменяемый буфер списка, в который могут добавляться / удаляться элементы, возможно, является плохим выбором. Вы никогда не знаете, когда следующий программист вызовет что-то вроде foo(listBuffer)
перед вашим foreach
и передаст эту ссылку другому потоку, который может изменить список во время его обработки.
Кроме этого, я думаю, что для любого, f
что займет много времени, может вызываться одновременно и где e
может обрабатываться не по порядку, это прекрасный шаблон.
immutCol.par.foreach { e => threadSafeOutOfOrderProcessingOf(e) }
отказ от ответственности: я сам не пробовал // colls, но я с нетерпением жду, когда вопросы / ответы SO покажут нам, что хорошо работает.
Ответ №3:
synchronisedList
Должно быть безопасно, хотя println
может привести к неожиданным результатам — у вас нет гарантий порядка, в котором будут напечатаны элементы, или даже того, что ваши адреса печати не будут чередоваться в середине символа.
Синхронизированный список также вряд ли будет самым быстрым способом, которым вы можете это сделать, более безопасным решением является map
поверх неизменяемой коллекции ( Vector
вероятно, это ваш лучший выбор здесь), а затем распечатать все строки (по порядку) впоследствии:
val input = Vector("one","two","three","four","five","six","seven","eight","nine")
val output = input.par.map { e =>
val msg = "processed :" e
// using sleep here to simulate a random delay
Thread.sleep((math.random * 1000).toLong)
msg
}
println(output mkString "n")
Вы также заметите, что этот код имеет примерно такую же практическую полезность, как и ваш пример 🙂
Ответ №4:
Этот код просто странный — зачем добавлять материал параллельно чему-то, что нужно синхронизировать? Вы добавите конфликт и абсолютно ничего не получите взамен.
Принцип этого — накапливать результаты параллельной обработки, лучше достигается с помощью таких вещей, как fold
, reduce
или aggregate
.
Комментарии:
1. На данный момент я мог думать только о накоплении с использованием синхронизированного списка коллекций, потому что я не знаю другого способа сделать приведенный выше код потокобезопасным.
2. @Geo Дело в том, что вы не преобразуете коллекцию. Вы просто вставляете элементы параллельной коллекции в синхронизированный список. Если бы вы показали, что вы хотите сделать, тогда мы могли бы дать совет о том, как это сделать. Прямо сейчас ваш код ничего не делает.
Ответ №5:
Опубликованный вами код безопасен — ошибок из-за несогласованного состояния вашего списка массивов не будет, поскольку доступ к нему синхронизирован.
Однако параллельные коллекции обрабатывают элементы одновременно (в одно и то же время) И не по порядку. Нарушение порядка означает, что 54. элемент может быть обработан до 2. элемент — ваш список синхронизированных массивов будет содержать элементы в не предопределенном порядке.
В общем, лучше использовать map
, filter
и другие функциональные комбинаторы для преобразования коллекции в другую коллекцию — они гарантируют, что гарантии упорядочения сохраняются, если в коллекции есть некоторые (например, Seq
s do). Например:
ParArray(1, 2, 3, 4).map(_ 1)
всегда возвращает ParArray(2, 3, 4, 5)
.
Однако, если вам нужен определенный потокобезопасный тип коллекции, такой как ConcurrentSkipListMap
или синхронизированная коллекция, для передачи какому-либо методу в каком-либо API, безопасно изменять его из параллельного foreach.
Наконец, примечание — параллельная коллекция обеспечивает параллельные массовые операции с данными. Изменяемые параллельные коллекции не являются потокобезопасными в том смысле, что вы можете добавлять в них элементы из разных потоков. Изменяемые операции, такие как вставка на карту или добавление буфера, все еще должны быть синхронизированы.