#python #multithreading #multiprocessing
#python #многопоточность #многопроцессорная обработка
Вопрос:
Мне нужно разбить большие файлы объемом 15 Гб, чтобы их можно было обрабатывать как фрагменты параллельно, повторно собирать (по порядку) и сохранять. Этот код работает до определенной степени, он разбивает файлы на равные размеры для заданного количества потоков, а затем запускает эти потоки.
Проблема в том, что выполняется только один или два потока, в то время как остальные остановлены, даже если файл полностью загружен в память. Я использую .map
, потому что асинхронная многопроцессорная обработка или многопоточность не сохранят порядок, как я понимаю.
Я потратил на это слишком много времени и написал этот код 4 разными способами в соответствии с примерами на этом сайте. Есть ли способ распараллелить это?
Упрощенный код:
import multiprocessing
def load_mod (arg, p):
with open (arg, 'r') as fp:
fp = fp.readlines()
fp = [fp[i:i (len(fp)//p)] for i in range(0, len(fp), (len(fp)//p))] #make p# of chucks
pool = multiprocessing.Pool(p)
results = pool.map(process, fp)
sname = arg.rsplit('.', 1)[0] '_fmt.' arg.rsplit('.', 1)[-1]
with open (sname,'w') as f: f.write("".join(results))
def process (filebits):
newfile = ''
for l in filebits:
###process large amount of text line by line here and append to newfile
return newfile
process_list = [#A list of files#]
for i in process_list: load_mod(i, #number of processes#)
Комментарии:
1.
(len(fp)//p)
должно бытьmath.ceil(len(fp)/p)
, иначе вы создаете по крайней мереp
рабочие единицы, а не точноp
рабочие единицы. Кроме того, передайтеprocesses=p
.Pool
вызов для создания такого количества рабочих элементов.2.@AnonCoward: использование
pool = multiprocessing.Pool(p)
создаетp
количество рабочих процессов.3. Вы правы, неправильно поняли, извините за это.
4. @heatherfuke: Ваш вопрос о многопроцессорной обработке или многопоточности? (например, процессы или потоки) В Python это не одно и то же, и они не взаимозаменяемы.
5. Многопроцессорность, я знаю, что они разные. Однако многопроцессорная обработка также запускает несколько потоков.
Ответ №1:
Предупреждение
Передача таких больших объемов данных в подпроцессы сопряжена с изрядными накладными расходами. Если функция process
не требует больших затрат процессора, очень мало что можно получить, используя такую многопроцессорную стратегию разделения и подчинения для решения этой проблемы.
Следующий код должен работать. Я взял на себя смелость переименовать некоторые из ваших переменных в более значимые (по крайней мере, для меня) имена. Я также попытался оптимизировать часть кода, пытаясь избежать повторного создания пула несколько раз. Я также избегаю повторяющихся операций конкатенации строк. Итак, это компромисс: замена нескольких конкатенаций строк в функции process
несколькими записями файлов в функции load_mod
. Я также предоставил split
функцию, которая разделяет входной файл строк на четное количество фрагментов (по крайней мере, как можно более равномерно):
import multiprocessing
def load_mod (pool, n_processors, file):
with open (file, 'r') as fp:
# split the lines in n_processors chunks:
line_chunks = split(fp.readlines(), n_processors)
sname = arg.rsplit('.', 1)[0] '_fmt.' arg.rsplit('.', 1)[-1]
# imap allows results to be processed as they are returned
# and you do not need them all in memory at once:
new_line_chunks = pool.imap(process, line_chunks)
# do not join results, just write out each piece:
with open (sname, 'w') as f:
for lines in new_line_chunks:
for line in lines:
f.write(line)
# or maybe replace preceding two lines with: f.write(''.join(lines))
def split(a, n): # function to split a list in n even parts
k, m = divmod(len(a), n)
return list((a[i * k min(i, m):(i 1) * k min(i 1, m)] for i in range(n)))
def process(lines):
# repeatedly appending to strings is inefficient; just append to new_lines
new_lines = []
for line in lines:
###process large amount of text line by line here and append to newfile
new_lines.append(processed_line)
return new_lines
if __name__ == '__main__':
# number of processors on this computer:
n_processors = multiprocessing.cpu_count()
# create the pool once!
pool = multiprocessing.Pool(n_processors)
file_list = [""" file list """]
for file in file_list:
load_mod(pool, n_processors, file)