#python #multiprocessing #python-multiprocessing
#python #многопроцессорная обработка #python-многопроцессорная обработка
Вопрос:
Я хочу реализовать поисковый робот, который хранит данные в Mongo. Я хотел бы использовать multiprocessing
как способ «передачи» задач блокировки, таких как распаковка файлов, сканирование файлов и загрузка в Mongo. Существуют определенные задачи, которые зависят от других задач (т. Е. Файл необходимо разархивировать, прежде чем можно будет сканировать файлы), поэтому я хотел бы иметь возможность выполнять необходимые задачи и добавлять новые в ту же очередь задач.
Ниже приведено то, что у меня сейчас есть:
import multiprocessing
class Worker(multiprocessing.Process):
def __init__(self, task_queue: multiprocessing.Queue):
super(Worker, self).__init__()
self.task_queue = task_queue
def run(self):
for (function, *args) in iter(self.task_queue.get, None):
print(f'Running: {function.__name__}({*args,})')
# Run the provided function with its parameters in child process
function(*args)
self.task_queue.task_done()
def foo(task_queue: multiprocessing.Queue) -> None:
print('foo')
# Add new task to queue from this child process
task_queue.put((bar, 1))
def bar(x: int) -> None:
print(f'bar: {x}')
def main():
# Start workers on separate processes
workers = []
manager = multiprocessing.Manager()
task_queue = manager.Queue()
for i in range(multiprocessing.cpu_count()):
worker = Worker(task_queue)
workers.append(worker)
worker.start()
# Run foo on child process using the queue as parameter
task_queue.put((foo, task_queue))
for _ in workers:
task_queue.put(None)
# Block until workers complete and join main process
for worker in workers:
worker.join()
print('Program completed.')
if __name__ == '__main__':
main()
Ожидаемое поведение:
Running: foo((<AutoProxy[Queue] object, typeid 'Queue' at 0x1b963548908>,))
foo
Running: bar((1,))
bar: 1
Program completed.
Фактическое поведение:
Running: foo((<AutoProxy[Queue] object, typeid 'Queue' at 0x1b963548908>,))
foo
Program completed.
Я новичок в многопроцессорной обработке, поэтому буду признателен за любую помощь.
Комментарии:
1. Вы помещаете a
None
в очередь для каждого из рабочих (что приводит к их завершению), прежде чем у кого-либо из рабочих была возможность добавить свои собственные элементы в очередь. На самом деле, у вас интересная проблема. Вы не можете остановить ни одного работника, пока каждый работник не перейдет в состояние ожидания. Пока один рабочий все еще работает, есть шанс, что это добавит намного больше работы в очередь. Я не уверен в решении.2. Довольно неважно, но код говорит
print('Program completed.')
, что пока ваш выводProgram ended.
3. @lenin Упс, хотел бы улучшить ведение журнала. Исправлено.
4. @FrankYellin, ах да, имеет смысл. Я изучил дальнейшее ожидание, пока очередь не опустеет, и считаю, что нашел решение. Спасибо.
Ответ №1:
Как отметил @FrankYellin, это связано с тем, что None
вводится task_queue
до bar
того, как может быть добавлен.
Предполагая, что очередь будет либо непустой, либо ожидает завершения задачи во время выполнения программы (что верно в моем случае), join
можно использовать метод в очереди. Согласно документам:
Блокируется до тех пор, пока все элементы в очереди не будут получены и обработаны.
Количество незавершенных задач увеличивается всякий раз, когда элемент добавляется в очередь. Счетчик уменьшается всякий раз, когда поток-потребитель вызывает task_done(), чтобы указать, что элемент был извлечен и вся работа над ним завершена. Когда количество незавершенных задач падает до нуля, join() разблокируется.
Ниже приведен обновленный код:
import multiprocessing
class Worker(multiprocessing.Process):
def __init__(self, task_queue: multiprocessing.Queue):
super(Worker, self).__init__()
self.task_queue = task_queue
def run(self):
for (function, *args) in iter(self.task_queue.get, None):
print(f'Running: {function.__name__}({*args,})')
# Run the provided function with its parameters in child process
function(*args)
self.task_queue.task_done() # <-- Notify queue that task is complete
def foo(task_queue: multiprocessing.Queue) -> None:
print('foo')
# Add new task to queue from this child process
task_queue.put((bar, 1))
def bar(x: int) -> None:
print(f'bar: {x}')
def main():
# Start workers on separate processes
workers = []
manager = multiprocessing.Manager()
task_queue = manager.Queue()
for i in range(multiprocessing.cpu_count()):
worker = Worker(task_queue)
workers.append(worker)
worker.start()
# Run foo on child process using the queue as parameter
task_queue.put((foo, task_queue))
# Block until all items in queue are popped and completed
task_queue.join() # <---
for _ in workers:
task_queue.put(None)
# Block until workers complete and join main process
for worker in workers:
worker.join()
print('Program completed.')
if __name__ == '__main__':
main()
Кажется, это работает нормально. Я обновлю это, если обнаружу что-нибудь новое. Спасибо всем.
Комментарии:
1. Отличное решение. Я забыл о task_queue.task_done , который решает условие гонки, которое я не смог разрешить: как я могу узнать, когда очередь пуста и задачи не выполняются.