Python: параллельная обработка при получении

#python #multiprocessing #yield

Вопрос:

Я создаю генератор строк в списке файлов, мой подход примерно такой:

 def load(f_name: str):
    with open(f_name, "r") as f:
        lines = f.readlines()
    # some calculations
    return lines

def iter_files(f_names: list):
    for f in f_names:
        for line in load(f):
            yield line
 

Что я хотел бы сделать, если это возможно и полезно, так это загрузить следующий файл, одновременно переходя к другому.
Будучи совершенно новичком в многопроцессорной обработке, я попробовал следующее:

 cache = dict()

def load(f_name: str, id: int):
    global cache
    with open(f_name, "r") as f:
        lines = f.readlines()
    # some calculations
    cache[id] = lines

def iter_list(arr):
    for x in arr:
        yield x

def iter_files(f_names: list):
    global cache
    num_files = len(f_names)
    load(f_names[0], 0)
    for n in range(num_files - 1):
        current = mp.Process(target=iter_list, args=(cache[n],))
        next = mp.Process(target=load, args=(f_names[n   1], n   1))
        current.start()
        next.start()
        current.join()
        next.join()
        del cache[n]
    iter_list(cache[num_files - 1])
    del cache[num_files - 1]
 

Но помимо того, что это выглядит слишком сложно, это не работает.

Во-первых, если я не введу основной код в «если __имя__ = =» __main__»: «(что я бы предпочел не делать обязательным) Я получаю следующую ошибку:

 RuntimeError:
      An attempt has been made to start a new process before the
      current process has finished its bootstrapping phase.
 

Но даже если я это сделаю, файлы не будут добавлены в кэш:

 current = mp.Process(target=iter_list, args=(cache[n],))
KeyError: 1
 

Возможно ли достичь того, что я пытаюсь сделать? Что я делаю не так?

Спасибо вам всем

Комментарии:

1. «Что я хотел бы сделать, если это возможно и полезно». Возможно, конечно. Полезно? Это зависит от того, что вы делаете с этими строками. Если объем обработки невелик по сравнению с вводом-выводом (который легко может быть в 100 раз медленнее), вы не увидите значительного ускорения из-за дополнительной сложности.

2. Файлы @Thomas довольно малы (в среднем 50 КБ), но они закодированы таким образом, что каждый байт более или менее соответствует записи обработанного списка, поэтому я предполагаю, что медленная часть алгоритма загрузки-это «# некоторые вычисления», а не фактическое изменение файлов.

Ответ №1:

multiprocessing.Queue Класс идеально подходит для этого. Вы put вводите строки на одном конце (подпроцесс), а get затем возвращаете их на другом конце (основной процесс). К сожалению, нет встроенного способа пометить очередь как «законченную», поэтому нам нужно значение put sentinel, например None , чтобы указать, что все строки обработаны.

 import multiprocessing as mp

def load(f_name: str):
    with open(f_name, "r") as f:
        lines = f.readlines()
    # some calculations
    return lines

def iter_files(f_names: list, queue: mp.Queue):
    for f in f_names:
        for line in load(f):
            queue.put(line)
    queue.put(None)

def iter_files_process(f_names: list):
    queue = mp.Queue()
    process = mp.Process(target=iter_files, args=(f_names, queue))
    process.start()
    while True:
        line = queue.get()
        if line is None:  # End-of-queue value.
            break
        yield line
    process.join()  # Wait for the process to be completely finished.

if __name__ == "__main__":
    for line in iter_files_process(['a.txt', 'b.txt']):
        print(line, end='')
 

Комментарии:

1. Когда я выполняю queue.get (), значение удаляется из очереди, верно? Кроме того, таким образом процесс пытается загрузить все файлы, есть ли способ ограничить его только следующим (и текущим, если он еще не находится в очереди)?

2. Да, queue.get() удаляет и возвращает следующий элемент. Вы можете передать конструктору максимальный размер очереди Queue() , чтобы процесс чтения файлов не продвинулся слишком далеко вперед (например queue = mp.Queue(1) ).

Ответ №2:

