Как параллельно обработать большой файл, не загружая все в память

#python #multithreading #parallel-processing

#python #многопоточность #параллельная обработка

Вопрос:

У меня есть большой файл строки json (каждая строка является объектом json). Я хотел бы прочитать каждую строку и обработать каждую строку параллельно. Вот что у меня получилось:

 import gzip
import multiprocessing as mp
import pandas as pd
from json import loads

def process_file(file_line):
    json_line = loads(file_line)
    data = json_line.get('data', None)
    if data:
        df = pd.DataFrame(data)
        return df.groupby(['style']).score.describe()
    return pd.DataFrame()

pool = mp.Pool(8)
jobs = []
with gzip.open('2018.jl.gz') as f:
    for line in f:
        jobs.append(pool.apply_async(process_file,(f)))

for job in jobs:
    job.get()

pool.close()
  

Однако обработка занимает больше времени, чем чтение файла, и в конечном итоге приводит к добавлению слишком большого количества заданий и вызывает проблемы с памятью.
Есть ли способ, которым я мог бы это распараллелить? Как я мог бы закодировать его таким образом, чтобы каждый поток захватывал строку и обрабатывал строку, и когда он закончит, он захватит новую строку, и цикл будет приостановлен, если все потоки заняты?

Комментарии:

1. Вы хотите создать фрейм данных для каждой строки?

2. Может быть, вам просто нужно это pandas.pydata.org/pandas-docs/stable/reference/api /… ? Он может читать gzip, а также другие сжатия.

3. @khachik да. каждая строка превращается во фрейм данных и выполняет с ним некоторую обработку.

4. И, наконец, если вы действительно хотите реализовать то, что вы описываете, вы можете подать заявку pool.imap(process_file, f) .

5. @khachik суть не в том, чтобы прочитать все вещи в памяти. Мне нужно обработать его построчно и выполнить некоторые агрегации. и я хотел бы сделать это параллельно.