Тупиковая ситуация при попытке закодировать пул рабочих методов

#go #worker-pool

#Вперед #рабочий пул

Вопрос:

В приведенном ниже коде я не понимаю, почему «рабочие» методы, похоже, завершают работу вместо того, чтобы извлекать значения из входного канала «in» и обрабатывать их.

Я предполагал, что они вернутся только после использования всех входных данных из входного канала «in» и их обработки

 package main

import (
    "fmt"
    "sync"
)

type ParallelCallback func(chan int, chan Result, int, *sync.WaitGroup)

type Result struct {
    i   int
    val int
}

func Worker(in chan int, out chan Result, id int, wg *sync.WaitGroup) {
    for item := range in {
        item *= item // returns the square of the input value
        fmt.Printf("=> %d: %dn", id, item)
        out <- Result{item, id}
    }
    wg.Done()
    fmt.Printf("%d exiting ", id)
}

func Run_parallel(n_workers int, in chan int, out chan Result, Worker ParallelCallback) {
    wg := sync.WaitGroup{}
    for id := 0; id < n_workers; id   {
        fmt.Printf("Starting : %dn", id)
        wg.Add(1)
        go Worker(in, out, id, amp;wg)
    }
    wg.Wait()  // wait for all workers to complete their tasks
    close(out) // close the output channel when all tasks are completed
}

const (
    NW = 4
)

func main() {
    in := make(chan int)
    out := make(chan Result)

    go func() {
        for i := 0; i < 100; i   {
            in <- i
        }
        close(in)
    }()
    Run_parallel(NW, in, out, Worker)

    for item := range out {
        fmt.Printf("From out : %d: %d", item.i, item.val)
    }
}


 

Вывод

 Starting : 0
Starting : 1
Starting : 2
Starting : 3
=> 3: 0
=> 0: 1
=> 1: 4
=> 2: 9
fatal error: all goroutines are asleep - deadlock!
 

Комментарии:

1. Рабочие не могут вернуться до in тех пор, пока он не будет закрыт. Как вы думаете, почему они возвращаются раньше?

2. Это в основном тот же вопрос

3. Извините, я не понимаю. Вы говорите, что «они, похоже, завершаются», но они не завершаются. Что конкретно происходит, чего вы не ожидаете?

4. Возможно, я ошибаюсь в этом, и проблема может быть в другом: я добавил вывод после кода в ответ на ваш комментарий. Смотрите Вывод.

5. Вывод ошибки покажет, где заблокированы все программы. Я предполагаю, что вы заблокированы Run_parallel , потому что вы не потребляете from out до Run_parallel возвращения after.

Ответ №1:

неустранимая ошибка: все программы находятся в спящем режиме — взаимоблокировка!

Полная ошибка показывает, где «застряла» каждая подпрограмма. Если вы запустите это на игровой площадке, он даже покажет вам номер строки. Это облегчило мне диагностику.

Ваши Run_parallel запуски в main программе, поэтому, прежде main чем вы сможете прочитать out , Run_parallel должны вернуться. Прежде Run_parallel чем может вернуться, он должен wg.Wait() . Но перед вызовом workers wg.Done() они должны записать в out . Это то, что вызывает тупик.

Одно из решений простое: просто запустить Run_parallel одновременно в своей собственной подпрограмме.

     go Run_parallel(NW, in, out, Worker)
 

Теперь main диапазоны превышены out , ожидая out закрытия s, чтобы сигнализировать о завершении. Run_parallel ожидает, что рабочие будут работать wg.Wait() , и рабочие будут находиться в диапазоне in . Вся работа будет выполнена, и программа не завершится, пока все не будет сделано. (https://go.dev/play/p/oMrgH2U09tQ )

Ответ №2:

Решение :

Run_parallel должен выполняться в своей собственной программе:

 package main

import (
    "fmt"
    "sync"
)

type ParallelCallback func(chan int, chan Result, int, *sync.WaitGroup)

type Result struct {
    id  int
    val int
}

func Worker(in chan int, out chan Result, id int, wg *sync.WaitGroup) {
    defer wg.Done()
    for item := range in {
        item *= 2 // returns the double of the input value (Bogus handling of data)
        out <- Result{id, item}
    }
}

func Run_parallel(n_workers int, in chan int, out chan Result, Worker ParallelCallback) {
    wg := sync.WaitGroup{}
    for id := 0; id < n_workers; id   {
        wg.Add(1)
        go Worker(in, out, id, amp;wg)
    }
    wg.Wait()  // wait for all workers to complete their tasks
    close(out) // close the output channel when all tasks are completed
}

const (
    NW = 8
)

func main() {

    in := make(chan int)
    out := make(chan Result)

    go func() {
        for i := 0; i < 10; i   {
            in <- i
        }
        close(in)
    }()

    go Run_parallel(NW, in, out, Worker)

    for item := range out {
        fmt.Printf("From out [%d]: %dn", item.id, item.val)
    }

    println("- - - All done - - -")

}

 

Ответ №3:

Альтернативная формулировка решения:

В этой альтернативной формулировке нет необходимости запускать Run_parallel как goroutine (он запускает свою собственную goroutine). Я предпочитаю это второе решение, потому что оно автоматизирует тот факт, что Run_parallel() должен выполняться параллельно основной функции. Кроме того, по той же причине это безопаснее, менее подвержено ошибкам (не нужно помнить о запуске Run_parallel с ключевым словом go).

 package main

import (
    "fmt"
    "sync"
)

type ParallelCallback func(chan int, chan Result, int, *sync.WaitGroup)

type Result struct {
    id  int
    val int
}

func Worker(in chan int, out chan Result, id int, wg *sync.WaitGroup) {
    defer wg.Done()
    for item := range in {
        item *= 2 // returns the double of the input value (Bogus handling of data)
        out <- Result{id, item}
    }
}

func Run_parallel(n_workers int, in chan int, out chan Result, Worker ParallelCallback) {
    go func() {
        wg := sync.WaitGroup{}
        defer close(out) // close the output channel when all tasks are completed
        for id := 0; id < n_workers; id   {
            wg.Add(1)
            go Worker(in, out, id, amp;wg)
        }
        wg.Wait() // wait for all workers to complete their tasks *and* trigger the -differed- close(out)
    }()
}

const (
    NW = 8
)

func main() {

    in := make(chan int)
    out := make(chan Result)

    go func() {
        defer close(in)
        for i := 0; i < 10; i   {
            in <- i
        }
    }()

    Run_parallel(NW, in, out, Worker)

    for item := range out {
        fmt.Printf("From out [%d]: %dn", item.id, item.val)
    }

    println("- - - All done - - -")
}