Ошибка выполнения в Windows при попытке многопроцессорной обработки python

#python #multiprocessing

#python #Windows #многопроцессорная обработка

Вопрос:

Я пробую свою самую первую официальную программу на python с использованием многопоточности и многопроцессорности на компьютере с Windows. Однако я не могу запустить процессы, поскольку python выдает следующее сообщение. Дело в том, что я не запускаю свои потоки в основном модуле. Потоки обрабатываются в отдельном модуле внутри класса.

РЕДАКТИРОВАТЬ: Кстати, этот код отлично работает в ubuntu. Не совсем в Windows

 RuntimeError: 
            Attempt to start a new process before the current process
            has finished its bootstrapping phase.
            This probably means that you are on Windows and you have
            forgotten to use the proper idiom in the main module:
                if __name__ == '__main__':
                    freeze_support()
                    ...
            The "freeze_support()" line can be omitted if the program
            is not going to be frozen to produce a Windows executable.
  

Мой исходный код довольно длинный, но я смог воспроизвести ошибку в сокращенной версии кода. Он разделен на два файла, первый является основным модулем и выполняет очень мало, кроме импорта модуля, который обрабатывает процессы / потоки и вызывает метод. Во втором модуле находится основная часть кода.


testMain.py:

 import parallelTestModule

extractor = parallelTestModule.ParallelExtractor()
extractor.runInParallel(numProcesses=2, numThreads=4)
  

parallelTestModule.py:

 import multiprocessing
from multiprocessing import Process
import threading

class ThreadRunner(threading.Thread):
    """ This class represents a single instance of a running thread"""
    def __init__(self, name):
        threading.Thread.__init__(self)
        self.name = name
    def run(self):
        print self.name,'n'

class ProcessRunner:
    """ This class represents a single instance of a running process """
    def runp(self, pid, numThreads):
        mythreads = []
        for tid in range(numThreads):
            name = "Proc-" str(pid) "-Thread-" str(tid)
            th = ThreadRunner(name)
            mythreads.append(th) 
        for i in mythreads:
            i.start()
        for i in mythreads:
            i.join()

class ParallelExtractor:    
    def runInParallel(self, numProcesses, numThreads):
        myprocs = []
        prunner = ProcessRunner()
        for pid in range(numProcesses):
            pr = Process(target=prunner.runp, args=(pid, numThreads)) 
            myprocs.append(pr) 
#        if __name__ == 'parallelTestModule':    #This didnt work
#        if __name__ == '__main__':              #This obviously doesnt work
#        multiprocessing.freeze_support()        #added after seeing error to no avail
        for i in myprocs:
            i.start()

        for i in myprocs:
            i.join()
  

Комментарии:

1. @doctorlove я запускаю его как python testMain.py

2. Конечно, вам нужен if name == ‘ main ‘ см. Ответы и документы

3. @NGAlgo Ваш скрипт был мне очень полезен, когда я отлаживал проблему с pymongo и многопроцессорной обработкой. Спасибо!

Ответ №1:

В Windows подпроцессы будут импортировать (т. Е. Выполнять) Основной модуль при запуске. Вам необходимо вставить if __name__ == '__main__': защиту в основной модуль, чтобы избежать рекурсивного создания подпроцессов.

Изменено testMain.py :

 import parallelTestModule

if __name__ == '__main__':    
    extractor = parallelTestModule.ParallelExtractor()
    extractor.runInParallel(numProcesses=2, numThreads=4)
  

Комментарии:

1. (хлопает себя ладонью по лбу) Дох! Это работает!!!! Большое вам спасибо! Мне не хватало того факта, что повторно импортируется исходный основной модуль! Все это время я пытался проверить » name ==» прямо перед тем, как запустить свои процессы.

2. Кажется, я не могу импортировать ‘parallelTestModule’. Я использую Python 2.7. Должен ли он работать из коробки?

3. @Jonny Код для parallelTestModule. py является частью вопроса.

4. @DeshDeepSingh Фрагмент кода не является отдельным примером; это модификация кода OP

5. @DeshDeepSingh Этот модуль является частью вопроса.

Ответ №2:

Попробуйте поместить свой код внутри основной функции в testMain.py

 import parallelTestModule

if __name__ ==  '__main__':
  extractor = parallelTestModule.ParallelExtractor()
  extractor.runInParallel(numProcesses=2, numThreads=4)
  

Смотрите Документы:

 "For an explanation of why (on Windows) the if __name__ == '__main__' 
part is necessary, see Programming guidelines."
  

что говорит

«Убедитесь, что основной модуль может быть безопасно импортирован новым интерпретатором Python, не вызывая непреднамеренных побочных эффектов (например, запуска нового процесса)».

… используя if __name__ == '__main__'

Ответ №3:

Хотя предыдущие ответы верны, есть небольшое осложнение, на которое было бы полезно обратить внимание.

