Как изменить канал в цикле выбора

# #go

Вопрос:

Я хочу создать динамический пул каналов ,который может прослушивать сотни тысяч каналов ,и все они находятся под контролем, и, как я исключил, я хочу, чтобы он автоматически обновлялся, если прослушивается слишком много каналов (goroutine => reflect =>> selectn) Но во время кодирования наблюдателя канала selectN я был заблокирован заменой канала

Я хочу заменить chan во время выполнения, которое находится в цикле выбора, и я несколько раз пытался сделать его доступным, но все пошло не так хорошо.

 func Test_Change(t *testing.T) {
type A struct {
    ch chan interface{}
}
a := amp;A{
    ch: make(chan interface{}),
}
go func() {
    for {
        select {
        case v := <-a.ch:
            fmt.Println(v)
        }
    }
}()

newCh := make(chan interface{})
go func() {
    for i := 0; i < 200; i   {
        a.ch <- i
    }
    a.ch = newCh
}()
go func() {
    for i := 1000; i < 1010; i   {
        newCh <- i
    }
}()
for {
    select {}
}}
 

Он заблокировал

 func Test_Change(t *testing.T) {
type A struct {
    ch chan interface{}
    bh chan interface{}
}
a := amp;A{
    ch: make(chan interface{}),
    bh: make(chan interface{}),
}
notify := make(chan struct{})
go func() {
    for {
        select {
        case v := <-a.ch:
            fmt.Println(v)
        case <-notify:
            fmt.Println("notify")
        }
    }
}()
newCh := make(chan interface{})
go func() {
    for i := 0; i < 200; i   {
        a.ch <- i
    }
    a.ch = newCh
    notify <- struct{}{}
}()
go func() {
    for i := 1000; i < 1010; i   {
        newCh <- i
    }
}()
for {
    select {}
}}
 

И это сработало

Комментарии:

1. Вы пишете a.ch в одной гороутине и читаете ее в другой без синхронизации: гонка данных, неопределенное поведение.

2. Зачем «заменять» исходный чан, просто продолжая отправлять в него сообщения с разных серверов? Или просто есть два (или более) «случая»в инструкции select, которую вы читаете?

3. Я хочу создать динамический пул каналов ,который может прослушивать сотни тысяч , и все они находятся под контролем, и, как я уже говорил, я хочу, чтобы он автоматически обновлялся, если прослушивается слишком много каналов (goroutine => reflect =>> selectn).

4. for { select {} } почему?

5. возможно, вы также захотите разделить эти каналы на отдельные подпрограммы и объединить их на лету в свой основной оператор select. это вполне выполнимо.

Ответ №1:

Вы правильно определили, что select блок на произвольно большом динамическом количестве последовательностей невозможен с обычным синтаксисом (фиксированное количество случаев) и возможен с использованием reflect пакета.

Однако я не уверен, что это лучший способ достичь вашей цели. Если у вас есть тысячи каналов для просмотра (например, тысячи удаленных клиентов, подключенных одновременно), вы можете использовать шаблон «разветвления», чтобы записать все на очень небольшое фиксированное количество каналов и выбрать его.

Вместо

     for {
        select {
        case <-sigterm:
            cleanup()
            os.Exit(1)
        case msg := <-client1:
            // process msg...
        case msg := <-client2:
            // process msg...
        // HOW CAN I DYNAMICALLY ADD AND REMOVE A CLIENT HERE?
        }
    }
 

Подумайте о чем-то вроде:

     for {
        select {
        case <-sigterm:
            cleanup()
            os.Exit(1)
        case msg := <-clients:
            // process msg...
        }
    }
 
 func addClient(client chan Message) {
    // Fan-in: read all future messages from client, and write them
    // to clients.
    go func(){
        for msg := range client {
            clients <- msg
        }
    }()
}
 

Замена значения переменной канала не является потокобезопасной (может привести к гонке данных), однако совершенно нормально , если несколько каналов одновременно записывают и считывают данные с одного и того же канала clients .

Комментарии:

1. вы уклоняетесь от самой трудной части. Синхронизируйте все остальное вокруг этого.

2. видишь play.golang.org/p/fuiiWI_V9aL может показаться, что это работает, но в 5% случаев это не так.

3. @mh-cbon смотрите мой другой ответ о фактической замене канала внутри для/выбора. Если код, который вы предоставляете, не всегда ведет себя так, как вы ожидали, лучше всего было бы открыть отдельный вопрос.

Ответ №2:

Гонка данных a.ch -это серьезная ошибка и недостаток дизайна. Вы можете обнаружить скачки данных, выполнив тесты с go test -race помощью или с помощью программы go run -race program.go .

Можно заменить значение символов, используемых в цикле for/select, если это правильно сделано в тексте обращения, а не в коде другой параллельной подпрограммы.

     replace := time.After(3 * time.Second)

    for {
        select {
        case v, ok := <-ch1:
            // use v...
        case v, ok := <-ch2:
            // use v...
        case <-replace:
            ch1 = anotherChannel
        }
    }
 

Этот пример выполняемого кода является пикантным (не делайте этого). Вы можете сохранить его на своей рабочей станции и попробовать с помощью детектора гонки данных.

Этот исправленный пример кода не является пикантным.

Ответ №3:

Может быть, это сработает;

проверьте это очень тщательно, это выполнимо, но трудно сделать это правильно.

Я ожидаю, что это будет не идеально. И ему не хватает обширных документов о том, что делать и чего не делать.

Это также не та версия, которая объединяет входной канал, она всегда потребляет только один входной канал за раз. Что может стать проблемой для производительности.

Единственная гарантия, которую я даю, заключается в том, что это бесплатная гонка.

Хотя я оставляю в качестве упражнения для читателя задачу написать расширенную версию.

 package main

import (
    "fmt"
)

func main() {
    m := New()
    go m.Run()
    input := m.Resize(0)
    input <- 5
    input <- 4
    close(input)

    input = m.Resize(10)
    input <- 6
    input <- 7
    close(input)

    input = m.Resize(2)
    input <- 8
    input <- 9
    close(input)

    m.Close()

    fmt.Println()

}

type masterOfThings struct {
    notify    chan notification
    wantClose chan chan bool
}

func New() masterOfThings {
    return masterOfThings{
        notify:    make(chan notification, 1),
        wantClose: make(chan chan bool),
    }
}

type notification struct {
    N   int
    out chan chan interface{}
}

func (m masterOfThings) Resize(n int) chan<- interface{} {
    N := notification{
        N:   n,
        out: make(chan chan interface{}, 1),
    }
    m.notify <- N
    return <-N.out
}

func (m masterOfThings) Close() {
    closed := make(chan bool)
    m.wantClose <- closed
    <-closed
}

func (m masterOfThings) Run() {
    var input chan interface{}
    inputs := []chan interface{}{}
    closers := []chan bool{}
    defer func() {
        for _, c := range closers {
            close(c)
        }
    }()
    var wantClose bool
    for {
        select {
        case m := <-m.wantClose:
            closers = append(closers, m)
            wantClose = true
            if len(inputs) < 1 amp;amp; input == nil {
                return
            }
        case n, ok := <-input:
            if !ok {
                input = nil
                if len(inputs) > 0 {
                    input = inputs[0]
                    copy(inputs, inputs[1:])
                    inputs = inputs[:len(inputs)-1]
                } else if wantClose {
                    return
                }
                continue
            }
            fmt.Println(n)
        case n := <-m.notify:
            nInput := make(chan interface{}, n.N)
            if input == nil {
                input = nInput
            } else {
                inputs = append(inputs, nInput)
            }
            n.out <- nInput
        }
    }
}