Многопроцессорная обработка не порождает все запрошенные процессы

#python #python-multiprocessing

Вопрос:

Я работаю с Orcaflex (программное обеспечение FEM для оффшорного анализа, но оно не должно иметь отношения к делу). Я создал сценарий, чтобы проверить, были ли успешно завершены выполненные мною симуляции (моделирование может завершиться неудачно из-за отсутствия сходимости). Поскольку я говорю о тысячах файлов, с которыми я пытался распараллелить процесс multiprocessing . Следуя моему коду. Извините, но я не могу привести для вас рабочий пример, но я постараюсь объяснить подробно. Я создал производный класс multiprocessing.Process и перезаписал run() его для выполнения проверок файлов моделирования. Затем __main__ я устанавливаю количество процессоров, разделяю файлы соответствующим образом и начинаю выполнение.

Проблема в том, что процессы порождаются не полностью, а в течение, как представляется, случайного промежутка времени от одного к другому. Это то, что должно быть, или я что-то упускаю? Что я имею в виду, говоря о том, что я не размножаюсь полностью, так это то, что я вижу:

 [Info/Worker-1] child process calling self.run()
 

и, например,:

 [Info/Worker-4] child process calling self.run()
 

примерно через 10 минут после запуска программы.

Заранее спасибо за любую помощь/предложения.

 import os
import subprocess
import glob
import multiprocessing
import logging
import sys
import OrcFxAPI as of

class Worker(multiprocessing.Process):

    myJobs = []

    def setJobs(self, jobList):
        self.myJobs = jobList

    @staticmethod
    def changedExtensionFileName(oldFileName, newExtension):
        return '.'.join((os.path.splitext(oldFileName)[0], newExtension))

    def run(self):
        failed = []
        model = of.Model(threadCount=1)

        for job in self.myJobs:
            try:
                print('%s starting' % job)
                sys.stdout.flush()
                model.LoadSimulation(job)
                if model.state == of.ModelState.SimulationStoppedUnstable:
                    newJob = job.replace('.sim', '.dat')
                    failed.append(newJob)

                    with open('Failed_Sim.txt', 'a') as f:
                        f.write(f'{newJob}n')
                        f.close()

                    model.LoadData(newJob)
                    model.general.ImplicitConstantTimeStep /= 2
                    model.SaveData(newJob)
                    print(f'{job} has failed, reducing time step')

            except of.DLLError as err:
                print('%s ERROR: %s' % (job, err))
                sys.stdout.flush()
                with open(self.changedExtensionFileName(job, 'FAIL'), 'w') as f:
                    f.write('%s error: %s' % (job, err))
                    f.close()
        return



if __name__ == '__main__':
    import re
    sim_file = [f for f in os.listdir() if re.search(r'dddd.*.sim', f)]    

    # begin multprocessing
    multiprocessing.log_to_stderr()
    logger = multiprocessing.get_logger()
    logger.setLevel(logging.INFO)

    corecount = 14 

    workers = []

    chunkSize = int(len(sim_file) / corecount)
    chunkRemainder = int(len(sim_file) % corecount)
    print('%s jobs found, dividing across %s workers - %s each remainder %s' % (str(len(sim_file)), str(corecount), chunkSize, chunkRemainder))

    start = 0
    for coreNum in range(0, corecount):
        worker = Worker()
        workers.append(worker)
        end = start   chunkSize
        if chunkRemainder>0:
            chunkRemainder -= 1
            end  = 1
        if end>len(sim_file):
            end = len(sim_file)
        worker.setJobs(sim_file[start:end])
        worker.start()
        start = end
        if start>=len(sim_file):
            break

    for worker in workers:
        worker.join()
    print('Done...')
 

Комментарии:

1. Процессы нереста должны длиться порядка секунд. Когда вы добавляете работника к работникам — разве это не должно быть сделано в сценарии? (после worker.setJobs) Или это не имеет значения? Обычно я бы занимался SMP с помощью apply_async, поэтому, к сожалению, не очень знаком с вашим подходом (и поэтому я не очень полезен!). Если никто не ответит быстрым решением, я предложу, что бы я сделал, но это было бы немного реструктуризацией, так что, возможно, лучше избегать!