В случае, если ваш основной модуль импортирует другой модуль, в котором глобальные переменные или переменные-члены класса определены и инициализированы для (или с использованием) некоторых новых объектов, вам, возможно, придется обусловить этот импорт таким же образом:

 if __name__ ==  '__main__':
  import my_module
  

Ответ №4:

Как сказал @Ofer, когда вы используете другие библиотеки или модули, вы должны импортировать их все в if __name__ == '__main__':

Итак, в моем случае все закончилось так:

 if __name__ == '__main__':       
    import librosa
    import os
    import pandas as pd
    run_my_program()
  

Ответ №5:

привет, вот моя структура для многопроцессорного

 from multiprocessing import Process
import time


start = time.perf_counter()


def do_something(time_for_sleep):
    print(f'Sleeping {time_for_sleep} second...')
    time.sleep(time_for_sleep)
    print('Done Sleeping...')



p1 = Process(target=do_something, args=[1])
p2 = Process(target=do_something, args=[2])


if __name__ == '__main__':
    p1.start()
    p2.start()

    p1.join()
    p2.join()

    finish = time.perf_counter()
    print(f'Finished in {round(finish-start,2 )} second(s)')
  

вам не нужно помещать импорт в if __name__ == '__main__': , просто запустите программу, которую вы хотите запустить внутри

Комментарии:

1. Я делаю то же самое. Однако я замечаю, что это приводит к тому, что процесс выполняется немного медленнее (в Windows). В частности, за time_for_sleep = 1 секунду процесс занимает 2 секунды, тогда как в Linux он занимает 1,02 секунды. Для time_for_sleep = 10 это занимает 11 секунд. Таким образом, у wondows накладные расходы составляют ~ 1 секунду.

Ответ №6:

В yolo v5 с python 3.8.5

 if __name__ == '__main__':
    from yolov5 import train
    train.run()
  

Ответ №7:

В моем случае это была простая ошибка в коде, использующая переменную до ее создания. Стоит проверить это, прежде чем пытаться использовать вышеуказанные решения. Почему я получил это конкретное сообщение об ошибке, Господь знает.

Ответ №8:

Приведенное ниже решение должно работать как для многопроцессорной обработки Python, так и для многопроцессорной обработки pytorch.

В качестве других ответов упоминалось, что исправление должно быть if __name__ == '__main__': , но я столкнулся с несколькими проблемами при определении, с чего начать, потому что я использую несколько сценариев и модулей. Когда я могу вызвать свою первую функцию внутри main, тогда все, что было до того, как оно начало создавать несколько процессов (не уверен, почему).

