Многопроцессорная обработка Python: добавление в очередь в дочернем процессе

#python #multiprocessing #python-multiprocessing

#python #многопроцессорная обработка #python-многопроцессорная обработка

Вопрос:

Я хочу реализовать поисковый робот, который хранит данные в Mongo. Я хотел бы использовать multiprocessing как способ «передачи» задач блокировки, таких как распаковка файлов, сканирование файлов и загрузка в Mongo. Существуют определенные задачи, которые зависят от других задач (т. Е. Файл необходимо разархивировать, прежде чем можно будет сканировать файлы), поэтому я хотел бы иметь возможность выполнять необходимые задачи и добавлять новые в ту же очередь задач.

Ниже приведено то, что у меня сейчас есть:

 import multiprocessing


class Worker(multiprocessing.Process):
    def __init__(self, task_queue: multiprocessing.Queue):
        super(Worker, self).__init__()
        self.task_queue = task_queue

    def run(self):
        for (function, *args) in iter(self.task_queue.get, None):
            print(f'Running: {function.__name__}({*args,})')

            # Run the provided function with its parameters in child process
            function(*args)

            self.task_queue.task_done()


def foo(task_queue: multiprocessing.Queue) -> None:
    print('foo')
    # Add new task to queue from this child process
    task_queue.put((bar, 1))


def bar(x: int) -> None:
    print(f'bar: {x}')


def main():
    # Start workers on separate processes
    workers = []
    manager = multiprocessing.Manager()
    task_queue = manager.Queue()
    for i in range(multiprocessing.cpu_count()):
        worker = Worker(task_queue)
        workers.append(worker)
        worker.start()

    # Run foo on child process using the queue as parameter
    task_queue.put((foo, task_queue))

    for _ in workers:
        task_queue.put(None)

    # Block until workers complete and join main process
    for worker in workers:
        worker.join()

    print('Program completed.')


if __name__ == '__main__':
    main()
 

Ожидаемое поведение:

 Running: foo((<AutoProxy[Queue] object, typeid 'Queue' at 0x1b963548908>,))
foo
Running: bar((1,))
bar: 1
Program completed.
 

Фактическое поведение:

 Running: foo((<AutoProxy[Queue] object, typeid 'Queue' at 0x1b963548908>,))
foo
Program completed.
 

Я новичок в многопроцессорной обработке, поэтому буду признателен за любую помощь.

Комментарии:

1. Вы помещаете a None в очередь для каждого из рабочих (что приводит к их завершению), прежде чем у кого-либо из рабочих была возможность добавить свои собственные элементы в очередь. На самом деле, у вас интересная проблема. Вы не можете остановить ни одного работника, пока каждый работник не перейдет в состояние ожидания. Пока один рабочий все еще работает, есть шанс, что это добавит намного больше работы в очередь. Я не уверен в решении.

2. Довольно неважно, но код говорит print('Program completed.') , что пока ваш вывод Program ended.

3. @lenin Упс, хотел бы улучшить ведение журнала. Исправлено.

4. @FrankYellin, ах да, имеет смысл. Я изучил дальнейшее ожидание, пока очередь не опустеет, и считаю, что нашел решение. Спасибо.

Ответ №1:

Как отметил @FrankYellin, это связано с тем, что None вводится task_queue до bar того, как может быть добавлен.

Предполагая, что очередь будет либо непустой, либо ожидает завершения задачи во время выполнения программы (что верно в моем случае), join можно использовать метод в очереди. Согласно документам:

Блокируется до тех пор, пока все элементы в очереди не будут получены и обработаны.

Количество незавершенных задач увеличивается всякий раз, когда элемент добавляется в очередь. Счетчик уменьшается всякий раз, когда поток-потребитель вызывает task_done(), чтобы указать, что элемент был извлечен и вся работа над ним завершена. Когда количество незавершенных задач падает до нуля, join() разблокируется.

Ниже приведен обновленный код:

 import multiprocessing


class Worker(multiprocessing.Process):
    def __init__(self, task_queue: multiprocessing.Queue):
        super(Worker, self).__init__()
        self.task_queue = task_queue

    def run(self):
        for (function, *args) in iter(self.task_queue.get, None):
            print(f'Running: {function.__name__}({*args,})')

            # Run the provided function with its parameters in child process
            function(*args)

            self.task_queue.task_done() # <-- Notify queue that task is complete


def foo(task_queue: multiprocessing.Queue) -> None:
    print('foo')
    # Add new task to queue from this child process
    task_queue.put((bar, 1))


def bar(x: int) -> None:
    print(f'bar: {x}')


def main():
    # Start workers on separate processes
    workers = []
    manager = multiprocessing.Manager()
    task_queue = manager.Queue()
    for i in range(multiprocessing.cpu_count()):
        worker = Worker(task_queue)
        workers.append(worker)
        worker.start()

    # Run foo on child process using the queue as parameter
    task_queue.put((foo, task_queue))

    # Block until all items in queue are popped and completed
    task_queue.join() # <---

    for _ in workers:
        task_queue.put(None)

    # Block until workers complete and join main process
    for worker in workers:
        worker.join()

    print('Program completed.')


if __name__ == '__main__':
    main()
 

Кажется, это работает нормально. Я обновлю это, если обнаружу что-нибудь новое. Спасибо всем.

Комментарии:

1. Отличное решение. Я забыл о task_queue.task_done , который решает условие гонки, которое я не смог разрешить: как я могу узнать, когда очередь пуста и задачи не выполняются.