# #go
Вопрос:
Я хочу создать динамический пул каналов ,который может прослушивать сотни тысяч каналов ,и все они находятся под контролем, и, как я исключил, я хочу, чтобы он автоматически обновлялся, если прослушивается слишком много каналов (goroutine => reflect =>> selectn) Но во время кодирования наблюдателя канала selectN я был заблокирован заменой канала
Я хочу заменить chan во время выполнения, которое находится в цикле выбора, и я несколько раз пытался сделать его доступным, но все пошло не так хорошо.
func Test_Change(t *testing.T) {
type A struct {
ch chan interface{}
}
a := amp;A{
ch: make(chan interface{}),
}
go func() {
for {
select {
case v := <-a.ch:
fmt.Println(v)
}
}
}()
newCh := make(chan interface{})
go func() {
for i := 0; i < 200; i {
a.ch <- i
}
a.ch = newCh
}()
go func() {
for i := 1000; i < 1010; i {
newCh <- i
}
}()
for {
select {}
}}
Он заблокировал
func Test_Change(t *testing.T) {
type A struct {
ch chan interface{}
bh chan interface{}
}
a := amp;A{
ch: make(chan interface{}),
bh: make(chan interface{}),
}
notify := make(chan struct{})
go func() {
for {
select {
case v := <-a.ch:
fmt.Println(v)
case <-notify:
fmt.Println("notify")
}
}
}()
newCh := make(chan interface{})
go func() {
for i := 0; i < 200; i {
a.ch <- i
}
a.ch = newCh
notify <- struct{}{}
}()
go func() {
for i := 1000; i < 1010; i {
newCh <- i
}
}()
for {
select {}
}}
И это сработало
Комментарии:
1. Вы пишете
a.ch
в одной гороутине и читаете ее в другой без синхронизации: гонка данных, неопределенное поведение.2. Зачем «заменять» исходный чан, просто продолжая отправлять в него сообщения с разных серверов? Или просто есть два (или более) «случая»в инструкции select, которую вы читаете?
3. Я хочу создать динамический пул каналов ,который может прослушивать сотни тысяч , и все они находятся под контролем, и, как я уже говорил, я хочу, чтобы он автоматически обновлялся, если прослушивается слишком много каналов (goroutine => reflect =>> selectn).
4.
for { select {} }
почему?5. возможно, вы также захотите разделить эти каналы на отдельные подпрограммы и объединить их на лету в свой основной оператор select. это вполне выполнимо.
Ответ №1:
Вы правильно определили, что select
блок на произвольно большом динамическом количестве последовательностей невозможен с обычным синтаксисом (фиксированное количество случаев) и возможен с использованием reflect
пакета.
Однако я не уверен, что это лучший способ достичь вашей цели. Если у вас есть тысячи каналов для просмотра (например, тысячи удаленных клиентов, подключенных одновременно), вы можете использовать шаблон «разветвления», чтобы записать все на очень небольшое фиксированное количество каналов и выбрать его.
Вместо
for {
select {
case <-sigterm:
cleanup()
os.Exit(1)
case msg := <-client1:
// process msg...
case msg := <-client2:
// process msg...
// HOW CAN I DYNAMICALLY ADD AND REMOVE A CLIENT HERE?
}
}
Подумайте о чем-то вроде:
for {
select {
case <-sigterm:
cleanup()
os.Exit(1)
case msg := <-clients:
// process msg...
}
}
func addClient(client chan Message) {
// Fan-in: read all future messages from client, and write them
// to clients.
go func(){
for msg := range client {
clients <- msg
}
}()
}
Замена значения переменной канала не является потокобезопасной (может привести к гонке данных), однако совершенно нормально , если несколько каналов одновременно записывают и считывают данные с одного и того же канала clients
.
Комментарии:
1. вы уклоняетесь от самой трудной части. Синхронизируйте все остальное вокруг этого.
2. видишь play.golang.org/p/fuiiWI_V9aL может показаться, что это работает, но в 5% случаев это не так.
3. @mh-cbon смотрите мой другой ответ о фактической замене канала внутри для/выбора. Если код, который вы предоставляете, не всегда ведет себя так, как вы ожидали, лучше всего было бы открыть отдельный вопрос.
Ответ №2:
Гонка данных a.ch
-это серьезная ошибка и недостаток дизайна. Вы можете обнаружить скачки данных, выполнив тесты с go test -race
помощью или с помощью программы go run -race program.go
.
Можно заменить значение символов, используемых в цикле for/select, если это правильно сделано в тексте обращения, а не в коде другой параллельной подпрограммы.
replace := time.After(3 * time.Second)
for {
select {
case v, ok := <-ch1:
// use v...
case v, ok := <-ch2:
// use v...
case <-replace:
ch1 = anotherChannel
}
}
Этот пример выполняемого кода является пикантным (не делайте этого). Вы можете сохранить его на своей рабочей станции и попробовать с помощью детектора гонки данных.
Этот исправленный пример кода не является пикантным.
Ответ №3:
Может быть, это сработает;
проверьте это очень тщательно, это выполнимо, но трудно сделать это правильно.
Я ожидаю, что это будет не идеально. И ему не хватает обширных документов о том, что делать и чего не делать.
Это также не та версия, которая объединяет входной канал, она всегда потребляет только один входной канал за раз. Что может стать проблемой для производительности.
Единственная гарантия, которую я даю, заключается в том, что это бесплатная гонка.
Хотя я оставляю в качестве упражнения для читателя задачу написать расширенную версию.
package main
import (
"fmt"
)
func main() {
m := New()
go m.Run()
input := m.Resize(0)
input <- 5
input <- 4
close(input)
input = m.Resize(10)
input <- 6
input <- 7
close(input)
input = m.Resize(2)
input <- 8
input <- 9
close(input)
m.Close()
fmt.Println()
}
type masterOfThings struct {
notify chan notification
wantClose chan chan bool
}
func New() masterOfThings {
return masterOfThings{
notify: make(chan notification, 1),
wantClose: make(chan chan bool),
}
}
type notification struct {
N int
out chan chan interface{}
}
func (m masterOfThings) Resize(n int) chan<- interface{} {
N := notification{
N: n,
out: make(chan chan interface{}, 1),
}
m.notify <- N
return <-N.out
}
func (m masterOfThings) Close() {
closed := make(chan bool)
m.wantClose <- closed
<-closed
}
func (m masterOfThings) Run() {
var input chan interface{}
inputs := []chan interface{}{}
closers := []chan bool{}
defer func() {
for _, c := range closers {
close(c)
}
}()
var wantClose bool
for {
select {
case m := <-m.wantClose:
closers = append(closers, m)
wantClose = true
if len(inputs) < 1 amp;amp; input == nil {
return
}
case n, ok := <-input:
if !ok {
input = nil
if len(inputs) > 0 {
input = inputs[0]
copy(inputs, inputs[1:])
inputs = inputs[:len(inputs)-1]
} else if wantClose {
return
}
continue
}
fmt.Println(n)
case n := <-m.notify:
nInput := make(chan interface{}, n.N)
if input == nil {
input = nInput
} else {
inputs = append(inputs, nInput)
}
n.out <- nInput
}
}
}