Помещение его в самую первую строку (даже до импорта) сработало. Только вызов первой функции возвращает ошибку тайм-аута. Ниже приведен первый файл моего кода, и многопроцессорная обработка используется после вызова нескольких функций, но включение main в первую, похоже, единственное исправление здесь.

 if __name__ == '__main__':
    from mjrl.utils.gym_env import GymEnv
    from mjrl.policies.gaussian_mlp import MLP
    from mjrl.baselines.quadratic_baseline import QuadraticBaseline
    from mjrl.baselines.mlp_baseline import MLPBaseline
    from mjrl.algos.npg_cg import NPG
    from mjrl.algos.dapg import DAPG
    from mjrl.algos.behavior_cloning import BC
    from mjrl.utils.train_agent import train_agent
    from mjrl.samplers.core import sample_paths
    import os
    import json
    import mjrl.envs
    import mj_envs
    import time as timer
    import pickle
    import argparse

    import numpy as np 

    # ===============================================================================
    # Get command line arguments
    # ===============================================================================

    parser = argparse.ArgumentParser(description='Policy gradient algorithms with demonstration data.')
    parser.add_argument('--output', type=str, required=True, help='location to store results')
    parser.add_argument('--config', type=str, required=True, help='path to config file with exp params')
    args = parser.parse_args()
    JOB_DIR = args.output
    if not os.path.exists(JOB_DIR):
        os.mkdir(JOB_DIR)
    with open(args.config, 'r') as f:
        job_data = eval(f.read())
    assert 'algorithm' in job_data.keys()
    assert any([job_data['algorithm'] == a for a in ['NPG', 'BCRL', 'DAPG']])
    job_data['lam_0'] = 0.0 if 'lam_0' not in job_data.keys() else job_data['lam_0']
    job_data['lam_1'] = 0.0 if 'lam_1' not in job_data.keys() else job_data['lam_1']
    EXP_FILE = JOB_DIR   '/job_config.json'
    with open(EXP_FILE, 'w') as f:
        json.dump(job_data, f, indent=4)

    # ===============================================================================
    # Train Loop
    # ===============================================================================

    e = GymEnv(job_data['env'])
    policy = MLP(e.spec, hidden_sizes=job_data['policy_size'], seed=job_data['seed'])
    baseline = MLPBaseline(e.spec, reg_coef=1e-3, batch_size=job_data['vf_batch_size'],
                           epochs=job_data['vf_epochs'], learn_rate=job_data['vf_learn_rate'])

    # Get demonstration data if necessary and behavior clone
    if job_data['algorithm'] != 'NPG':
        print("========================================")
        print("Collecting expert demonstrations")
        print("========================================")
        demo_paths = pickle.load(open(job_data['demo_file'], 'rb'))

        ########################################################################################
        demo_paths = demo_paths[0:3]
        print (job_data['demo_file'], len(demo_paths))
        for d in range(len(demo_paths)):
            feats = demo_paths[d]['features']
            feats = np.vstack(feats)
            demo_paths[d]['observations'] = feats

        ########################################################################################

        bc_agent = BC(demo_paths, policy=policy, epochs=job_data['bc_epochs'], batch_size=job_data['bc_batch_size'],
                      lr=job_data['bc_learn_rate'], loss_type='MSE', set_transforms=False)

        in_shift, in_scale, out_shift, out_scale = bc_agent.compute_transformations()
        bc_agent.set_transformations(in_shift, in_scale, out_shift, out_scale)
        bc_agent.set_variance_with_data(out_scale)

        ts = timer.time()
        print("========================================")
        print("Running BC with expert demonstrations")
        print("========================================")
        bc_agent.train()
        print("========================================")
        print("BC training complete !!!")
        print("time taken = %f" % (timer.time() - ts))
        print("========================================")

        # if job_data['eval_rollouts'] >= 1:
        #     score = e.evaluate_policy(policy, num_episodes=job_data['eval_rollouts'], mean_action=True)
        #     print("Score with behavior cloning = %f" % score[0][0])

    if job_data['algorithm'] != 'DAPG':
        # We throw away the demo data when training from scratch or fine-tuning with RL without explicit augmentation
        demo_paths = None

    # ===============================================================================
    # RL Loop
    # ===============================================================================

    rl_agent = DAPG(e, policy, baseline, demo_paths,
                    normalized_step_size=job_data['rl_step_size'],
                    lam_0=job_data['lam_0'], lam_1=job_data['lam_1'],
                    seed=job_data['seed'], save_logs=True
                    )

    print("========================================")
    print("Starting reinforcement learning phase")
    print("========================================")


    ts = timer.time()
    train_agent(job_name=JOB_DIR,
                agent=rl_agent,
                seed=job_data['seed'],
                niter=job_data['rl_num_iter'],
                gamma=job_data['rl_gamma'],
                gae_lambda=job_data['rl_gae'],
                num_cpu=job_data['num_cpu'],
                sample_mode='trajectories',
                num_traj=job_data['rl_num_traj'],
                num_samples= job_data['rl_num_samples'],
                save_freq=job_data['save_freq'],
                evaluation_rollouts=job_data['eval_rollouts'])
    print("time taken = %f" % (timer.time()-ts))
  

Ответ №9:

Я столкнулся с той же проблемой. метод @ofter является правильным, потому что есть некоторые детали, на которые следует обратить внимание. Ниже приведен код успешной отладки, который я изменил для вашей справки:

 
if __name__ == '__main__':
    import matplotlib.pyplot as plt
    import numpy as np
    def imgshow(img):
        img = img / 2   0.5
        np_img = img.numpy()
        plt.imshow(np.transpose(np_img, (1, 2, 0)))
        plt.show()

    dataiter = iter(train_loader)
    images, labels = dataiter.next()

    imgshow(torchvision.utils.make_grid(images))
    print(' '.join('%5s' % classes[labels[i]] for i in range(4)))
  

Для справки, у меня нет подпрограммы, у меня просто есть основная программа, но у меня та же проблема, что и у вас. Это демонстрирует, что при импорте файла библиотеки Python в середине сегмента программы мы должны добавить:

 if __name__ == '__main__':
  

Ответ №10:

Я попробовал приемы, упомянутые выше, в следующем очень простом коде. но я все еще не могу остановить его сброс на любом из моих оконных компьютеров с Python 3.8 / 3.10. Я был бы очень признателен, если бы вы могли сказать мне, где я ошибаюсь.

 print('script reset')

def do_something(inp):
    print('Done!')

if __name__ == '__main__':
    from multiprocessing import Process, get_start_method
    print('main reset')
    print(get_start_method())
    Process(target=do_something, args=[1]).start()
    print('Finished')
  

вывод отображает:

 script reset
main reset
spawn
Finished
script reset
Done!
  

Обновить:

Насколько я понимаю, вы, ребята, не препятствуете сбросу скрипта, содержащего __main__ или .start() (что не происходит в Linux), скорее вы предлагаете обходные пути, чтобы мы не увидели сброс. Нужно сделать весь импорт минимальным и поместить их в каждую функцию отдельно, но это все равно, по сравнению с Linux, медленно.