2. @Amiga500 я тоже так подумал.. Я тоже ожидал увидеть, что все процессы будут происходить почти одновременно.

Ответ №1:

Хорошо, так что никто не поднял руку, чтобы ответить на это незначительной поправкой (что я не знаю, как сделать!), Так что вот вам более крупное предложение о пересмотре…

 def worker(inpData):
    #The worker process

    failed1 = []
    failed2 = []

    for job in inpData:   #I'm not sure of the data shape of the chunks, has your original method split them into coherent chunks capable of being processed independently? My step here could be wrong. 
        try:
            #print('%s starting' % job)  #Prints won't appear on console from worker processes from windows, so commented them all out
            model.LoadSimulation(job)
            if model.state == of.ModelState.SimulationStoppedUnstable:
                newJob = job.replace('.sim', '.dat')
                failed1.append(newJob)

                #I'd recommend we pass the list "failed" back to master and write to text from there, otherwise you could have several processes updating the text file at once, leading to possible loss of data
                #with open('Failed_Sim.txt', 'a') as f:
                #     f.write(f'{newJob}n')
                #     f.close()

                model.LoadData(newJob)
                model.general.ImplicitConstantTimeStep /= 2
                model.SaveData(newJob)
                #print(f'{job} has failed, reducing time step')   

            except of.DLLError as err:
                #print('%s ERROR: %s' % (job, err))
                #sys.stdout.flush()
                #with open(self.changedExtensionFileName(job, 'FAIL'), 'w') as f:
                #    f.write('%s error: %s' % (job, err))
                #    f.close()
                failed2.append(job)

#Note I've made two failed lists to pass back, for both failure types

return failed1, failed2


if __name__ == "__main__":
    import re
    import multiprocessing as mp
    nCPUs = mp.cpu_count()

    sim_file = [f for f in os.listdir() if re.search(r'dddd.*.sim', f)] 

    #Make the chunks
    chunkSize = int(len(sim_file) / corecount)
    chunkRemainder = int(len(sim_file) % corecount)
    print('%s jobs found, dividing across %s workers - %s each remainder %s' % (str(len(sim_file)), str(corecount), chunkSize, chunkRemainder))

    chunks = []
    start = 0
    for iChunk in range(0, nCPUs)
        end = start   chunkSize
        if chunkRemainder>0:
            chunkRemainder -= 1
            end  = 1
            if end>len(sim_file):
                end = len(sim_file)
        chunk.append(sim_file[start:end])


    #Send to workers
    pool = mp.Pool(processes=nCPUs)
    futA = []

    for iChunk in range(0, nCPUs):
        futA.append(pool.apply_async(worker, args=(chunk[iChunk],))
    

    #Gather results
    if futA:
        failedDat = []
        failedSim = []
        for iChunk in range(0, len(futA)):
            resA, resB = futA[iChunk].get()
            failedDat.extend(resA)
            failedSim.extend(resB)
    pool.close()
            
    if failedDat:
        print("Following jobs failed, reducing timesteps:")
        print(failedDat)
    if failedSim:
        print("Following sims failed due to errors")
        print(failedSim) 
 

Комментарии:

1. спасибо за усилия, я обязательно попробую ваш подход! Спасибо, что поделились

2. Возможно, я не совсем точно знаю, что вы делаете в своем наборе данных, но бит многопроцессорной обработки будет звучать. Если вы можете запустить фрагмент через worker() в одном процессе, то единственное, что остановит его выполнение в многопроцессорном режиме, — это необходимость передачи данных процессу или создания новых импортных данных в этом процессе worker (). (Поскольку порождение процесса не извлекает данные из глобальных объектов, которые в противном случае могли бы быть доступны, но я ожидаю, что вы уже сталкивались с этим.)