Как я могу одновременно выполнять два набора задач в python, когда один зависит от другого?

#python #multithreading #multiprocessing

#python #многопоточность #многопроцессорная обработка

Вопрос:

У меня есть большое количество небольших файлов для загрузки и обработки с s3.

Загрузка происходит довольно быстро, поскольку отдельные файлы составляют всего несколько мегабайт каждый. Вместе они составляют около 100 ГБ. Обработка занимает примерно в два раза больше времени, чем загрузка, и связана исключительно с процессором. Поэтому, завершая обработку в нескольких потоках при загрузке других файлов, должно быть возможно сократить общее время выполнения.

В настоящее время я загружаю файл, обрабатываю его и перехожу к следующему файлу. Есть ли в python способ, при котором я загружаю все файлы один за другим и обрабатываю каждый из них, как только он завершает загрузку? Ключевое отличие здесь в том, что пока каждый файл обрабатывается, другой всегда загружается.

Мой код выглядит так:

 files = {'txt': ['filepath1', 'filepath2', ...], 
         'tsv': ['filepath1', 'filepath2', ...]
        } 

for kind in files.keys():
    subprocess.check_call(f'mkdir -p {kind}', shell=True)
    subprocess.call(f'mkdir -p {kind}/normalized', shell=True)

    for i, file in enumerate(files[kind]):
        subprocess.call(f'aws s3 cp s3://mys3bucket.com/{file} {kind}/', shell=True)
        f = file.split('/')[-1]
        subprocess.check_call('my_process_function --input "{kind}/{f}" --output "{kind}/normalized/normalize_{f}" --units relab', shell=True)
 

Я также написал многопроцессорное решение, в котором я могу одновременно загружать и обрабатывать несколько файлов, но это не приводит к повышению скорости, поскольку скорость сети уже была насыщена. Узкое место находится в обработке. Я включил это на случай, если это поможет вам, ребята.

 from contextlib import closing
from os import cpu_count
from multiprocessing import Pool

def download_and_proc(file, kind='txt'):
    subprocess.call(f'aws s3 cp s3://mys3bucket.com/{file} {kind}/', shell=True)
    f = file.split('/')[-1]
    subprocess.check_call('my_process_function --input "{kind}/{f}" --output "{kind}/normalized/normalize_{f}" --units relab', shell=True)

with closing(Pool(processes=cpu_count()*2)) as pool:
        pool.map(download_and_proc, files)
 

Комментарии:

1. В чем проблема с вашим многопроцессорным кодом? Кажется, идея правильная — если сеть заполнена, то это узкое место, но это не мешает обработке завершенных файлов, да?

2. Вы немного неясны в своем описании; производительность зависит от времени загрузки файлов или обработки файлов? И в любом случае, почему ваше multiprocessing решение не помогает? Я не понимаю из вашего описания.

3. @Stabledog Спасибо за ответ. Я думаю, ключ в том, что обработка занимает больше времени, чем загрузка. Итак, хотя я могу загружать несколько файлов одновременно, и это хорошо, я не использую свою сеть постоянно. Он загрузит файлы (повысит скорость сети), затем обработает файлы, ничего не загружая. затем повторите.

4. @JohanL Я добавил некоторые подробности выше, которые могут помочь объяснить дальше.

Ответ №1:

Ваш текущий многопроцессорный код должен быть довольно близок к оптимальному в долгосрочной перспективе. Загрузка не всегда будет происходить с максимальной скоростью, поскольку те же потоки выполнения, которые отвечают за загрузку файла, будут ждать, пока файл не будет обработан, прежде чем загружать другой. Но обычно при обработке должен использоваться весь процессор, даже если некоторая пропускная способность сети не используется. Если вы попытаетесь всегда загружать тоже, у вас в конечном итоге закончатся файлы для загрузки, и сеть будет простаивать столько же времени, просто все в конце пакетного задания.

Одним из возможных исключений является то, что время, затрачиваемое на обработку файла, всегда одинаково. Тогда вы можете обнаружить, что ваши рабочие работают синхронно, где все они загружаются одновременно, а затем все обрабатываются одновременно, даже если рабочих больше, чем процессоров для их запуска. Если обработка каким-то образом не привязана к часам реального времени, это вряд ли будет происходить очень долго. В большинстве случаев некоторые процессы заканчиваются раньше других, и поэтому загрузка в конечном итоге будет происходить в шахматном порядке.

Таким образом, улучшение вашего кода вряд ли даст вам значительное ускорение. Если вы считаете, что вам это нужно, вы можете разделить загрузку и обработку на два отдельных пула. Возможно, даже получится выполнить одну из них как цикл с одним процессом в основном процессе, но я покажу полную версию с двумя пулами здесь:

 def download_worker(file, kind='txt'):
    subprocess.call(f'aws s3 cp s3://mys3bucket.com/{file} {kind}/', shell=True)
    return file

def processing_worker(file, kind='txt')
    f = file.split('/')[-1]
    subprocess.check_call('my_process_function --input "{kind}/{f}" --output "{kind}/normalized/normalize_{f}" --units relab', shell=True)

with Pool() as download_pool, Pool() as processing_pool:
    downloaded_iterator = download_pool.imap(download_worker, files)  # imap returns an iterator
    processing_pool.map(processing_worker, downloaded_iterator)
 

Это должно загружаться и обрабатываться так быстро, как позволяет ваша система. Если загрузка файла занимает меньше времени, чем его обработка, то вполне вероятно, что первый пул будет выполнен раньше второго, с которым код справится просто отлично. Если обработка не является узким местом, она также будет поддерживать это (второй пул будет некоторое время простаивать, ожидая завершения загрузки файлов).