Сервер Python: как я могу непрерывно ставить задачи в очередь, следя за тем, чтобы обрабатывалась только одна за раз

#python #multithreading #object #queue

#python #многопоточность #объект #очередь

Вопрос:

У меня есть две задачи, одна из которых вызывается каждые две секунды, а другая вызывается в случайное время. Обоим необходимо получить доступ к объекту, который не может быть вызван до завершения предыдущего вызова (если это произойдет, мне нужно перезагрузить аппаратное устройство вручную).

Объект относится к классу, который позволяет осуществлять связь с аппаратным устройством через сокеты.

Для этого я создал класс потоков, чтобы запускать все в фоновом режиме, и никакие другие задачи не блокируются. В этом классе я реализовал очередь: Две разные функции помещают задачи в очередь, а рабочий должен выполнять задачи !!НЕТ!! одновременно.

Поскольку весь этот проект является сервером, он должен выполняться непрерывно.

Ну, вот мой код, и он, очевидно, не работает. Я был бы очень рад, если бы у кого-нибудь была подсказка о том, как это решить.

Обновление: 26.10.2020 Чтобы сделать мою проблему более понятной, я обновил код на основе ответа Артема Козырева.

 import time
from threading import Lock, Thread
import threading
from queue import Queue


class ThreadWorker(Thread):
    def __init__(self, _lock: Lock, _queue: Queue, name: str):
        # daemon=False means that process waits until all threads are finished
        # (not only main one and garbage collector)
        super().__init__(name=name, daemon=False)
        # lock prevents several worker threads do work simultaneously
        self.lock = _lock
        # tasks are send from the main thread via Queue
        self.queue = _queue

    def do_work(self, job):
        # lock context manager prevents other worker threads from working in the same time
        with self.lock:
            time.sleep(3)
            print(f"{threading.current_thread().getName()}: {job * 10}")

    def run(self):
        while True:
            job = self.queue.get()
            # "poison pillow" - stop message from queue
            if not job:
                break
            self.do_work(job)

def TimeStamp(msg):
    tElapsed = (time.time() - tStart)  # Display Thread Info
    sElap = int(tElapsed)
    msElap = int((tElapsed - sElap) * 1000)
    usElap = int((tElapsed - sElap - msElap / 1000) * 1000000)
    print(msg , ': ',  sElap, 's', msElap, 'ms', usElap, 'us')

def f1():
    TimeStamp("f1 start")
    time.sleep(2)
    TimeStamp("f1 finished")

def f2():
    TimeStamp("f2 start")
    time.sleep(6)
    TimeStamp("f2 finished")

def insertf1():
    for i in range(10):
        q.put(f1())
        time.sleep(2)

def insertf2():
    for i in range(10):
        time.sleep(10)
        q.put(f2())



q = Queue()
lock = Lock()
workers = [ThreadWorker(lock, q, f"Th-worker-{i}") for i in range(5)]  # create workers
for w in workers:
    w.start()

tStart = time.time()
threading.Thread(target=insertf1, daemon=True).start()
threading.Thread(target=insertf2, daemon=True).start()
  

Вывод:

начало f1: 0 с 0 мс 0 us

f1 завершено: 2 с 2 мс 515 us

запуск f1: 4 с 9 мс 335 us

f1 завершено: 6 с 9 мс 932 сша

запуск f1: 8 с 17 мс 428 us

запуск f2: 10 с 12 мс 794 us

f1 завершено: 10 с 28 мс 633 сша

начало f1: 12 с 29 мс 182 сша

f1 завершено: 14 с 34 мс 411 сша

f2 завершено: 16 с 19 мс 330 us

f1 началось до завершения f2, чего следует избегать.

Комментарии:

1. Если у вас есть только один поток, потребляющий из очереди, я не понимаю, почему вы беспокоитесь о одновременном выполнении 2 заданий?

2. Одно задание — это существующий фоновый поток, в котором после установления соединения с сокетом создается новый поток: CallFromExtern() . Другое задание также должно выполняться в фоновом режиме, чтобы не мешать работе пользователя. Для обоих заданий требуется доступ к похожему объекту, но разным функциям. Если пользователь выполняет этот запрос, а сервер в данный момент использует этот объект, аппаратное устройство выйдет из строя и его необходимо перезагрузить вручную. Поэтому мне нужна очередь, чтобы этот объект сохранялся в потоке и никогда не вызывался до завершения функции объектов.

3. Ваш ControlQueue класс наследует от обоих threading.Thread и queue.Queue , и в то же время внутри него есть экземпляры этих двух классов. Такая структура программы возможна, но, поскольку вы, похоже, вообще никогда не используете унаследованную queue.Queue функциональность, я бы рекомендовал пересмотреть проектные решения, прежде всего определить, сколько потоков и очередей необходимо, и какова их роль.

4. Вы не ставите f1 и f2 в очередь. Вы помещаете результаты вызова этих функций в очередь.

Ответ №1:

Для этого вам нужно объединить Queue и Lock . Блокировка не позволит рабочим потокам работать одновременно. Найдите пример кода ниже:

 import time
