Как отбросить значения в параллельном конвейере фильтрации с помощью Go?

# #go #concurrency #channels

Вопрос:

Я хотел бы знать и понять, как выполнить параллельный конвейер фильтрации с помощью go в схеме производитель/потребитель.

Мне удалось написать версию, которая проверяет значение и, если оно в порядке, отправляет его на один канал, а если нет, значение отправляется на другой канал.

После считывания и обработки значений are две программы отвечают за считывание обработанных значений и запись их в файл. Эта версия работает нормально. Но…

  1. Предположим, что мне не нужны недопустимые значения. Есть ли способ изменить оператор select (или пользовательскую подпрограмму) таким образом, чтобы выводились только правильные значения (т. Е. Использовался только один выходной канал). Я попытался удалить этот канал недействительных значений, но мне это не удалось.
  2. Я попытался поместить оператор select в if valid? ; с одной ветвью с полным оператором, как в этой версии, и в ложную ветвь, просто ожидая завершения канала. Таким образом, я мог бы отбросить недопустимые значения и использовать один канал, но при таком подходе мне это не удалось.

Есть идеи, как это исправить?

  1. Более того, в этой схеме я хотел бы знать, почему, если я опущу подпрограмму, которая удаляет значения из канала invalidValues, программа не завершится? Означает ли это, что канал необходимо очистить, иначе он останется заблокированным? Есть ли более элегантный способ сделать это, чтобы сделать диапазон значений?

Спасибо!!

 //Consumers var wg sync.WaitGroup wg.Add(Workers) for i := 0; i lt; Workers; i   {  // Deploy #Workers to read from the inputStream perform validation and output the valid results to one channel and the invalid to another  go func() {  for value := range inputStream {  var c *chan string  dataToWrite := value  if valid := checkValue(value); valid {  dataToWrite = value  c = amp;outputStream  } else {  c = amp;invalidValues  }  select {  case *c lt;- dataToWrite:  case lt;-done:  return  }  time.Sleep(time.Duration(5) * time.Second)  }  wg.Done()  }() }  

Вот полная версия кода

 done := make(chan struct{}) defer close(done) inputStream := make(chan string) outputStream := make(chan string) invalidValues := make(chan string)  //Producer reads a file with values and stores them in a channel go func() {  count := 0  scanner := bufio.NewScanner(file)  for scanner.Scan() {  inputStream lt;- strings.TrimSpace(scanner.Text())  count = count   1  }  close(inputStream) }()  //Consumers var wg sync.WaitGroup wg.Add(Workers) for i := 0; i lt; Workers; i   {  // Deploy #Workers to read from the inputStream perform validation and output the valid results to one channel and the invalid to another  go func() {  for value := range inputStream {  var c *chan string  dataToWrite := value  if valid := checkValue(value); valid {  dataToWrite = value  c = amp;outputStream  } else {  c = amp;invalidValues  }  select {  case *c lt;- dataToWrite:  case lt;-done:  return  }  time.Sleep(time.Duration(5) * time.Second)  }  wg.Done()  }() }  go func() {  wg.Wait()  close(outputStream)  close(invalidValues) }()  //Write outputStream file resultFile, err := os.Create("outputStream.txt") if err != nil {  log.Fatal(err) }  //Error file errorFile, err := os.Create("errors.txt") if err != nil {  log.Fatal(err) }  //Create two goruotines for writing the outputStream file var wg2 sync.WaitGroup wg2.Add(2) go func() {  //Write outputStream and error to files  for r := range outputStream {  _, err := resultFile.WriteString(r   "n")  if err != nil {  log.Fatal(err)  }  }  resultFile.Close()  wg2.Done() }()  go func() {  for r := range invalidValues {  _, err := errorFile.WriteString(r   "n")  if err != nil {  log.Fatal(err)  }  }  errorFile.Close()  wg2.Done() }() wg2.Wait()  

Ответ №1:

Чтобы удалить недопустимый канал:

 for value := range inputStream {  var c *chan string  if valid := checkValue(value); valid {  select {  case outputStream lt;- value  case lt;-done:  return  }  }  }  

Если вы удалите подпрограмму чтения недопустимых значений, вам необходимо изменить группу ожидания на:

 wg2.Add(1)  

так что вам не придется ждать бесконечно.