Я считаю, что решение, предложенное Томасом, является интересным подходом, однако:

  1. A Pipe , хотя и менее гибкая конструкция , чем a Queue , — это все, что здесь требуется, поскольку есть только один отправитель и один получатель, и она гораздо более производительна.
  2. Я тестировал как с использованием многопоточности, так и с многопроцессорной обработкой, и многопоточность также намного быстрее.
 from multiprocessing import Pipe
from multiprocessing.connection import Connection
import threading

def load(f_name: str):
    with open(f_name, "r", encoding="utf8") as f:
        lines = f.readlines()
    # some calculations
    return lines

def iter_files(f_names: list, send_conn: Connection):
    for f in f_names:
        for line in load(f):
            send_conn.send(line)
    send_conn.send(None)

def generate_lines(f_names: list):
    recv_conn, send_conn = Pipe(False)
    threading.Thread(target=iter_files, args=(f_names, send_conn), daemon=True).start()
    for line in iter(recv_conn.recv, None):
        yield line

if __name__ == "__main__":
    import time

    t = time.time()
    lines = list(generate_lines(['irv.py', 'waitList.py', 'send_mail_async.py', '../ajaxtcr.js', '../browser_detection.php']))
    elapsed = time.time() - t
    print(len(lines), elapsed)
 

С принтами:

 6537 0.18297886848449707
 

Обратите внимание, что код Томаса без изменений, за исключением добавления аргумента кодировки в open вызов, занял для этого списка файлов 0,24300265312194824 секунды.

Но сравните с исходным кодом:

 def load(f_name: str):
    with open(f_name, "r", encoding="utf8") as f:
        lines = f.readlines()
    # some calculations
    return lines

def iter_files(f_names: list):
    for f in f_names:
        for line in load(f):
            yield line

import time
t = time.time()
lines = list(iter_files(['irv.py', 'waitList.py', 'send_mail_async.py', '../ajaxtcr.js', '../browser_detection.php']))
elapsed = time.time() - t
print(len(lines), elapsed)
 

С принтами:

 6537 0.07400083541870117
 

Таким образом, этот подход не предлагает никаких реальных улучшений. Другой подход заключается в одновременной обработке всех файлов:

 from multiprocessing.pool import ThreadPool

def load(f_name: str):
    with open(f_name, "r", encoding='utf8') as f:
        lines = f.readlines()
    # some calculations
    return lines

def generate_lines(f_names: list):
    with ThreadPool(len(f_names)) as pool:
        for lines in pool.imap(load, f_names):
            for line in lines:
                yield line

if __name__ == "__main__":
    import time

    t = time.time()
    lines = list(generate_lines(['irv.py', 'waitList.py', 'send_mail_async.py', '../ajaxtcr.js', '../browser_detection.php']))
    elapsed = time.time() - t
    print(len(lines), elapsed)
 

С принтами:

 6537 0.010999441146850586
 

Важное Примечание

Конечно, при одновременном использовании нескольких файлов могут возникнуть разногласия, и причина, по которой производительность была такой хорошей в этом последнем тесте, заключалась в том, что все или большая часть считываемых данных, вероятно, находилась в кэше, потому что эти файлы были прочитаны так много раз во всех этих тестах. Решение с каналом (или очередью) использует тот же кэш и явно работает хуже, чем исходный код, поэтому единственный реальный вопрос заключается в том, будет ли версия пула потоков работать лучше с некэшированными данными.

Мне пришлось бы ждать сейчас значительное количество времени (или перезагрузиться?) чтобы убедиться, что кэш был очищен, а затем повторно запустите пример пула потоков, чтобы получить более точные данные. Или используйте новый список файлов с кодом пула потоков, а затем запустите исходный код, который будет иметь преимущество кэширования и сравнения.

Обновить

Я повторяю это с другим списком файлов, и решение пула потоков работает хуже, чем исходный код. Я бы просто придерживался исходного кода и избегал любых попыток включить многопоточность, многопроцессорность или асинхронность в попытке повысить производительность.

В зависимости от того, что такое «некоторые вычисления», упомянутые в функции load , вы можете рассмотреть:

 def load(f_name: str):
    with open(f_name, "r" as f:
        for line in f:
            # some calculations
            yield line

def iter_files(f_names: list):
    for f in f_names:
        yield from load(f)
 

Это позволило бы сэкономить ресурсы памяти.