#python #multithreading #design-patterns #queue #producer-consumer
#питон #многопоточность #дизайн-шаблоны #очередь #производитель-потребитель
Вопрос:
Я пытаюсь изучить шаблон «Производитель-потребитель», реализующий его в python. Я могу заставить это работать, но по какой-то причине потребители продолжают слушать что-то в очереди и не заканчивают сценарий.
Я знаю, что это ожидаемое поведение, поскольку производитель может продолжать добавлять вещи в очередь с другой скоростью, с которой потребляют потребители. Однако в моем случае у меня уже есть список, который будет обработан очередью, и я могу гарантировать, что в будущем никакие другие элементы не будут добавлены.
Вот полный рабочий код:
from threading import Thread import time import random from queue import Queue queue = Queue(10) class ProducerThread(Thread): def __init__(self, nums): super().__init__() self.nums = nums def run(self): global queue while self.nums: num = self.nums.pop(0) queue.put(num) print("Produced", num) time.sleep(1) class ConsumerThread(Thread): def __init__(self, id): super().__init__() self.id = id def run(self): global queue while True: num = queue.get() ##do something here queue.task_done() print(f"Consumed {num} in consumer {self.id}") time.sleep(1) p = ProducerThread(list(range(5))) l1 = ConsumerThread(1) l2 = ConsumerThread(2) p.start() l1.start() l2.start() p.join() l1.join() l2.join()
Какое условие я могу заменить в потребителе while True
, чтобы он понял, что сценарий завершен?
Заранее спасибо.
Комментарии:
1. Вам нужно, чтобы ваша
Producer
нить каким-то образом говорила: «Я закончил». Обычно это делается либо путем помещения специального маркера в очередь, например «СТОП», либо путем установки события. В противном случаеConsumer
у него нет возможности узнать, закончен ли Продюсер или это займет много времени.2. Не могли бы вы мягко привести пример? К сожалению, я очень новичок в очередях на python
Ответ №1:
Мой ответ написан, как вы и просили.
STOP_TOKEN = "STOP" # Anything that wouldn't normally be in your queue. class ProducerThread(Thread): ... def run(self): global queue while self.nums: num = self.nums.pop(0) queue.put(num) print("Produced", num) time.sleep(1) queue.put(STOP_TOKEN) class ConsumerThread(Thread): ... def run(self): global queue while True: num = queue.get() if num == STOP_TOKEN: break ##do something here queue.task_done() print(f"Consumed {num} in consumer {self.id}") time.sleep(1)
Комментарии:
1. Спасибо вам за ваше предложение. К сожалению, выполнение некоторых тестов с вашим решением показало, что код все еще выполняется при использовании нескольких пользователей. Я не знаю, почему.
2. я только что понял. Вам нужно
STOP_TOKEN
вернуться в очередь передbreak
выпиской. Таким образом, он уничтожит всех доступных потребителей. Я отредактирую ваш ответ. Большое спасибо!