#python #multithreading #object #queue
#python #многопоточность #объект #очередь
Вопрос:
У меня есть две задачи, одна из которых вызывается каждые две секунды, а другая вызывается в случайное время. Обоим необходимо получить доступ к объекту, который не может быть вызван до завершения предыдущего вызова (если это произойдет, мне нужно перезагрузить аппаратное устройство вручную).
Объект относится к классу, который позволяет осуществлять связь с аппаратным устройством через сокеты.
Для этого я создал класс потоков, чтобы запускать все в фоновом режиме, и никакие другие задачи не блокируются. В этом классе я реализовал очередь: Две разные функции помещают задачи в очередь, а рабочий должен выполнять задачи !!НЕТ!! одновременно.
Поскольку весь этот проект является сервером, он должен выполняться непрерывно.
Ну, вот мой код, и он, очевидно, не работает. Я был бы очень рад, если бы у кого-нибудь была подсказка о том, как это решить.
Обновление: 26.10.2020 Чтобы сделать мою проблему более понятной, я обновил код на основе ответа Артема Козырева.
import time
from threading import Lock, Thread
import threading
from queue import Queue
class ThreadWorker(Thread):
def __init__(self, _lock: Lock, _queue: Queue, name: str):
# daemon=False means that process waits until all threads are finished
# (not only main one and garbage collector)
super().__init__(name=name, daemon=False)
# lock prevents several worker threads do work simultaneously
self.lock = _lock
# tasks are send from the main thread via Queue
self.queue = _queue
def do_work(self, job):
# lock context manager prevents other worker threads from working in the same time
with self.lock:
time.sleep(3)
print(f"{threading.current_thread().getName()}: {job * 10}")
def run(self):
while True:
job = self.queue.get()
# "poison pillow" - stop message from queue
if not job:
break
self.do_work(job)
def TimeStamp(msg):
tElapsed = (time.time() - tStart) # Display Thread Info
sElap = int(tElapsed)
msElap = int((tElapsed - sElap) * 1000)
usElap = int((tElapsed - sElap - msElap / 1000) * 1000000)
print(msg , ': ', sElap, 's', msElap, 'ms', usElap, 'us')
def f1():
TimeStamp("f1 start")
time.sleep(2)
TimeStamp("f1 finished")
def f2():
TimeStamp("f2 start")
time.sleep(6)
TimeStamp("f2 finished")
def insertf1():
for i in range(10):
q.put(f1())
time.sleep(2)
def insertf2():
for i in range(10):
time.sleep(10)
q.put(f2())
q = Queue()
lock = Lock()
workers = [ThreadWorker(lock, q, f"Th-worker-{i}") for i in range(5)] # create workers
for w in workers:
w.start()
tStart = time.time()
threading.Thread(target=insertf1, daemon=True).start()
threading.Thread(target=insertf2, daemon=True).start()
Вывод:
начало f1: 0 с 0 мс 0 us
f1 завершено: 2 с 2 мс 515 us
запуск f1: 4 с 9 мс 335 us
f1 завершено: 6 с 9 мс 932 сша
запуск f1: 8 с 17 мс 428 us
запуск f2: 10 с 12 мс 794 us
f1 завершено: 10 с 28 мс 633 сша
начало f1: 12 с 29 мс 182 сша
f1 завершено: 14 с 34 мс 411 сша
f2 завершено: 16 с 19 мс 330 us
f1 началось до завершения f2, чего следует избегать.
Комментарии:
1. Если у вас есть только один поток, потребляющий из очереди, я не понимаю, почему вы беспокоитесь о одновременном выполнении 2 заданий?
2. Одно задание — это существующий фоновый поток, в котором после установления соединения с сокетом создается новый поток: CallFromExtern() . Другое задание также должно выполняться в фоновом режиме, чтобы не мешать работе пользователя. Для обоих заданий требуется доступ к похожему объекту, но разным функциям. Если пользователь выполняет этот запрос, а сервер в данный момент использует этот объект, аппаратное устройство выйдет из строя и его необходимо перезагрузить вручную. Поэтому мне нужна очередь, чтобы этот объект сохранялся в потоке и никогда не вызывался до завершения функции объектов.
3. Ваш
ControlQueue
класс наследует от обоихthreading.Thread
иqueue.Queue
, и в то же время внутри него есть экземпляры этих двух классов. Такая структура программы возможна, но, поскольку вы, похоже, вообще никогда не используете унаследованнуюqueue.Queue
функциональность, я бы рекомендовал пересмотреть проектные решения, прежде всего определить, сколько потоков и очередей необходимо, и какова их роль.4. Вы не ставите
f1
иf2
в очередь. Вы помещаете результаты вызова этих функций в очередь.
Ответ №1:
Для этого вам нужно объединить Queue
и Lock
. Блокировка не позволит рабочим потокам работать одновременно. Найдите пример кода ниже:
import time
from threading import Lock, Thread
import threading
from queue import Queue
class ThreadWorker(Thread):
def __init__(self, _lock: Lock, _queue: Queue, name: str):
# daemon=False means that process waits until all threads are finished
# (not only main one and garbage collector)
super().__init__(name=name, daemon=False)
# lock prevents several worker threads do work simultaneously
self.lock = _lock
# tasks are send from the main thread via Queue
self.queue = _queue
def do_work(self, job):
# lock context manager prevents other worker threads from working in the same time
with self.lock:
time.sleep(3)
print(f"{threading.current_thread().getName()}: {job * 10}")
def run(self):
while True:
job = self.queue.get()
# "poison pillow" - stop message from queue
if not job:
break
self.do_work(job)
if __name__ == '__main__':
q = Queue()
lock = Lock()
workers = [ThreadWorker(lock, q, f"Th-worker-{i}") for i in range(5)] # create workers
for w in workers:
w.start()
# produce tasks
for i in range(10):
q.put(i)
# stop tasks with "poison pillow"
for i in range(len(workers)):
q.put(None)
Редактировать на основе дополнений к вопросу (добавлена блокировка)
Основная идея заключается в том, что вы не должны запускать f1 и f2 без блокировки.
import time
from threading import Lock, Thread
import threading
from queue import Queue
class ThreadWorker(Thread):
def __init__(self, _lock: Lock, _queue: Queue, name: str):
# daemon=False means that process waits until all threads are finished
# (not only main one and garbage collector)
super().__init__(name=name, daemon=False)
# lock prevents several worker threads do work simultaneously
self.lock = _lock
# tasks are send from the main thread via Queue
self.queue = _queue
def do_work(self, f):
# lock context manager prevents other worker threads from working in the same time
with self.lock:
time.sleep(3)
print(f"{threading.current_thread().getName()}: {f()}")
def run(self):
while True:
job = self.queue.get()
# "poison pillow" - stop message from queue
if not job:
break
self.do_work(job)
def TimeStamp(msg):
tElapsed = (time.time() - tStart) # Display Thread Info
sElap = int(tElapsed)
msElap = int((tElapsed - sElap) * 1000)
usElap = int((tElapsed - sElap - msElap / 1000) * 1000000)
print(msg, ': ', sElap, 's', msElap, 'ms', usElap, 'us')
def f1():
TimeStamp("f1 start")
time.sleep(1)
TimeStamp("f1 finished")
return f"Func-1-{threading.current_thread().getName()}"
def f2():
TimeStamp("f2 start")
time.sleep(3)
TimeStamp("f2 finished")
return f"Func-2-{threading.current_thread().getName()}"
def insertf1():
for i in range(5):
q.put(f1) # do not run f1 here! Run it in worker thread with Lock
def insertf2():
for i in range(5):
q.put(f2) # do not run f2 here! Run it in worker thread with Lock
q = Queue()
lock = Lock()
workers = [ThreadWorker(lock, q, f"Th-worker-{i}") for i in range(5)] # create workers
for w in workers:
w.start()
tStart = time.time()
threading.Thread(target=insertf1, daemon=True).start()
threading.Thread(target=insertf2, daemon=True).start()
Комментарии:
1. Спасибо за ваш ответ, но это создает проблему, как у меня было раньше. Чтобы протестировать это, я создал два потока: threading. Поток (target= insertf1, daemon = True).start() многопоточность. Поток (target= insertf2, daemon = True).start(). Оба потока вставляют функции f1 (каждые 2 секунды) и f2 (каждые 10 секунд) в очередь. выполнение f1 занимает 2 секунды, а выполнение f2 — 6 секунд.
2. Теперь я напечатал время начала и остановки для каждой задачи: начало f1: 4 с 9 мс 335 us завершение f1: 6 с 9 мс 932 us начало f1: 8 с 17 мс 428 us начало f2: 10 с 12 мс 794 us завершение f1: 10 с 28 мс 633 us начало f1:12 с 29 мс 182 us f1 завершено: 14 с 34 мс 411 us f2 завершено: 16 с 19 мс 330 us Как вы можете видеть, f1 запускается до завершения f2.
3. @BastHut можете ли вы добавить к вопросу, что вы изменили в коде, чтобы добавить блокировку для функций? И на самом деле потоков в моем примере нет
daemon=True
, иначе они будут завершены, когда основной поток завершит свою работу4. Я обновил основной вопрос на основе вашего кода, включая выходные данные
5. @BastHut Я добавил информацию к своему первоначальному ответу, я проверил ваш код и увидел, что f1 и f2 не были запущены с блокировкой. Я изменил логику insert1 и insert2, теперь они не передают готовый результат в очередь, а отправляют функцию рабочему, где вызывается функция. Обратите внимание, что у вас есть другой выбор, вы можете изменить свой код другим способом и использовать блокировку внутри f1 и f2, тогда вы отправите готовые результаты рабочим вместо самой функции.