#go #worker-pool
#Вперед #рабочий пул
Вопрос:
В приведенном ниже коде я не понимаю, почему «рабочие» методы, похоже, завершают работу вместо того, чтобы извлекать значения из входного канала «in» и обрабатывать их.
Я предполагал, что они вернутся только после использования всех входных данных из входного канала «in» и их обработки
package main
import (
"fmt"
"sync"
)
type ParallelCallback func(chan int, chan Result, int, *sync.WaitGroup)
type Result struct {
i int
val int
}
func Worker(in chan int, out chan Result, id int, wg *sync.WaitGroup) {
for item := range in {
item *= item // returns the square of the input value
fmt.Printf("=> %d: %dn", id, item)
out <- Result{item, id}
}
wg.Done()
fmt.Printf("%d exiting ", id)
}
func Run_parallel(n_workers int, in chan int, out chan Result, Worker ParallelCallback) {
wg := sync.WaitGroup{}
for id := 0; id < n_workers; id {
fmt.Printf("Starting : %dn", id)
wg.Add(1)
go Worker(in, out, id, amp;wg)
}
wg.Wait() // wait for all workers to complete their tasks
close(out) // close the output channel when all tasks are completed
}
const (
NW = 4
)
func main() {
in := make(chan int)
out := make(chan Result)
go func() {
for i := 0; i < 100; i {
in <- i
}
close(in)
}()
Run_parallel(NW, in, out, Worker)
for item := range out {
fmt.Printf("From out : %d: %d", item.i, item.val)
}
}
Вывод
Starting : 0
Starting : 1
Starting : 2
Starting : 3
=> 3: 0
=> 0: 1
=> 1: 4
=> 2: 9
fatal error: all goroutines are asleep - deadlock!
Комментарии:
1. Рабочие не могут вернуться до
in
тех пор, пока он не будет закрыт. Как вы думаете, почему они возвращаются раньше?2. Это в основном тот же вопрос
3. Извините, я не понимаю. Вы говорите, что «они, похоже, завершаются», но они не завершаются. Что конкретно происходит, чего вы не ожидаете?
4. Возможно, я ошибаюсь в этом, и проблема может быть в другом: я добавил вывод после кода в ответ на ваш комментарий. Смотрите Вывод.
5. Вывод ошибки покажет, где заблокированы все программы. Я предполагаю, что вы заблокированы
Run_parallel
, потому что вы не потребляете fromout
доRun_parallel
возвращения after.
Ответ №1:
неустранимая ошибка: все программы находятся в спящем режиме — взаимоблокировка!
Полная ошибка показывает, где «застряла» каждая подпрограмма. Если вы запустите это на игровой площадке, он даже покажет вам номер строки. Это облегчило мне диагностику.
Ваши Run_parallel
запуски в main
программе, поэтому, прежде main
чем вы сможете прочитать out
, Run_parallel
должны вернуться. Прежде Run_parallel
чем может вернуться, он должен wg.Wait()
. Но перед вызовом workers wg.Done()
они должны записать в out
. Это то, что вызывает тупик.
Одно из решений простое: просто запустить Run_parallel
одновременно в своей собственной подпрограмме.
go Run_parallel(NW, in, out, Worker)
Теперь main
диапазоны превышены out
, ожидая out
закрытия s, чтобы сигнализировать о завершении. Run_parallel
ожидает, что рабочие будут работать wg.Wait()
, и рабочие будут находиться в диапазоне in
. Вся работа будет выполнена, и программа не завершится, пока все не будет сделано. (https://go.dev/play/p/oMrgH2U09tQ )
Ответ №2:
Решение :
Run_parallel должен выполняться в своей собственной программе:
package main
import (
"fmt"
"sync"
)
type ParallelCallback func(chan int, chan Result, int, *sync.WaitGroup)
type Result struct {
id int
val int
}
func Worker(in chan int, out chan Result, id int, wg *sync.WaitGroup) {
defer wg.Done()
for item := range in {
item *= 2 // returns the double of the input value (Bogus handling of data)
out <- Result{id, item}
}
}
func Run_parallel(n_workers int, in chan int, out chan Result, Worker ParallelCallback) {
wg := sync.WaitGroup{}
for id := 0; id < n_workers; id {
wg.Add(1)
go Worker(in, out, id, amp;wg)
}
wg.Wait() // wait for all workers to complete their tasks
close(out) // close the output channel when all tasks are completed
}
const (
NW = 8
)
func main() {
in := make(chan int)
out := make(chan Result)
go func() {
for i := 0; i < 10; i {
in <- i
}
close(in)
}()
go Run_parallel(NW, in, out, Worker)
for item := range out {
fmt.Printf("From out [%d]: %dn", item.id, item.val)
}
println("- - - All done - - -")
}
Ответ №3:
Альтернативная формулировка решения:
В этой альтернативной формулировке нет необходимости запускать Run_parallel как goroutine (он запускает свою собственную goroutine). Я предпочитаю это второе решение, потому что оно автоматизирует тот факт, что Run_parallel() должен выполняться параллельно основной функции. Кроме того, по той же причине это безопаснее, менее подвержено ошибкам (не нужно помнить о запуске Run_parallel с ключевым словом go).
package main
import (
"fmt"
"sync"
)
type ParallelCallback func(chan int, chan Result, int, *sync.WaitGroup)
type Result struct {
id int
val int
}
func Worker(in chan int, out chan Result, id int, wg *sync.WaitGroup) {
defer wg.Done()
for item := range in {
item *= 2 // returns the double of the input value (Bogus handling of data)
out <- Result{id, item}
}
}
func Run_parallel(n_workers int, in chan int, out chan Result, Worker ParallelCallback) {
go func() {
wg := sync.WaitGroup{}
defer close(out) // close the output channel when all tasks are completed
for id := 0; id < n_workers; id {
wg.Add(1)
go Worker(in, out, id, amp;wg)
}
wg.Wait() // wait for all workers to complete their tasks *and* trigger the -differed- close(out)
}()
}
const (
NW = 8
)
func main() {
in := make(chan int)
out := make(chan Result)
go func() {
defer close(in)
for i := 0; i < 10; i {
in <- i
}
}()
Run_parallel(NW, in, out, Worker)
for item := range out {
fmt.Printf("From out [%d]: %dn", item.id, item.val)
}
println("- - - All done - - -")
}