#python #multithreading #producer-consumer #concurrent.futures
Вопрос:
Я создал краткий класс потребителей/производителей, возможно, здесь уже есть некоторые замечания по потенциалу улучшения.
Мой вопрос: я прочитал в некоторых замечаниях, что это лучше сделать с помощью ThreadPoolExecutor. Я еще не выяснил, как реализовать запуск и остановку потока потребителей/производителей
import logging
import random
from queue import Queue
from threading import Thread, Event
from time import sleep
class Updater:
def __init__(self):
self.update_queue = Queue(5)
self._logger = logging.getLogger(__name__)
self._producer = None
self._consumer = None
self.producer_running = Event()
self.consumer_running = Event()
def producer(self):
while self.producer_running.is_set():
item = random.randint(0, 10)
self.update_queue.put(item)
sleep(0.1*float(random.randint(0,10)))
def consumer(self):
while self.consumer_running.is_set() or not self.update_queue.empty():
item = self.update_queue.get()
sleep(0.2*float(random.randint(0,10)))
def start(self):
if not self.producer_running.is_set():
self.producer_running.set()
self._producer = Thread( target=self.producer)
self._producer.start()
self.consumer_running.set()
self._consumer = Thread( target=self.consumer)
self._consumer.start()
def stop(self):
self.producer_running.clear()
self._producer.join()
self.consumer_running.clear()
self._consumer.join()
if __name__ == "__main__":
logging.basicConfig(level=logging.INFO,format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
foo = Updater()
foo.start()
sleep(5)
foo.stop()
Обновить
Теперь я создал метод остановки и запуска с помощью concurrent.futures , он работает, но я не уверен, так ли это задумано
self.executor = concurrent.futures.ThreadPoolExecutor(max_workers=5)
def start(self):
if not self.producer_running.is_set():
self.producer_running.set()
self._producer = self.executor.submit(self.producer)
self.consumer_running.set()
self._consumer = self.executor.submit(self.consumer)
def stop(self):
self.producer_running.clear()
concurrent.futures.wait([self._producer])
self.consumer_running.clear()
concurrent.futures.wait([self._consumer])
self.executor.shutdown()