#python #multithreading #parallel-processing
#python #многопоточность #параллельная обработка
Вопрос:
У меня есть большой файл строки json (каждая строка является объектом json). Я хотел бы прочитать каждую строку и обработать каждую строку параллельно. Вот что у меня получилось:
import gzip
import multiprocessing as mp
import pandas as pd
from json import loads
def process_file(file_line):
json_line = loads(file_line)
data = json_line.get('data', None)
if data:
df = pd.DataFrame(data)
return df.groupby(['style']).score.describe()
return pd.DataFrame()
pool = mp.Pool(8)
jobs = []
with gzip.open('2018.jl.gz') as f:
for line in f:
jobs.append(pool.apply_async(process_file,(f)))
for job in jobs:
job.get()
pool.close()
Однако обработка занимает больше времени, чем чтение файла, и в конечном итоге приводит к добавлению слишком большого количества заданий и вызывает проблемы с памятью.
Есть ли способ, которым я мог бы это распараллелить? Как я мог бы закодировать его таким образом, чтобы каждый поток захватывал строку и обрабатывал строку, и когда он закончит, он захватит новую строку, и цикл будет приостановлен, если все потоки заняты?
Комментарии:
1. Вы хотите создать фрейм данных для каждой строки?
2. Может быть, вам просто нужно это pandas.pydata.org/pandas-docs/stable/reference/api /… ? Он может читать gzip, а также другие сжатия.
3. @khachik да. каждая строка превращается во фрейм данных и выполняет с ним некоторую обработку.
4. И, наконец, если вы действительно хотите реализовать то, что вы описываете, вы можете подать заявку
pool.imap(process_file, f)
.5. @khachik суть не в том, чтобы прочитать все вещи в памяти. Мне нужно обработать его построчно и выполнить некоторые агрегации. и я хотел бы сделать это параллельно.