Запуск дочерних сопрограмм параллельно с отменой

#kotlin #kotlin-coroutines #coroutinescope

Вопрос:

Допустим, у меня есть следующий код:

 interface Process {
    suspend fun run()
}

class ParallelProcess(initialChildren: List<Process>): Process {
    
    val children = mutableListOf(initialChildren)
    
    override suspend fun run() {
        if(!isActive()) throw IllegalStateException()
        
        // Run all the children here and wait for them
        // including the one that may have been added 
        // while this is running.
        
        markInactive()
    }
    
    fun addChild(child: Process) {
        // Add the child and start running it.
    }
    
    fun cancel() {
        // Cancel all the children and this process as well.
    }
}
 

Таким образом, идея заключается в том, что класс ParalleProcess будет запускать все дочерние процессы параллельно
, и пока он все еще работает, можно добавить больше дочерних процессов.

Дочерние процессы независимы и не влияют друг на друга, поэтому, если один из них выходит из строя, другие и родительские процессы не затрагиваются. Я думаю, что здесь SuprevisorJob может подойти для использования, но я не уверен, как использовать в этом случае.

Отмена родительского процесса также должна привести к отмене всех дочерних процессов.

Каков наилучший способ достижения вышеуказанной функциональности.

Комментарии:

1. Вы можете использовать параллельный список, но вы можете столкнуться с состоянием гонки между вызовом addChild и cancel . Это может привести к тому, что задание будет добавлено, но никогда не будет запущено или отменено.

Ответ №1:

 class DefaultProcess(private val delay: Long) : Process {
    override suspend fun run() {
        delay(delay)
        log("DefaultProcess($delay) done.")
    }
}

private fun CoroutineScope.runProcesses(parallelProcess: ParallelProcess) = launch {
    parallelProcess.run()
}

private fun log(msg: String) = println("[${Thread.currentThread().name}] $msg")

class ParallelProcess(private val initialChildren: List<Process>) : Process {
    private var jobs = mutableListOf<Job>()
    private val children = MutableSharedFlow<Process>(
        extraBufferCapacity = 1,
        onBufferOverflow = BufferOverflow.DROP_OLDEST
    )

    override suspend fun run() {
        val time = measureTimeMillis {
            coroutineScope {
                initialChildren.forEach { child ->
                    jobs.add(launch { child.run() })
                }

                val flowJob = children
                    .onEach { child ->
                        log("added: DefaultProcess")
                        jobs.add(launch { child.run() })
                    }
                    .launchIn(this)

                while (jobs.isActive) delay(10) // Keep checking if jobs are done
                flowJob.cancel() // When done cancel the flowJob so that "run" can finally return
            }
        }
        log("Total running time: $time ms.")
    }

    fun addChild(child: Process): Boolean {
        return if (jobs.isActive) children.tryEmit(child) else false
    }

    fun cancel() {
        jobs.forEach { job -> job.cancel() }
    }

    private val List<Job>.isActive: Boolean
        get() = any { job -> job.isActive }
}
 

Пример (с помощью runBlocking и job.join() для блокировки основного потока до его завершения, чтобы можно было видеть вывод, не блокируйте основной поток с помощью runBlocking в производстве!!!):

 val processes = listOf(
        DefaultProcess(100),
        DefaultProcess(50),
        DefaultProcess(300),
        DefaultProcess(500)
    )

runBlocking {
    val scope = CoroutineScope(SupervisorJob()   Dispatchers.IO)
    val parallelProcess = ParallelProcess(processes)
    val job = scope.runProcesses(parallelProcess)

    /*// Comment out to check that adding new jobs works
    delay(300)

    // It will be added, but immediately dropped, because we're adding a new value down below, before this
    // one is processed, and we have a strategy onBufferOverflow = BufferOverflow.DROP_OLDEST
    var added = parallelProcess.addChild(DefaultProcess(300))
    log("added1: $added")

    // This one will be added
    added = parallelProcess.addChild(DefaultProcess(400))
    log("added2: $added")

    delay(600) // run already done, child won't be added
    added = parallelProcess.addChild(DefaultProcess(200))
    log("added3: $added")*/

    /*// Comment out to check that cancel also works
    // Delayed for 350, so the first three jobs are done, but if want to start adding without delaying, you need
    to delay for at least 1ms, to give some time to jobs to turn its state into active, otherwise isActive will
    return false and child will not be added.
    delay(350)
    val added = parallelProcess.addChild(DefaultProcess(4000))
    log("added: $added")
    delay(2000)
    parallelProcess.cancel() // Compare times with and without cancelling*/
    job.join()
}
 

Вы не сможете добавлять дочерние элементы одновременно (будет добавлен только последний дочерний элемент), если вы хотите не потерять какие-либо значения, сделайте addChild приостановку и вместо tryEmit использования emit и создания newChildren = MutableSharedFlow() без каких-либо параметров (значения по умолчанию). Это по-прежнему будет добавлять дочерние элементы последовательно, но приостановится, если будет добавлен новый дочерний элемент, до того, как последний будет добавлен в задания. Для параллелизма вам придется использовать каналы, где addChild будут создаваться значения и внутри run у вас будет несколько (в зависимости от количества одновременных потребителей, которых вы хотите иметь) сопрограмм, потребляющих эти значения. Но опять же, поскольку все эти потребители отключат jobs звук , вам придется использовать что-то для его синхронизации.

Ответ №2:

Вы могли бы использовать что-то вроде этого:

 class ParallelProcess(initialChildren: List<Process>): Process {
    
    val children = initialChildren.toMutableList()
    val parentScope = CoroutineScope(SupervisorJob())
    
    override suspend fun run(){ 
        parentScope.launch {
            if(!isActive) throw IllegalStateException()

            // Run all the children here and wait for them
            // including the one that may have been added 
            // while this is running.
            children.map { launch { it.run() } }.joinAll()
            //  markInactive()
        }
    }
    
    fun addChild(child: Process) {
        // Add the child and start running it.
        children  = child
        // either wait for all children to finish or call cancel()
        // Or you can make sure to run only processes that are not started
        parentScope.launch {
            run()
        }
    }
    
    fun cancel() {
        // Cancel all the children and this process as well.
        parentScope.cancel()
    }
}
 

Хотя добавление дочернего элемента создает еще один вопрос о параллелизме, который вы должны решить, основываясь на желаемом поведении. Я попытался добавить несколько комментариев, чтобы обсудить некоторые возможности

Комментарии:

1. Спасибо за ответ. Я хочу, чтобы run() отстранение не возвращалось до тех пор, пока все дети не закончат. В вашей версии run() will вернется немедленно.

2. Также addChild() не следует ждать и возвращаться сразу после добавления и запуска ребенка. run() Необходимо дождаться этого недавно добавленного ребенка.