Является ли этот код параллельного массива scala потокобезопасным?

#scala #parallel-collections

#scala #параллельные коллекции

Вопрос:

Я хочу использовать параллельные массивы для задачи, и прежде чем я начну с кодирования, мне было бы интересно узнать, является ли этот небольшой фрагмент потокобезопасным:

 import collection.mutable._

var listBuffer = ListBuffer[String]("one","two","three","four","five","six","seven","eight","nine")
var jSyncList  = java.util.Collections.synchronizedList(new java.util.ArrayList[String]())
listBuffer.par.foreach { e =>
    println("processed :" e)
    // using sleep here to simulate a random delay
    Thread.sleep((scala.math.random * 1000).toLong)
    jSyncList.add(e)
}
jSyncList.toArray.foreach(println)
  

Существуют ли лучшие способы обработки чего-либо с помощью параллельных коллекций и накопления результатов в другом месте?

Ответ №1:

Опубликованный вами код совершенно безопасен; Однако я не уверен в предпосылке: зачем вам накапливать результаты параллельной коллекции в непараллельной? Одним из основных преимуществ параллельных коллекций является то, что они выглядят как другие коллекции.

Я думаю, что параллельные коллекции также предоставят seq метод переключения на последовательные. Так что вам, вероятно, следует использовать это!

Комментарии:

1. Где я должен искать, чтобы увидеть некоторые другие примеры накопления в параллельную коллекцию? Помимо использования map.

2. Ну, это трудно сказать, учитывая, что 2.9 (и, следовательно, parallel collections) еще не выпущен . Однако, как я уже сказал, параллельные коллекции практически идентичны последовательным с точки зрения интерфейса, поэтому вы могли бы использовать их таким же образом. Я думаю, важно понимать, какие из операций будут распараллелены. Например, foldLeft он не распараллеливается.

3. Хм, но это (или что-то подобное) происходит при применении ассоциативной операции. Есть ли что-нибудь подобное в 2.9?

4. Как мы можем выяснить, какие операции не поддаются распараллеливанию?

5. @ziggystar — это неверно, если только элемент, свернутый в , не является идентификатором для ассоциативной операции (т. Е. Что сгиб фактически является sum над моноидом — a la Scalaz). В противном случае, когда вы разделили коллекцию; что складывается в левую часть всех фрагментов?

Ответ №2:

Чтобы этот шаблон был безопасным:

 listBuffer.par.foreach { e => f(e) }
  

f должен быть способен безопасно выполняться одновременно. Я думаю, что применяются те же правила, которые вам нужны для безопасной многопоточности (доступ к общему состоянию должен быть потокобезопасным, порядок f вызовов для разных e не будет детерминированным, и вы можете столкнуться с взаимоблокировками, когда начнете синхронизировать свои инструкции в f ).

Кроме того, мне не ясно, какие гарантии дают вам параллельные коллекции в отношении изменения базовой коллекции во время обработки, поэтому изменяемый буфер списка, в который могут добавляться / удаляться элементы, возможно, является плохим выбором. Вы никогда не знаете, когда следующий программист вызовет что-то вроде foo(listBuffer) перед вашим foreach и передаст эту ссылку другому потоку, который может изменить список во время его обработки.

Кроме этого, я думаю, что для любого, f что займет много времени, может вызываться одновременно и где e может обрабатываться не по порядку, это прекрасный шаблон.

 immutCol.par.foreach { e => threadSafeOutOfOrderProcessingOf(e) }
  

отказ от ответственности: я сам не пробовал // colls, но я с нетерпением жду, когда вопросы / ответы SO покажут нам, что хорошо работает.

Ответ №3:

synchronisedList Должно быть безопасно, хотя println может привести к неожиданным результатам — у вас нет гарантий порядка, в котором будут напечатаны элементы, или даже того, что ваши адреса печати не будут чередоваться в середине символа.

Синхронизированный список также вряд ли будет самым быстрым способом, которым вы можете это сделать, более безопасным решением является map поверх неизменяемой коллекции ( Vector вероятно, это ваш лучший выбор здесь), а затем распечатать все строки (по порядку) впоследствии:

 val input = Vector("one","two","three","four","five","six","seven","eight","nine")
val output  = input.par.map { e =>
  val msg = "processed :"   e
  // using sleep here to simulate a random delay
  Thread.sleep((math.random * 1000).toLong)
  msg
}
println(output mkString "n")
  

Вы также заметите, что этот код имеет примерно такую же практическую полезность, как и ваш пример 🙂

Ответ №4:

Этот код просто странный — зачем добавлять материал параллельно чему-то, что нужно синхронизировать? Вы добавите конфликт и абсолютно ничего не получите взамен.

Принцип этого — накапливать результаты параллельной обработки, лучше достигается с помощью таких вещей, как fold , reduce или aggregate .

Комментарии:

1. На данный момент я мог думать только о накоплении с использованием синхронизированного списка коллекций, потому что я не знаю другого способа сделать приведенный выше код потокобезопасным.

2. @Geo Дело в том, что вы не преобразуете коллекцию. Вы просто вставляете элементы параллельной коллекции в синхронизированный список. Если бы вы показали, что вы хотите сделать, тогда мы могли бы дать совет о том, как это сделать. Прямо сейчас ваш код ничего не делает.

Ответ №5:

Опубликованный вами код безопасен — ошибок из-за несогласованного состояния вашего списка массивов не будет, поскольку доступ к нему синхронизирован.

Однако параллельные коллекции обрабатывают элементы одновременно (в одно и то же время) И не по порядку. Нарушение порядка означает, что 54. элемент может быть обработан до 2. элемент — ваш список синхронизированных массивов будет содержать элементы в не предопределенном порядке.

В общем, лучше использовать map , filter и другие функциональные комбинаторы для преобразования коллекции в другую коллекцию — они гарантируют, что гарантии упорядочения сохраняются, если в коллекции есть некоторые (например, Seq s do). Например:

 ParArray(1, 2, 3, 4).map(_   1)
  

всегда возвращает ParArray(2, 3, 4, 5) .

Однако, если вам нужен определенный потокобезопасный тип коллекции, такой как ConcurrentSkipListMap или синхронизированная коллекция, для передачи какому-либо методу в каком-либо API, безопасно изменять его из параллельного foreach.

Наконец, примечание — параллельная коллекция обеспечивает параллельные массовые операции с данными. Изменяемые параллельные коллекции не являются потокобезопасными в том смысле, что вы можете добавлять в них элементы из разных потоков. Изменяемые операции, такие как вставка на карту или добавление буфера, все еще должны быть синхронизированы.