Многопоточные фрагменты файла и повторная сборка

#python #multithreading #multiprocessing

#python #многопоточность #многопроцессорная обработка

Вопрос:

Мне нужно разбить большие файлы объемом 15 Гб, чтобы их можно было обрабатывать как фрагменты параллельно, повторно собирать (по порядку) и сохранять. Этот код работает до определенной степени, он разбивает файлы на равные размеры для заданного количества потоков, а затем запускает эти потоки.

Проблема в том, что выполняется только один или два потока, в то время как остальные остановлены, даже если файл полностью загружен в память. Я использую .map , потому что асинхронная многопроцессорная обработка или многопоточность не сохранят порядок, как я понимаю.

Я потратил на это слишком много времени и написал этот код 4 разными способами в соответствии с примерами на этом сайте. Есть ли способ распараллелить это?

Упрощенный код:

 import multiprocessing

def load_mod (arg, p):
    with open (arg, 'r') as fp:
        fp = fp.readlines()
        fp = [fp[i:i   (len(fp)//p)] for i in range(0, len(fp), (len(fp)//p))] #make p# of chucks
    pool = multiprocessing.Pool(p)
    results = pool.map(process, fp)
    sname = arg.rsplit('.', 1)[0]   '_fmt.'   arg.rsplit('.', 1)[-1]
    with open (sname,'w') as f: f.write("".join(results))

def process (filebits):
        newfile = ''
        for l in filebits:
           ###process large amount of text line by line here and append to newfile
        return newfile

process_list = [#A list of files#]
for i in process_list: load_mod(i, #number of processes#)
 

Комментарии:

1. (len(fp)//p) должно быть math.ceil(len(fp)/p) , иначе вы создаете по крайней мере p рабочие единицы, а не точно p рабочие единицы. Кроме того, передайте processes=p .Pool вызов для создания такого количества рабочих элементов.

2.@AnonCoward: использование pool = multiprocessing.Pool(p) создает p количество рабочих процессов.

3. Вы правы, неправильно поняли, извините за это.

4. @heatherfuke: Ваш вопрос о многопроцессорной обработке или многопоточности? (например, процессы или потоки) В Python это не одно и то же, и они не взаимозаменяемы.

5. Многопроцессорность, я знаю, что они разные. Однако многопроцессорная обработка также запускает несколько потоков.

Ответ №1:

Предупреждение

Передача таких больших объемов данных в подпроцессы сопряжена с изрядными накладными расходами. Если функция process не требует больших затрат процессора, очень мало что можно получить, используя такую многопроцессорную стратегию разделения и подчинения для решения этой проблемы.


Следующий код должен работать. Я взял на себя смелость переименовать некоторые из ваших переменных в более значимые (по крайней мере, для меня) имена. Я также попытался оптимизировать часть кода, пытаясь избежать повторного создания пула несколько раз. Я также избегаю повторяющихся операций конкатенации строк. Итак, это компромисс: замена нескольких конкатенаций строк в функции process несколькими записями файлов в функции load_mod . Я также предоставил split функцию, которая разделяет входной файл строк на четное количество фрагментов (по крайней мере, как можно более равномерно):

 import multiprocessing


def load_mod (pool, n_processors, file):
    with open (file, 'r') as fp:
        # split the lines in n_processors chunks:
        line_chunks = split(fp.readlines(), n_processors)
    sname = arg.rsplit('.', 1)[0]   '_fmt.'   arg.rsplit('.', 1)[-1]
    # imap allows results to be processed as they are returned
    # and you do not need them all in memory at once:
    new_line_chunks = pool.imap(process, line_chunks)
    # do not join results, just write out each piece:
    with open (sname, 'w') as f:
        for lines in new_line_chunks:
            for line in lines:
                f.write(line)
            # or maybe replace preceding two lines with: f.write(''.join(lines))

def split(a, n):  # function to split a list in n even parts
    k, m = divmod(len(a), n)
    return list((a[i * k   min(i, m):(i   1) * k   min(i   1, m)] for i in range(n)))

def process(lines):
    # repeatedly appending to strings is inefficient; just append to new_lines
    new_lines = []
    for line in lines:
        ###process large amount of text line by line here and append to newfile
        new_lines.append(processed_line)
    return new_lines

if __name__ == '__main__':
    # number of processors on this computer:
    n_processors = multiprocessing.cpu_count()
    # create the pool once!
    pool = multiprocessing.Pool(n_processors)
    file_list = [""" file list """]
    for file in file_list:
        load_mod(pool, n_processors, file)