from threading import Lock, Thread
import threading
from queue import Queue


class ThreadWorker(Thread):
    def __init__(self, _lock: Lock, _queue: Queue, name: str):
        # daemon=False means that process waits until all threads are finished 
        # (not only main one and garbage collector)
        super().__init__(name=name, daemon=False) 
        # lock prevents several worker threads do work simultaneously
        self.lock = _lock
        # tasks are send from the main thread via Queue
        self.queue = _queue

    def do_work(self, job):
        # lock context manager prevents other worker threads from working in the same time
        with self.lock:
            time.sleep(3)
            print(f"{threading.current_thread().getName()}: {job * 10}")

    def run(self):
        while True:
            job = self.queue.get()
            # "poison pillow" - stop message from queue
            if not job:
                break
            self.do_work(job)


if __name__ == '__main__':
    q = Queue()
    lock = Lock()
    workers = [ThreadWorker(lock, q, f"Th-worker-{i}") for i in range(5)]  # create workers
    for w in workers:
        w.start()
    # produce tasks
    for i in range(10):
        q.put(i)
    # stop tasks with "poison pillow"
    for i in range(len(workers)):
        q.put(None)

  

Редактировать на основе дополнений к вопросу (добавлена блокировка)

Основная идея заключается в том, что вы не должны запускать f1 и f2 без блокировки.

 import time
from threading import Lock, Thread
import threading
from queue import Queue


class ThreadWorker(Thread):
    def __init__(self, _lock: Lock, _queue: Queue, name: str):
        # daemon=False means that process waits until all threads are finished
        # (not only main one and garbage collector)
        super().__init__(name=name, daemon=False)
        # lock prevents several worker threads do work simultaneously
        self.lock = _lock
        # tasks are send from the main thread via Queue
        self.queue = _queue

    def do_work(self, f):
        # lock context manager prevents other worker threads from working in the same time
        with self.lock:
            time.sleep(3)
            print(f"{threading.current_thread().getName()}: {f()}")

    def run(self):
        while True:
            job = self.queue.get()
            # "poison pillow" - stop message from queue
            if not job:
                break
            self.do_work(job)


def TimeStamp(msg):
    tElapsed = (time.time() - tStart)  # Display Thread Info
    sElap = int(tElapsed)
    msElap = int((tElapsed - sElap) * 1000)
    usElap = int((tElapsed - sElap - msElap / 1000) * 1000000)
    print(msg, ': ',  sElap, 's', msElap, 'ms', usElap, 'us')


def f1():
    TimeStamp("f1 start")
    time.sleep(1)
    TimeStamp("f1 finished")
    return f"Func-1-{threading.current_thread().getName()}"


def f2():
    TimeStamp("f2 start")
    time.sleep(3)
    TimeStamp("f2 finished")
    return f"Func-2-{threading.current_thread().getName()}"


def insertf1():
    for i in range(5):
        q.put(f1)  # do not run f1 here! Run it in worker thread with Lock


def insertf2():
    for i in range(5):
        q.put(f2) # do not run f2 here! Run it in worker thread with Lock


q = Queue()
lock = Lock()
workers = [ThreadWorker(lock, q, f"Th-worker-{i}") for i in range(5)]  # create workers
for w in workers:
    w.start()

tStart = time.time()
threading.Thread(target=insertf1, daemon=True).start()
threading.Thread(target=insertf2, daemon=True).start()
  

Комментарии:

1. Спасибо за ваш ответ, но это создает проблему, как у меня было раньше. Чтобы протестировать это, я создал два потока: threading. Поток (target= insertf1, daemon = True).start() многопоточность. Поток (target= insertf2, daemon = True).start(). Оба потока вставляют функции f1 (каждые 2 секунды) и f2 (каждые 10 секунд) в очередь. выполнение f1 занимает 2 секунды, а выполнение f2 — 6 секунд.

2. Теперь я напечатал время начала и остановки для каждой задачи: начало f1: 4 с 9 мс 335 us завершение f1: 6 с 9 мс 932 us начало f1: 8 с 17 мс 428 us начало f2: 10 с 12 мс 794 us завершение f1: 10 с 28 мс 633 us начало f1:12 с 29 мс 182 us f1 завершено: 14 с 34 мс 411 us f2 завершено: 16 с 19 мс 330 us Как вы можете видеть, f1 запускается до завершения f2.

3. @BastHut можете ли вы добавить к вопросу, что вы изменили в коде, чтобы добавить блокировку для функций? И на самом деле потоков в моем примере нет daemon=True , иначе они будут завершены, когда основной поток завершит свою работу

4. Я обновил основной вопрос на основе вашего кода, включая выходные данные

5. @BastHut Я добавил информацию к своему первоначальному ответу, я проверил ваш код и увидел, что f1 и f2 не были запущены с блокировкой. Я изменил логику insert1 и insert2, теперь они не передают готовый результат в очередь, а отправляют функцию рабочему, где вызывается функция. Обратите внимание, что у вас есть другой выбор, вы можете изменить свой код другим способом и использовать блокировку внутри f1 и f2, тогда вы отправите готовые результаты рабочим вместо самой функции.