#go #semaphore #channel #goroutine
#Вперед #семафор #канал #goroutine
Вопрос:
У меня есть фрагмент целых чисел, которые обрабатываются одновременно:
ints := []int{1, 2, 3, 4, 5, 6, 7, 8, 9, 10}
Я использую буферизованный канал в качестве семафора, чтобы иметь верхнюю границу одновременно выполняемых процедур go:
sem := make(chan struct{}, 2)
for _, i := range ints {
// acquire semaphore
sem <- struct{}{}
// start long running go routine
go func(id int, sem chan struct{}) {
// do something
// release semaphore
<- sem
}(i, sem)
}
Приведенный выше код работает довольно хорошо, пока не будут достигнуты последние или последние два целых числа, потому что программа завершается до завершения этих последних процедур go.
Вопрос: как мне дождаться, пока буферизованный канал разрядится?
Комментарии:
1. Вы должны использовать мьютекс или что-то в этом роде. Буферизованный канал блокируется, когда он заполнен, но нет языковой функции, которую можно заблокировать, пока он не опустеет.
Ответ №1:
Вы не можете использовать семафор (в данном случае канал) таким образом. Нет никакой гарантии, что он не будет пустым в любой момент, пока вы обрабатываете значения и отправляете больше подпрограмм. В данном случае это не проблема, поскольку вы выполняете работу синхронно, но поскольку нет способа проверить длину канала без гонки, нет примитива для ожидания, пока длина канала не достигнет 0.
Используйте a sync.WaitGroup
, чтобы дождаться завершения всех процедур
sem := make(chan struct{}, 2)
var wg sync.WaitGroup
for _, i := range ints {
wg.Add(1)
// acquire semaphore
sem <- struct{}{}
// start long running go routine
go func(id int) {
defer wg.Done()
// do something
// release semaphore
<-sem
}(i)
}
wg.Wait()
Комментарии:
1. Спасибо, я тоже думал об использовании
WaitGroup
. Похоже, это правильный путь!
Ответ №2:
Используйте «рабочий пул» для обработки ваших данных. Это проще, чем запускать goroutine для каждого int, выделять память для переменных внутри него и так далее…
ints := []int{1, 2, 3, 4, 5, 6, 7, 8, 9, 10}
ch := make(chan int)
var wg sync.WaitGroup
// run worker pool
for i := 2; i > 0; i-- {
wg.Add(1)
go func() {
defer wg.Done()
for id := range ch {
// do something
fmt.Println(id)
}
}()
}
// send ints to workers
for _, i := range ints {
ch <- i
}
close(ch)
wg.Wait()
Ответ №3:
Очевидно, что никто не ждет завершения ваших подпрограмм go. Таким образом, программа завершается до завершения последних 2 процедур перехода. Вы можете использовать рабочую группу, чтобы дождаться завершения всех ваших процедур go до завершения программы. Это говорит об этом лучше — https://nathanleclaire.com/blog/2014/02/15/how-to-wait-for-all-goroutines-to-finish-executing-before-continuing/
Комментарии:
1. Спасибо, я искал, как этого избежать, и a
WorkGroup
работает отлично. Ответы Джимба прояснили это
Ответ №4:
Вы можете подождать свои «подпрограммы» с текущей подпрограммой в цикле for
semLimit := 2
sem := make(chan struct{}, semLimit)
for _, i := range ints {
// acquire semaphore
sem <- struct{}{}
// start long running go routine
go func(id int, sem chan struct{}) {
// do something
// release semaphore
<- sem
}(i, sem)
}
// wait semaphore
for i := 0; i < semLimit; i {
wg<-struct{}{}
}
Дополнительно также возможно запрограммировать минималистичную «семафорированную группу ожидания» с экономией import sync
semLimit := 2
// mini semaphored waitgroup
wg := make(chan struct{}, semLimit)
// mini methods
wgAdd := func(){ wg<-struct{}{} }
wgDone := func(){ <-wg }
wgWait := func(){ for i := 0; i < semLimit; i { wgAdd() } }
for _, i := range ints {
// acquire semaphore
wgAdd()
// start long running go routine
go func(id int, sem chan struct{}) {
// do something
// release semaphore
wgDone()
}(i, sem)
}
// wait semaphore
wgWait()
Ответ №5:
Вот рабочий пример. for
Цикл в конце заставляет программу ждать, пока задания не будут выполнены:
package main
import "time"
func w(n int, e chan error) {
// do something
println(n)
time.Sleep(time.Second)
// release semaphore
<-e
}
func main() {
a := []int{1, 2, 3, 4, 5, 6, 7, 8, 9, 10}
e := make(chan error, 2)
for _, n := range a {
// acquire semaphore
e <- nil
// start long running go routine
go w(n, e)
}
for n := cap(e); n > 0; n-- {
e <- nil
}
}