Как дождаться, пока буферизованный канал (семафор) опустеет?

#go #semaphore #channel #goroutine

#Вперед #семафор #канал #goroutine

Вопрос:

У меня есть фрагмент целых чисел, которые обрабатываются одновременно:

 ints := []int{1, 2, 3, 4, 5, 6, 7, 8, 9, 10}
  

Я использую буферизованный канал в качестве семафора, чтобы иметь верхнюю границу одновременно выполняемых процедур go:

 sem := make(chan struct{}, 2)

for _, i := range ints {
  // acquire semaphore
  sem <- struct{}{}

  // start long running go routine
  go func(id int, sem chan struct{}) {
    // do something

    // release semaphore
    <- sem
  }(i, sem)
}
  

Приведенный выше код работает довольно хорошо, пока не будут достигнуты последние или последние два целых числа, потому что программа завершается до завершения этих последних процедур go.

Вопрос: как мне дождаться, пока буферизованный канал разрядится?

Комментарии:

1. Вы должны использовать мьютекс или что-то в этом роде. Буферизованный канал блокируется, когда он заполнен, но нет языковой функции, которую можно заблокировать, пока он не опустеет.

Ответ №1:

Вы не можете использовать семафор (в данном случае канал) таким образом. Нет никакой гарантии, что он не будет пустым в любой момент, пока вы обрабатываете значения и отправляете больше подпрограмм. В данном случае это не проблема, поскольку вы выполняете работу синхронно, но поскольку нет способа проверить длину канала без гонки, нет примитива для ожидания, пока длина канала не достигнет 0.

Используйте a sync.WaitGroup , чтобы дождаться завершения всех процедур

 sem := make(chan struct{}, 2)

var wg sync.WaitGroup

for _, i := range ints {
    wg.Add(1)
    // acquire semaphore
    sem <- struct{}{}
    // start long running go routine
    go func(id int) {
        defer wg.Done()
        // do something
        // release semaphore
        <-sem
    }(i)
}

wg.Wait()
  

Комментарии:

1. Спасибо, я тоже думал об использовании WaitGroup . Похоже, это правильный путь!

Ответ №2:

Используйте «рабочий пул» для обработки ваших данных. Это проще, чем запускать goroutine для каждого int, выделять память для переменных внутри него и так далее…

 ints := []int{1, 2, 3, 4, 5, 6, 7, 8, 9, 10}

ch := make(chan int)

var wg sync.WaitGroup

// run worker pool
for i := 2; i > 0; i-- {
    wg.Add(1)

    go func() {
        defer wg.Done()

        for id := range ch {
            // do something
            fmt.Println(id)
        }
    }()
}

// send ints to workers
for _, i := range ints {
    ch <- i
}

close(ch)

wg.Wait()
  

Ответ №3:

Очевидно, что никто не ждет завершения ваших подпрограмм go. Таким образом, программа завершается до завершения последних 2 процедур перехода. Вы можете использовать рабочую группу, чтобы дождаться завершения всех ваших процедур go до завершения программы. Это говорит об этом лучше — https://nathanleclaire.com/blog/2014/02/15/how-to-wait-for-all-goroutines-to-finish-executing-before-continuing/

Комментарии:

1. Спасибо, я искал, как этого избежать, и a WorkGroup работает отлично. Ответы Джимба прояснили это

Ответ №4:

Вы можете подождать свои «подпрограммы» с текущей подпрограммой в цикле for

 semLimit := 2
sem := make(chan struct{}, semLimit)

for _, i := range ints {
  // acquire semaphore
  sem <- struct{}{}

  // start long running go routine
  go func(id int, sem chan struct{}) {
    // do something

    // release semaphore
    <- sem
  }(i, sem)
}

// wait semaphore
for i := 0; i < semLimit; i   { 
  wg<-struct{}{} 
}

  

Дополнительно также возможно запрограммировать минималистичную «семафорированную группу ожидания» с экономией import sync

 semLimit := 2
// mini semaphored waitgroup 
wg := make(chan struct{}, semLimit)
// mini methods
wgAdd := func(){ wg<-struct{}{} }
wgDone := func(){ <-wg }
wgWait := func(){ for i := 0; i < semLimit; i   { wgAdd() } }

for _, i := range ints {
  // acquire semaphore
  wgAdd()

  // start long running go routine
  go func(id int, sem chan struct{}) {
    // do something

    // release semaphore
    wgDone()
  }(i, sem)
}

// wait semaphore
wgWait()
  

Ответ №5:

Вот рабочий пример. for Цикл в конце заставляет программу ждать, пока задания не будут выполнены:

 package main
import "time"

func w(n int, e chan error) {
   // do something
   println(n)
   time.Sleep(time.Second)
   // release semaphore
   <-e
}

func main() {
   a := []int{1, 2, 3, 4, 5, 6, 7, 8, 9, 10}
   e := make(chan error, 2)
   for _, n := range a {
      // acquire semaphore
      e <- nil
      // start long running go routine
      go w(n, e)
   }
   for n := cap(e); n > 0; n-- {
      e <- nil
   }
}