Очередь Python, ожидающая потока перед получением следующего элемента

#python #multithreading #queue

#python #многопоточность #очередь

Вопрос:

У меня есть очередь, которая всегда должна быть готова обрабатывать элементы, когда они добавляются в нее. Функция, которая выполняется для каждого элемента в очереди, создает и запускает поток для выполнения операции в фоновом режиме, чтобы программа могла выполнять другие действия.

Однако функция, которую я вызываю для каждого элемента в очереди, просто запускает поток, а затем завершает выполнение, независимо от того, завершен ли поток, который она запустила, или нет. Из-за этого цикл перейдет к следующему элементу в очереди до того, как программа закончит обработку последнего элемента.

Вот код, чтобы лучше продемонстрировать, что я пытаюсь сделать:

 queue = Queue.Queue()
t = threading.Thread(target=worker)
t.start()

def addTask():
    queue.put(SomeObject())

def worker():
    while True:
        try:
            # If an item is put onto the queue, immediately execute it (unless 
            # an item on the queue is still being processed, in which case wait 
            # for it to complete before moving on to the next item in the queue)
            item = queue.get()
            runTests(item)
            # I want to wait for 'runTests' to complete before moving past this point
        except Queue.Empty, err:
            # If the queue is empty, just keep running the loop until something 
            # is put on top of it.
            pass

def runTests(args):
    op_thread = SomeThread(args)
    op_thread.start()
    # My problem is once this last line 't.start()' starts the thread, 
    # the 'runTests' function completes operation, but the operation executed
    # by some thread is not yet done executing because it is still running in
    # the background. I do not want the 'runTests' function to actually complete
    # execution until the operation in thread t is done executing.
    """t.join()"""
    # I tried putting this line after 't.start()', but that did not solve anything.
    # I have commented it out because it is not necessary to demonstrate what 
    # I am trying to do, but I just wanted to show that I tried it.
  

Некоторые примечания:

Все это выполняется в приложении PyGTK. Как только операция ‘SomeThread’ завершена, она отправляет обратный вызов в графический интерфейс для отображения результатов операции.

Я не знаю, насколько это влияет на проблему, с которой я сталкиваюсь, но я подумал, что это может быть важно.

Комментарии:

1. Я не понимаю проблемы. Вы можете использовать Thread.join для остановки выполнения до завершения потока, если это то, что вы ищете. Но ваш вопрос совершенно неясен.

2. вы назначаете t дважды, один раз в глобальной области видимости и один раз в функции runTests. Это действительно репрезентативный пример? Можете ли вы показать нам полный пример кода, который демонстрирует проблему, с которой вы столкнулись?

3. На самом деле это было не так в моем коде. Я пытался выразить то, что я хотел сделать, в более простых терминах и избавился от всей фактической обработки данных, происходящей в каждой функции. В любом случае, я рассматриваю возможность просто записать псевдокод того, что я хочу сделать, и посмотреть, знает ли кто-нибудь, как это сделать, потому что у меня пограничный синдром дауна и я не могу правильно передать свой вопрос.

Ответ №1:

Фундаментальная проблема с потоками Python заключается в том, что вы не можете просто убить их — они должны согласиться умереть.

Что вы должны сделать, это:

  1. Реализуйте поток как класс
  2. Добавьте threading.Event элемент, который join метод очищает, и основной цикл потока время от времени проверяет. Если он видит, что он очищен, он возвращается. Для этого переопределения threading.Thread.join необходимо проверить событие, а затем вызвать Thread.join само себя
  3. Чтобы разрешить (2), выполните чтение из Queue блока с небольшим таймаутом. Таким образом, «время отклика» вашего потока на запрос на уничтожение будет равняться таймауту, и OTOH не блокирует процессор

Вот некоторый код из потока клиента сокета, который у меня есть, который имеет ту же проблему с блокировкой в очереди:

 class SocketClientThread(threading.Thread):
    """ Implements the threading.Thread interface (start, join, etc.) and
        can be controlled via the cmd_q Queue attribute. Replies are placed in
        the reply_q Queue attribute.
    """
    def __init__(self, cmd_q=Queue.Queue(), reply_q=Queue.Queue()):
        super(SocketClientThread, self).__init__()
        self.cmd_q = cmd_q
        self.reply_q = reply_q
        self.alive = threading.Event()
        self.alive.set()
        self.socket = None

        self.handlers = {
            ClientCommand.CONNECT: self._handle_CONNECT,
            ClientCommand.CLOSE: self._handle_CLOSE,
            ClientCommand.SEND: self._handle_SEND,
            ClientCommand.RECEIVE: self._handle_RECEIVE,
        }

    def run(self):
        while self.alive.isSet():
            try:
                # Queue.get with timeout to allow checking self.alive
                cmd = self.cmd_q.get(True, 0.1)
                self.handlers[cmd.type](cmd)
            except Queue.Empty as e:
                continue

    def join(self, timeout=None):
        self.alive.clear()
        threading.Thread.join(self, timeout)
  

Обратите внимание self.alive и цикл в run .

Комментарии:

1. В итоге я обнаружил проблему в своей программе (скрипт BASH далеко-далеко во внешней системе, который вмешивался в процессы [возможно, признак того, что моя программа плохо спроектирована, но это проблема для другого раза]). Тем не менее, это очень хороший ответ и намного более элегантный, чем то, что я в итоге сделал. Как только я пойму ваш код, я попробую реализовать что-то подобное в своем. Я бы сделал это, если бы у меня был представитель для этого.

2. @Kededro: np. не стесняйтесь спрашивать в комментарии, нужна ли вам помощь в понимании этого