Как мне закончить этот сценарий «производитель-потребитель»?

#python #multithreading #design-patterns #queue #producer-consumer

#питон #многопоточность #дизайн-шаблоны #очередь #производитель-потребитель

Вопрос:

Я пытаюсь изучить шаблон «Производитель-потребитель», реализующий его в python. Я могу заставить это работать, но по какой-то причине потребители продолжают слушать что-то в очереди и не заканчивают сценарий.

Я знаю, что это ожидаемое поведение, поскольку производитель может продолжать добавлять вещи в очередь с другой скоростью, с которой потребляют потребители. Однако в моем случае у меня уже есть список, который будет обработан очередью, и я могу гарантировать, что в будущем никакие другие элементы не будут добавлены.

Вот полный рабочий код:

 from threading import Thread import time import random from queue import Queue  queue = Queue(10)  class ProducerThread(Thread):  def __init__(self, nums):  super().__init__()  self.nums = nums   def run(self):  global queue  while self.nums:  num = self.nums.pop(0)  queue.put(num)  print("Produced", num)  time.sleep(1)  class ConsumerThread(Thread):  def __init__(self, id):  super().__init__()  self.id = id   def run(self):  global queue  while True:  num = queue.get()  ##do something here  queue.task_done()  print(f"Consumed {num} in consumer {self.id}")  time.sleep(1)     p = ProducerThread(list(range(5)))  l1 = ConsumerThread(1) l2 = ConsumerThread(2)  p.start() l1.start() l2.start()  p.join() l1.join() l2.join()   

Какое условие я могу заменить в потребителе while True , чтобы он понял, что сценарий завершен?

Заранее спасибо.

Комментарии:

1. Вам нужно, чтобы ваша Producer нить каким-то образом говорила: «Я закончил». Обычно это делается либо путем помещения специального маркера в очередь, например «СТОП», либо путем установки события. В противном случае Consumer у него нет возможности узнать, закончен ли Продюсер или это займет много времени.

2. Не могли бы вы мягко привести пример? К сожалению, я очень новичок в очередях на python

Ответ №1:

Мой ответ написан, как вы и просили.

 STOP_TOKEN = "STOP" # Anything that wouldn't normally be in your queue.  class ProducerThread(Thread):  ...  def run(self):  global queue  while self.nums:  num = self.nums.pop(0)  queue.put(num)  print("Produced", num)  time.sleep(1)  queue.put(STOP_TOKEN)     class ConsumerThread(Thread):  ...  def run(self):  global queue  while True:  num = queue.get()  if num == STOP_TOKEN:  break  ##do something here  queue.task_done()  print(f"Consumed {num} in consumer {self.id}")  time.sleep(1)  

Комментарии:

1. Спасибо вам за ваше предложение. К сожалению, выполнение некоторых тестов с вашим решением показало, что код все еще выполняется при использовании нескольких пользователей. Я не знаю, почему.

2. я только что понял. Вам нужно STOP_TOKEN вернуться в очередь перед break выпиской. Таким образом, он уничтожит всех доступных потребителей. Я отредактирую ваш ответ. Большое спасибо!