#python #multithreading #queue
#python #многопоточность #очередь
Вопрос:
У меня есть очередь, которая всегда должна быть готова обрабатывать элементы, когда они добавляются в нее. Функция, которая выполняется для каждого элемента в очереди, создает и запускает поток для выполнения операции в фоновом режиме, чтобы программа могла выполнять другие действия.
Однако функция, которую я вызываю для каждого элемента в очереди, просто запускает поток, а затем завершает выполнение, независимо от того, завершен ли поток, который она запустила, или нет. Из-за этого цикл перейдет к следующему элементу в очереди до того, как программа закончит обработку последнего элемента.
Вот код, чтобы лучше продемонстрировать, что я пытаюсь сделать:
queue = Queue.Queue()
t = threading.Thread(target=worker)
t.start()
def addTask():
queue.put(SomeObject())
def worker():
while True:
try:
# If an item is put onto the queue, immediately execute it (unless
# an item on the queue is still being processed, in which case wait
# for it to complete before moving on to the next item in the queue)
item = queue.get()
runTests(item)
# I want to wait for 'runTests' to complete before moving past this point
except Queue.Empty, err:
# If the queue is empty, just keep running the loop until something
# is put on top of it.
pass
def runTests(args):
op_thread = SomeThread(args)
op_thread.start()
# My problem is once this last line 't.start()' starts the thread,
# the 'runTests' function completes operation, but the operation executed
# by some thread is not yet done executing because it is still running in
# the background. I do not want the 'runTests' function to actually complete
# execution until the operation in thread t is done executing.
"""t.join()"""
# I tried putting this line after 't.start()', but that did not solve anything.
# I have commented it out because it is not necessary to demonstrate what
# I am trying to do, but I just wanted to show that I tried it.
Некоторые примечания:
Все это выполняется в приложении PyGTK. Как только операция ‘SomeThread’ завершена, она отправляет обратный вызов в графический интерфейс для отображения результатов операции.
Я не знаю, насколько это влияет на проблему, с которой я сталкиваюсь, но я подумал, что это может быть важно.
Комментарии:
1. Я не понимаю проблемы. Вы можете использовать
Thread.join
для остановки выполнения до завершения потока, если это то, что вы ищете. Но ваш вопрос совершенно неясен.2. вы назначаете t дважды, один раз в глобальной области видимости и один раз в функции runTests. Это действительно репрезентативный пример? Можете ли вы показать нам полный пример кода, который демонстрирует проблему, с которой вы столкнулись?
3. На самом деле это было не так в моем коде. Я пытался выразить то, что я хотел сделать, в более простых терминах и избавился от всей фактической обработки данных, происходящей в каждой функции. В любом случае, я рассматриваю возможность просто записать псевдокод того, что я хочу сделать, и посмотреть, знает ли кто-нибудь, как это сделать, потому что у меня пограничный синдром дауна и я не могу правильно передать свой вопрос.
Ответ №1:
Фундаментальная проблема с потоками Python заключается в том, что вы не можете просто убить их — они должны согласиться умереть.
Что вы должны сделать, это:
- Реализуйте поток как класс
- Добавьте
threading.Event
элемент, которыйjoin
метод очищает, и основной цикл потока время от времени проверяет. Если он видит, что он очищен, он возвращается. Для этого переопределенияthreading.Thread.join
необходимо проверить событие, а затем вызватьThread.join
само себя - Чтобы разрешить (2), выполните чтение из
Queue
блока с небольшим таймаутом. Таким образом, «время отклика» вашего потока на запрос на уничтожение будет равняться таймауту, и OTOH не блокирует процессор
Вот некоторый код из потока клиента сокета, который у меня есть, который имеет ту же проблему с блокировкой в очереди:
class SocketClientThread(threading.Thread):
""" Implements the threading.Thread interface (start, join, etc.) and
can be controlled via the cmd_q Queue attribute. Replies are placed in
the reply_q Queue attribute.
"""
def __init__(self, cmd_q=Queue.Queue(), reply_q=Queue.Queue()):
super(SocketClientThread, self).__init__()
self.cmd_q = cmd_q
self.reply_q = reply_q
self.alive = threading.Event()
self.alive.set()
self.socket = None
self.handlers = {
ClientCommand.CONNECT: self._handle_CONNECT,
ClientCommand.CLOSE: self._handle_CLOSE,
ClientCommand.SEND: self._handle_SEND,
ClientCommand.RECEIVE: self._handle_RECEIVE,
}
def run(self):
while self.alive.isSet():
try:
# Queue.get with timeout to allow checking self.alive
cmd = self.cmd_q.get(True, 0.1)
self.handlers[cmd.type](cmd)
except Queue.Empty as e:
continue
def join(self, timeout=None):
self.alive.clear()
threading.Thread.join(self, timeout)
Обратите внимание self.alive
и цикл в run
.
Комментарии:
1. В итоге я обнаружил проблему в своей программе (скрипт BASH далеко-далеко во внешней системе, который вмешивался в процессы [возможно, признак того, что моя программа плохо спроектирована, но это проблема для другого раза]). Тем не менее, это очень хороший ответ и намного более элегантный, чем то, что я в итоге сделал. Как только я пойму ваш код, я попробую реализовать что-то подобное в своем. Я бы сделал это, если бы у меня был представитель для этого.
2. @Kededro: np. не стесняйтесь спрашивать в комментарии, нужна ли вам помощь в понимании этого