как остановить запущенный подпроцесс при завершении другого подпроцесса (в многопроцессорной обработке)

#python #multiprocessing

#python #многопроцессорная обработка

Вопрос:

Я столкнулся с проблемой с многопроцессорной обработкой:

 class PythonHelper(object):
    @staticmethod
    def run_in_parallel(*functions):
        processes=list()
        for function in functions:
            process=Process(target=function)
            process.start()
            processes.append(process)
        for process in processes:
            process.join()
  

Вышеупомянутый статический метод используется мной для одновременного запуска нескольких функций (объединяя их в одном процессе). Все было хорошо, пока я не столкнулся с необходимостью принудительного завершения процесса при завершении одного из «подпроцессов».

Например:

 from PythonHelper import PythonHelper as ph
from Recorder import Recorder

class Logger(object):

    def run_recorder_proc(self):
        rec=Recorder()
        rec.record_video()

    def run_printer_proc(self):
    #hypothetical function: execution takes a long time
        for i in range(9000000):
            print("number: {}".format(i))

    def run_logger(self):
        ph.run_in_parallel(self.run_printer_proc,self.run_recorder_proc)
  

self.run_printer_proc и self.run_recorder_proc — это мои подпроцессы. Как «убить» оставшийся подпроцесс, пока один из них был завершен?


Редактировать: Полный исходный код:

 class PythonHelper(object):
    @staticmethod
    #with your fix
    def run_in_parallel(*functions):
        processes={}
        for function in functions:
            process=Process(target=function)
            process.start()
            processes[process.pid]=process
        # wait for any process to complete
        pid, status = os.waitpid(-1, 0)
        # one process terminated
        # join it
        processes[pid].join()
        del processes[pid]
        # terminate the rest
        for process in processes.values():
            process.terminate()
        for process in processes.values():
            process.join()


class Logger(object):
    def run_numbers_1(self):
        for i in range(900000):
            print("number: {}".format(i))
    def run_numbers_2(self):
        for i in range(100000):
            print("number: {}".format(i))
    def run_logger(self):
        ph.run_in_parallel(self.run_numbers_1,self.run_numbers_2)

if __name__=="__main__":
    logger=Logger()
    logger.run_logger()
  

Основываясь на приведенном выше примере, я хотел бы принудительно завершить run_numbers_1 во время завершения run_numbers_2.

Ответ №1:

Вы можете достичь этого, слегка изменив run_in_parallel() :

 def run_in_parallel(*functions):
    processes={}
    for function in functions:
        process=Process(target=function)
        process.start()
        processes[process.pid]=process
    # wait for any process to complete
    pid, status = os.waitpid(-1, 0) 
    # one process terminated
    # join it
    processes[pid].join()
    del processes[pid]
    # terminate the rest
    for process in processes.itervalues():
        process.terminate()
    for process in processes.itervalues():
        process.join()
  

[Обновить]
На основе вашего полного кода вот рабочий пример. Вместо race-prognant os.waitpid() он использует Event object, который другие процессы устанавливают по завершении:

 from multiprocessing import Process, Event

class MyProcess(Process):
    def __init__(self, event, *args, **kwargs):
        self.event = event
        Process.__init__(self, *args, **kwargs)

    def run(self):
        Process.run(self)
        self.event.set()

class PythonHelper(object):
    @staticmethod
    #with your fix
    def run_in_parallel(*functions):
        event = Event()
        processes=[]
        for function in functions:
            process=MyProcess(event, target=function)
            process.start()
            processes.append(process)
        # wait for any process to complete
        event.wait()
        # one process completed
        # terminate all child processes
        for process in processes:
            process.terminate()
        for process in processes:
            process.join()


class Logger(object):
    def run_numbers_1(self):
        for i in range(90000):
            print("1 number: {}".format(i))
    def run_numbers_2(self):
        for i in range(10000):
            print("2 number: {}".format(i))
    def run_logger(self):
        PythonHelper.run_in_parallel(self.run_numbers_1,self.run_numbers_2)

if __name__=="__main__":
    logger=Logger()
    logger.run_logger()
  

Комментарии:

1. К сожалению, это решение не хочет работать. Каково начальное значение pid? Каков статус? Он не используется

2. Должно быть, дочерние процессы завершились до его вызова waitpid . Можете ли вы опубликовать полный скрипт для его тестирования?

3. Я опубликовал полный исходный код. Я полагаю, проблема может быть в waitpid. Я думаю, вы счастливый пользователь Linux 🙂 К сожалению, в настоящее время я работаю над Win, и это может быть проблемой.

4. Я изменил исходный код (слишком много библиотек для вставки), но наиболее важные определения исходного кода являются исходными. run_numbers_1 и run_numbers_2 являются примерами def’ов, которые завершались в разные моменты.

5. Я вижу, что вы запускаете его в Windose, в котором отсутствует fork() , так что дочерний процесс не может получить доступ к классу, определяемому областью действия функции. Я обновил код, но на данный момент у меня нет Windows, чтобы попробовать это.