Как решить проблему производителя/потребителя с помощью ThreadPoolExecutor

#python #multithreading #producer-consumer #concurrent.futures

Вопрос:

Я создал краткий класс потребителей/производителей, возможно, здесь уже есть некоторые замечания по потенциалу улучшения.

Мой вопрос: я прочитал в некоторых замечаниях, что это лучше сделать с помощью ThreadPoolExecutor. Я еще не выяснил, как реализовать запуск и остановку потока потребителей/производителей

 import logging
import random
from queue import Queue
from threading import  Thread, Event
from time import sleep


class Updater:
    def __init__(self):
        self.update_queue = Queue(5)
        self._logger = logging.getLogger(__name__)
        self._producer = None
        self._consumer = None
        self.producer_running = Event()
        self.consumer_running = Event()

    def producer(self):
        while self.producer_running.is_set():
            item = random.randint(0, 10)
            self.update_queue.put(item)
            sleep(0.1*float(random.randint(0,10)))

    def consumer(self):
        while self.consumer_running.is_set() or not self.update_queue.empty():
            item = self.update_queue.get()
            sleep(0.2*float(random.randint(0,10)))

    def start(self):
        if not self.producer_running.is_set():
            
            self.producer_running.set()
            self._producer = Thread( target=self.producer)
            self._producer.start()

            self.consumer_running.set()
            self._consumer = Thread( target=self.consumer)
            self._consumer.start()

    def stop(self):
            self.producer_running.clear()
            self._producer.join()

            self.consumer_running.clear()
            self._consumer.join()

if __name__ == "__main__":
    logging.basicConfig(level=logging.INFO,format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')

    foo = Updater()
    foo.start()
    sleep(5)
    foo.stop()
 

Обновить

Теперь я создал метод остановки и запуска с помощью concurrent.futures , он работает, но я не уверен, так ли это задумано

     self.executor = concurrent.futures.ThreadPoolExecutor(max_workers=5) 

    def start(self):
        if not self.producer_running.is_set():
            
            self.producer_running.set()
            self._producer = self.executor.submit(self.producer)

            self.consumer_running.set()
            self._consumer = self.executor.submit(self.consumer)


    def stop(self):
            self.producer_running.clear()
            concurrent.futures.wait([self._producer])

            self.consumer_running.clear()
            concurrent.futures.wait([self._consumer])
            self.executor.shutdown()