#python #python-3.x #parallel-processing #multiprocessing #subprocess
#python #python-3.x #параллельная обработка #многопроцессорная обработка #подпроцесс
Вопрос:
Я пытаюсь протестировать свой распараллеленный код на Sun Grid Engine. Мой код, похоже, отлично работает на моей локальной машине Linux, однако я ограничен локально из-за небольшого количества процессоров. Мой код связан с некоторым другим кодом, который выполняет некоторые более сложные вычисления на основе некоторых входных данных, которые вычисляет мой код. Я стремлюсь запустить около 7000 процессов в виде блоков по 31, используя 32 процессора, которые доступны мне непосредственно в grid Engine, без отправки в систему очередей. Мой код выполняет некоторые действия с использованием этих 32 процессоров, а затем отправляет задание в систему очередей с помощью подпроцесса.Popen([qsub submission_script.sh ], shell=True, стандартный вывод=подпроцесс.Для каждого из этих заданий, отправленных в очередь, требуется 8 процессоров, каждый с 6 ГБ переменной памяти. Используя Pool и map_async, я могу отправлять пакеты из 31 задания в очередь с помощью моего скрипта, пока не будут выполнены все 7000 входных данных. Затем мой код ожидает завершения заданий в очереди, извлекает из них некоторые данные, а затем на основе этого генерирует еще один пакет заданий для повторной отправки в очередь (таким образом, для каждого из 7000 начальных заданий есть 2 пакета отправки).
def run(gridPoint):
"""Takes each geometry and an index for each grid point and runs the series
of calculations specified by the user for either quantum chemistry code
in parallel. Data for each grid point is sorted in global arrays according
to the index."""
index, geom = gridPoint
if inputs['code'] == 'molpro':
[workdirSPE, inputSPE] = molpro.setupSPE(inputs, geom, pwd, index)
[normalTermination, outputSPE] = util.runCalculation(inputs['hpc'], molproKeys, pwd, workdirSPE, inputSPE, submissionScript, index) # Submit first round of calculations to grid engine and wait
if normalTermination: # If normal termination extract some data
data.energiesExtract(workdirSPE outputSPE, inputs['spe'], molproKeys['energy_regex'],
molproKeys['cas_prog'], index)
if inputs['nacme'] == 'yes': # Then submit another round of calculations to the grid engine based off initial ones - My script just stops here on the grid engine - although is fine on my local machine.
[nacmeWorkdir, nacmeInput, daxes] = molpro.nacmeSetup(inputs, geom, workdirSPE, index)
[nacmeNormalTermination, nacmeOutput] = util.runCalculation(inputs['hpc'], molproKeys, pwd, nacmeWorkdir, nacmeInput, submissionScript, index)
if nacmeNormalTermination:
data.nacmeExtract(nacmeWorkdir nacmeOutput, molproKeys['nacme_regex'], index, daxes)
else:
data.nacmes[:, :, :, index] = 'NaN'
if inputs['grad'] == 'yes':
[gradWorkdir, gradInput] = molpro.gradientSetup(inputs, geom, workdirSPE, index)
[gradNormalTermination, gradOutput] = util.runCalculation(inputs['hpc'], molproKeys, pwd, gradWorkdir, gradInput, submissionScript, index) # Submit third round of calcualtions to grid engine.
if gradNormalTermination:
data.gradExtractMolpro(gradWorkdir gradOutput, molproKeys['grad_regex'], molproKeys['numerical'], index)
else:
data.grads[:, :, :, index] = 'NaN'
else:
data.energies[index, :] = 'NaN'
if inputs['nacme'] == 'yes':
data.nacmes[:, :, :, index] = 'NaN'
if inputs['grad'] == 'yes':
data.grads[:, :, :, index] = 'NaN'
elif inputs['code'] == 'molcas': # TO DO
pass
if __name__ == "__main__":
pwd = os.getcwd() # parent dir for all calculations
listGeom = coordinateGenerator(refGeom) # Generate coordinates
if inputs['code'] == 'molpro': # All possible state couplings
couplings = molpro.stateCouplings(inputs['states'][-1])
elif inputs['code'] == 'molcas':
pass
pmanager = setup.ProccessManager() # Force global mem sharing for ouput data
pmanager.start()
data = setup.datastore(refGeom, inputs['states'][-1], couplings, len(listGeom), pmanager)
cpuCount = multiprocessing.cpu_count()-2
runPool = multiprocessing.Pool(processes=cpuCount)
runPool.map_async(run, [(k, v) for k, v in enumerate(listGeom)]) # listGeom contains 7000 inputs to be run in batches of n (where n=cpuCount).
runPool.close()
runPool.join()
np.save('ENERGIES.npy', data.energies) # Extract data
if inputs['nacme'] == 'yes':
np.save('NACMES.npy', data.nacmes)
if inputs['grad'] == 'yes':
Функция, которая выполняет отправку в grid engine, выглядит следующим образом,
def runCalculation(system: str, codeKeys: dict, pwd: str, workdir: str, inputfile: str, submitscript: str, index: int):
'''Submits calculations to either a local linux OS, or a HPC running eithers Sun Grid Engine or PBS Pro.
Waits for calculation to finish running and then returns True if it terminated with no errors. '''
outputfile = inputfile.split('.')[0] '.out'
os.chdir('%s' % (workdir))
if system == 'local': # Run calculation and wait for termination in each case
runLocal(codeKeys, inputfile)
elif system == 'sun grid engine':
gridEngineScript = setup_submit_script(submitscript, inputfile, index)
qsub(gridEngineScript)
elif system == 'pbs':
pass
terminationCode = calculationTermination(codeKeys, outputfile) # check normal termination
os.chdir(pwd)
return terminationCode
def subprocess_cmd(command, return_stdout): # Used for submission using qsub to grid engine.
''' Returns output for a shell command. '''
process = subprocess.Popen(command, stdout=subprocess.PIPE, shell=True, universal_newlines=True)
proc_stdout = process.communicate()[0].strip()
if return_stdout:
return(proc_stdout)
Я опустил здесь некоторые функции для упрощения, но в основном каждый вызов runCalculation()
отправляется в механизм сетки с использованием подпроцесса.Попен(). При второй отправке я просто получаю сообщение об ошибке RuntimeError: can't start new thread
, я не уверен, почему as ulimit -u
говорит мне, что у меня есть ограничение в 200 потоков, и я выполняю эти вычисления только партиями по 31, когда у меня есть 32 процессора, доступных напрямую, и тяжелая работа, выполняемая программой, с которой я взаимодействую, заключается в отправил очередь. Проверка процессов, которые я запускаю ps -fLu [uid]
, показывает, что дополнительных неожиданных запущенных процессов нет. Сокращение количества выполняемых вычислений до 31 или запуск моего скрипта на python с использованием всего 10 процессоров по-прежнему приводит к этой проблеме.
Если у кого-нибудь есть представление о том, что происходит, это было бы здорово. Спасибо!