#python #multiprocessing
#python #многопроцессорная обработка
Вопрос:
Я столкнулся с проблемой с многопроцессорной обработкой:
class PythonHelper(object):
@staticmethod
def run_in_parallel(*functions):
processes=list()
for function in functions:
process=Process(target=function)
process.start()
processes.append(process)
for process in processes:
process.join()
Вышеупомянутый статический метод используется мной для одновременного запуска нескольких функций (объединяя их в одном процессе). Все было хорошо, пока я не столкнулся с необходимостью принудительного завершения процесса при завершении одного из «подпроцессов».
Например:
from PythonHelper import PythonHelper as ph
from Recorder import Recorder
class Logger(object):
def run_recorder_proc(self):
rec=Recorder()
rec.record_video()
def run_printer_proc(self):
#hypothetical function: execution takes a long time
for i in range(9000000):
print("number: {}".format(i))
def run_logger(self):
ph.run_in_parallel(self.run_printer_proc,self.run_recorder_proc)
self.run_printer_proc и self.run_recorder_proc — это мои подпроцессы. Как «убить» оставшийся подпроцесс, пока один из них был завершен?
Редактировать: Полный исходный код:
class PythonHelper(object):
@staticmethod
#with your fix
def run_in_parallel(*functions):
processes={}
for function in functions:
process=Process(target=function)
process.start()
processes[process.pid]=process
# wait for any process to complete
pid, status = os.waitpid(-1, 0)
# one process terminated
# join it
processes[pid].join()
del processes[pid]
# terminate the rest
for process in processes.values():
process.terminate()
for process in processes.values():
process.join()
class Logger(object):
def run_numbers_1(self):
for i in range(900000):
print("number: {}".format(i))
def run_numbers_2(self):
for i in range(100000):
print("number: {}".format(i))
def run_logger(self):
ph.run_in_parallel(self.run_numbers_1,self.run_numbers_2)
if __name__=="__main__":
logger=Logger()
logger.run_logger()
Основываясь на приведенном выше примере, я хотел бы принудительно завершить run_numbers_1 во время завершения run_numbers_2.
Ответ №1:
Вы можете достичь этого, слегка изменив run_in_parallel()
:
def run_in_parallel(*functions):
processes={}
for function in functions:
process=Process(target=function)
process.start()
processes[process.pid]=process
# wait for any process to complete
pid, status = os.waitpid(-1, 0)
# one process terminated
# join it
processes[pid].join()
del processes[pid]
# terminate the rest
for process in processes.itervalues():
process.terminate()
for process in processes.itervalues():
process.join()
[Обновить]
На основе вашего полного кода вот рабочий пример. Вместо race-prognant os.waitpid()
он использует Event
object, который другие процессы устанавливают по завершении:
from multiprocessing import Process, Event
class MyProcess(Process):
def __init__(self, event, *args, **kwargs):
self.event = event
Process.__init__(self, *args, **kwargs)
def run(self):
Process.run(self)
self.event.set()
class PythonHelper(object):
@staticmethod
#with your fix
def run_in_parallel(*functions):
event = Event()
processes=[]
for function in functions:
process=MyProcess(event, target=function)
process.start()
processes.append(process)
# wait for any process to complete
event.wait()
# one process completed
# terminate all child processes
for process in processes:
process.terminate()
for process in processes:
process.join()
class Logger(object):
def run_numbers_1(self):
for i in range(90000):
print("1 number: {}".format(i))
def run_numbers_2(self):
for i in range(10000):
print("2 number: {}".format(i))
def run_logger(self):
PythonHelper.run_in_parallel(self.run_numbers_1,self.run_numbers_2)
if __name__=="__main__":
logger=Logger()
logger.run_logger()
Комментарии:
1. К сожалению, это решение не хочет работать. Каково начальное значение pid? Каков статус? Он не используется
2. Должно быть, дочерние процессы завершились до его вызова
waitpid
. Можете ли вы опубликовать полный скрипт для его тестирования?3. Я опубликовал полный исходный код. Я полагаю, проблема может быть в waitpid. Я думаю, вы счастливый пользователь Linux 🙂 К сожалению, в настоящее время я работаю над Win, и это может быть проблемой.
4. Я изменил исходный код (слишком много библиотек для вставки), но наиболее важные определения исходного кода являются исходными. run_numbers_1 и run_numbers_2 являются примерами def’ов, которые завершались в разные моменты.
5. Я вижу, что вы запускаете его в Windose, в котором отсутствует
fork()
, так что дочерний процесс не может получить доступ к классу, определяемому областью действия функции. Я обновил код, но на данный момент у меня нет Windows, чтобы попробовать это.