Как задержать прямую трансляцию?

#go

#Вперед

Вопрос:

Я пытаюсь создать сервис в Go, который задерживает прямую трансляцию (socketio / SignalR) на ~ 7 минут. Он также должен разрешать поток без задержек. Таким образом, служба Go должна иметь что-то вроде буфера или очереди, которая заставляет данные ожидать указанной продолжительности, прежде чем им будет разрешено быть использованными. Как бы вы сделали что-то подобное в Go? Будет ли отложенный поток отдельной подпрограммой? Какую структуру данных следует использовать для задержки данных?

Моя текущая идея заключалась бы в том, чтобы использовать time пакет для ожидания в течение 7 минут, прежде чем данные будут разрешены к использованию, но такое поведение блокировки может быть неоптимальным в этом сценарии.

Вот некоторый код, объясняющий, что я пытаюсь сделать. FakeStream это фиктивная функция, которая имитирует данные прямой трансляции, которые я получаю от внешней службы.

 package main

import (
    "fmt"
    "time"
)

func DelayStream(input chan string, output chan string, delay string) {

    // not working for some reason
    // delayDuration, _ := time.ParseDuration(delay)
    // fmt.Println(delayDuration.Seconds())

    if delay == "5s" {
        fmt.Println("sleeping")
        time.Sleep(5 * time.Second)
    }
    data := <-input
    output <- data
}

func FakeStream(live chan string) {

    ticks := time.Tick(2 * time.Second)
    for now := range ticks {
        live <- fmt.Sprintf("%v", now.Format(time.UnixDate))
    }
}

func main() {
    liveData := make(chan string)
    delayedData := make(chan string)

    go FakeStream(liveData)
    go DelayStream(liveData, delayedData, "5s")

    for {
        select {
        case live := <-liveData:
            fmt.Println("live: ", live)
        case delayed := <-delayedData:
            fmt.Println("delayed: ", delayed)
        }
    }
}
  

По какой-то причине отложенный канал выводится только один раз, и он не выводит ожидаемые данные. Он должен выводить первое, что отображается в прямом канале, но этого не происходит.

Комментарии:

1. Можете ли вы поделиться своим фрагментом кода, чтобы сделать вашу методологию более понятной для окружающих?

2. Добавить задержку в код легко. Напишите что-нибудь, когда вы застряли, затем спросите здесь и включите свой код.

3. @flimzy добавил немного кода

Ответ №1:

Вам нужен буфер достаточного размера. В простых случаях может работать буферизованный канал Go.

Спросите себя — сколько данных нужно сохранить во время этой задержки — у вас должен быть разумный верхний предел. Например, если ваш поток доставляет до N пакетов в секунду, то для задержки на 7 минут вам потребуется сохранить 420N пакетов.

Спросите себя — что произойдет, если во время окна задержки поступит больше данных, чем ожидалось? Вы можете удалить новые данные, или выбросить старые данные, или просто заблокировать входной поток. Какие из них возможны для вашего сценария? Каждый приводит к немного отличающемуся решению.

Спросите себя — как вычисляется задержка? С момента создания потока? С момента поступления каждого пакета? Задержка для каждого пакета отдельно или только для первого пакета в потоке?

Здесь вам нужно значительно сузить выбор дизайна, чтобы разработать некоторый пример кода.

Для некоторых из этих вариантов дизайна вот простой способ добавить задержку между каналами для каждого сообщения:


 package main

import (
    "fmt"
    "sync"
    "time"
)

func main() {
    // in is a channel of strings with a buffer size of 10
    in := make(chan string, 10)

    // out is an unbuffered channel
    out := make(chan string)

    // this goroutine forwards messages from in to out, ading a delay
    // to each message.
    const delay = 3 * time.Second
    go func() {
        for msg := range in {
            time.Sleep(delay)
            out <- msg
        }
        close(out)
    }()

    var wg sync.WaitGroup
    wg.Add(1)
    // this goroutine drains the out channel
    go func() {
        for msg := range out {
            fmt.Printf("Got '%s' at time %sn", msg, time.Now().Format(time.Stamp))
        }
        wg.Done()
    }()

    // Send some messages into the in channel
    fmt.Printf("Sending '%s' at time %sn", "joe", time.Now().Format(time.Stamp))
    in <- "joe"

    time.Sleep(2 * time.Second)
    fmt.Printf("Sending '%s' at time %sn", "hello", time.Now().Format(time.Stamp))
    in <- "hello"

    time.Sleep(4 * time.Second)
    fmt.Printf("Sending '%s' at time %sn", "bye", time.Now().Format(time.Stamp))
    in <- "bye"
    close(in)

    wg.Wait()
}
  

Комментарии:

1. Можете ли вы объяснить, почему вы используете WaitGroup?

2. Группы ожидания являются ключевым инструментом для координации процедур go. В приведенном выше случае при отслеживании wg ссылок основная функция не завершится, пока не завершится go func() процедура перехода. В примере автономного исполняемого файла, подобном этому, это имеет решающее значение, поскольку программа просто быстро завершит работу — если завершится основная процедура запуска. В этом примере процедура задержки запуска все еще выполняется — так что вежливо дать ей закончиться.

3. @colminator Но почему первая подпрограмма игнорирует это?

4. процедуры go полностью независимы по своей конструкции. Для полезной программы нужна координация. Процедуры перехода могут взаимодействовать через каналы, группы ожидания, блокировки и т.д., Или не взаимодействовать вообще — это зависит от вас.

5. Также, как бы мне изменить это, чтобы разрешить двум функциям считывать данные с одного канала? Как упоминалось в OP, данные в реальном времени также должны передаваться в потоковом режиме. Мое текущее решение работает не слишком хорошо.