Пакетный запуск процессов в grid Engine с использованием map_async()

#python #python-3.x #parallel-processing #multiprocessing #subprocess

#python #python-3.x #параллельная обработка #многопроцессорная обработка #подпроцесс

Вопрос:

Я пытаюсь протестировать свой распараллеленный код на Sun Grid Engine. Мой код, похоже, отлично работает на моей локальной машине Linux, однако я ограничен локально из-за небольшого количества процессоров. Мой код связан с некоторым другим кодом, который выполняет некоторые более сложные вычисления на основе некоторых входных данных, которые вычисляет мой код. Я стремлюсь запустить около 7000 процессов в виде блоков по 31, используя 32 процессора, которые доступны мне непосредственно в grid Engine, без отправки в систему очередей. Мой код выполняет некоторые действия с использованием этих 32 процессоров, а затем отправляет задание в систему очередей с помощью подпроцесса.Popen([qsub submission_script.sh ], shell=True, стандартный вывод=подпроцесс.Для каждого из этих заданий, отправленных в очередь, требуется 8 процессоров, каждый с 6 ГБ переменной памяти. Используя Pool и map_async, я могу отправлять пакеты из 31 задания в очередь с помощью моего скрипта, пока не будут выполнены все 7000 входных данных. Затем мой код ожидает завершения заданий в очереди, извлекает из них некоторые данные, а затем на основе этого генерирует еще один пакет заданий для повторной отправки в очередь (таким образом, для каждого из 7000 начальных заданий есть 2 пакета отправки).

 def run(gridPoint):
    """Takes each geometry and an index for each grid point and runs the series
       of calculations specified by the user for either quantum chemistry code
       in parallel. Data for each grid point is sorted in global arrays according
       to the index."""
    index, geom = gridPoint

    if inputs['code'] == 'molpro':

        [workdirSPE, inputSPE] = molpro.setupSPE(inputs, geom, pwd, index)
        [normalTermination, outputSPE] = util.runCalculation(inputs['hpc'], molproKeys, pwd, workdirSPE, inputSPE, submissionScript, index) # Submit first round of calculations to grid engine and wait
        if normalTermination:  # If normal termination extract some data
            data.energiesExtract(workdirSPE outputSPE, inputs['spe'], molproKeys['energy_regex'],
                                  molproKeys['cas_prog'], index)

            if inputs['nacme'] == 'yes':  # Then submit another round of calculations to the grid engine based off initial ones - My script just stops here on the grid engine - although is fine on my local machine.
                [nacmeWorkdir, nacmeInput, daxes] = molpro.nacmeSetup(inputs, geom, workdirSPE, index)
                [nacmeNormalTermination, nacmeOutput] = util.runCalculation(inputs['hpc'], molproKeys, pwd, nacmeWorkdir, nacmeInput, submissionScript, index) 
                if nacmeNormalTermination:
                    data.nacmeExtract(nacmeWorkdir nacmeOutput, molproKeys['nacme_regex'], index, daxes)
                else:
                    data.nacmes[:, :, :, index] = 'NaN'

            if inputs['grad'] == 'yes':
                [gradWorkdir, gradInput] = molpro.gradientSetup(inputs, geom, workdirSPE, index)
                [gradNormalTermination, gradOutput] = util.runCalculation(inputs['hpc'], molproKeys, pwd, gradWorkdir, gradInput, submissionScript, index)  # Submit third round of calcualtions to grid engine.
                if gradNormalTermination:
                    data.gradExtractMolpro(gradWorkdir gradOutput, molproKeys['grad_regex'], molproKeys['numerical'], index)
                else:
                    data.grads[:, :, :, index] = 'NaN'

        else:
            data.energies[index, :] = 'NaN'
            if inputs['nacme'] == 'yes':
                data.nacmes[:, :, :, index] = 'NaN'
            if inputs['grad'] == 'yes':
                data.grads[:, :, :, index] = 'NaN'


    elif inputs['code'] == 'molcas':   # TO DO
        pass


if __name__ == "__main__":
    pwd = os.getcwd()  # parent dir for all calculations
    listGeom = coordinateGenerator(refGeom)  # Generate coordinates
    if inputs['code'] == 'molpro':  # All possible state couplings
        couplings = molpro.stateCouplings(inputs['states'][-1])
    elif inputs['code'] == 'molcas':
        pass
    pmanager = setup.ProccessManager()  # Force global mem sharing for ouput data
    pmanager.start()
    data = setup.datastore(refGeom, inputs['states'][-1], couplings, len(listGeom), pmanager)
    cpuCount = multiprocessing.cpu_count()-2
    runPool = multiprocessing.Pool(processes=cpuCount)
    runPool.map_async(run, [(k, v) for k, v in enumerate(listGeom)]) # listGeom contains 7000 inputs to be run in batches of n (where n=cpuCount).
    runPool.close()
    runPool.join()
    np.save('ENERGIES.npy', data.energies)  # Extract data
    if inputs['nacme'] == 'yes':
        np.save('NACMES.npy', data.nacmes)
    if inputs['grad'] == 'yes':
 

Функция, которая выполняет отправку в grid engine, выглядит следующим образом,

 def runCalculation(system: str, codeKeys: dict, pwd: str, workdir: str, inputfile: str, submitscript: str, index: int):
    '''Submits calculations to either a local linux OS, or a HPC running eithers Sun Grid Engine or PBS Pro.                                                                                                                                                                               
       Waits for calculation to finish running and then returns True if it terminated with no errors. '''
    outputfile = inputfile.split('.')[0] '.out'
    os.chdir('%s' % (workdir))

    if system == 'local':  #  Run calculation and wait for termination in each case
        runLocal(codeKeys, inputfile)
    elif system == 'sun grid engine':
        gridEngineScript = setup_submit_script(submitscript, inputfile, index)
        qsub(gridEngineScript)
    elif system == 'pbs':
        pass

    terminationCode = calculationTermination(codeKeys, outputfile)  #  check normal termination
    os.chdir(pwd)
    return terminationCode


def subprocess_cmd(command, return_stdout):  # Used for submission using qsub to grid engine.
    ''' Returns output for a shell command. '''
    process = subprocess.Popen(command, stdout=subprocess.PIPE, shell=True, universal_newlines=True)
    proc_stdout = process.communicate()[0].strip()
    if return_stdout:
        return(proc_stdout)
 

Я опустил здесь некоторые функции для упрощения, но в основном каждый вызов runCalculation() отправляется в механизм сетки с использованием подпроцесса.Попен(). При второй отправке я просто получаю сообщение об ошибке RuntimeError: can't start new thread , я не уверен, почему as ulimit -u говорит мне, что у меня есть ограничение в 200 потоков, и я выполняю эти вычисления только партиями по 31, когда у меня есть 32 процессора, доступных напрямую, и тяжелая работа, выполняемая программой, с которой я взаимодействую, заключается в отправил очередь. Проверка процессов, которые я запускаю ps -fLu [uid] , показывает, что дополнительных неожиданных запущенных процессов нет. Сокращение количества выполняемых вычислений до 31 или запуск моего скрипта на python с использованием всего 10 процессоров по-прежнему приводит к этой проблеме.

Если у кого-нибудь есть представление о том, что происходит, это было бы здорово. Спасибо!