Получаем возвращаемое значение из потока, является ли этот код Kotlin потокобезопасным?

#multithreading #kotlin

#многопоточность #kotlin

Вопрос:

Я хотел бы запустить несколько этапов, дождаться, пока все они будут завершены, и получить результаты.

Возможный способ сделать это был бы в коде ниже. Но является ли он потокобезопасным?

 import kotlin.concurrent.thread

sealed class Errorneous<R>
data class Success<R>(val result: R) : Errorneous<R>()
data class Fail<R>(val error: Exception) : Errorneous<R>()

fun <R> thread_with_result(fn: () -> R): (() -> Errorneous<R>) {
  var r: Errorneous<R>? = null
  val t = thread {
    r = try { Success(fn()) } catch (e: Exception) { Fail(e) }
  }
  return {
    t.join()
    r!!
  }
}

fun main() {
  val tasks = listOf({ 2 * 2 }, { 3 * 3 })
  val results = tasks
    .map{ thread_with_result(it) }
    .map{ it() }
  println(results)
}
  

P.S.

Есть ли в Kotlin более эффективные встроенные инструменты для этого? Например, обрабатывать 10000 задач с пулом из 10 потоков?

Это должны быть потоки, а не сопрограммы, поскольку он будет использоваться с устаревшим кодом, и я не знаю, хорошо ли он работает с сопрограммами.

Комментарии:

1. Эта функция блокируется при присоединении к своему потоку, поэтому она не будет выполняться асинхронно. Нет причин, по которым сопрограммы не должны работать с устаревшим кодом. Но вы также могли бы использовать RxJava для пулов потоков.

2. @Tenfour04 не уверен, что вы подразумеваете под асинхронностью, но они выполняются параллельно, пожалуйста, проверьте этот пример gist.github.com/alexeypetrushin /…

3. Неважно. Я пропустил, что функция возвращает функцию.

Ответ №1:

Похоже, что Java Executors делает именно это

 fun <R> execute_in_parallel(tasks: List<() -> R>, threads: Int): List<Errorneous<R>> {
  val executor = Executors.newFixedThreadPool(threads)
  val fresults = executor.invokeAll(tasks.map { task ->
    Callable<Errorneous<R>> {
      try { Success(task()) } catch (e: Exception) { Fail(e) }
    }
  })
  return fresults.map { future -> future.get() }
}
  

Комментарии:

1. Да, платформа Executors была бы первым местом, на которое я бы тоже обратил внимание; она довольно стандартна для JVM и отлично подходит для очередей заданий и тому подобного.