#python #multithreading #asynchronous #pickle
#python #многопоточность #асинхронный #рассортировать
Вопрос:
У меня есть каталог с выделенными списками, которые я хотел бы загружать последовательно, использовать как часть операции, а затем отбрасывать. Размер файлов составляет около 0,75 — 2 ГБ каждый при мариновании, и я могу загрузить номер в память в любой момент, хотя далеко не все из них. Каждый обработанный файл представляет собой данные за один день.
В настоящее время процесс распаковки занимает значительную часть времени выполнения программы. Мое предлагаемое решение состоит в том, чтобы загрузить первый файл и, пока выполняется операция с этим файлом, асинхронно загрузить следующий файл в списке.
Я подумал о двух способах, которыми я мог бы это сделать: 1) Потоковая обработка и 2) Асинхронная обработка. Я пробовал оба из них, но ни один, похоже, не сработал. Ниже приведена моя (предпринятая) реализация решения на основе потоков.
import os
import threading
import pickle
class DataSource:
def __init__(self, folder):
self.folder = folder
self.next_file = None
def get(self):
if self.next_file is None:
self.load_file()
data = self.next_file
io_thread = threading.Thread(target=self.load_file, daemon=True)
io_thread.start()
return data
def get_next_file(self):
for filename in sorted(os.listdir(self.folder)):
yield self.folder filename
def load_file(self):
self.next_file = pickle.load(open(next(self.get_next_file()), "rb"))
Основная программа вызовет DataSource().get() для извлечения каждого файла. При первой загрузке load_file() загрузит файл в next_file, где он будет сохранен. Затем поток io_thread должен загружать каждый последующий файл в next_file, который при необходимости будет возвращен через get().
Запущенный поток, похоже, выполняет некоторую работу (он потребляет огромное количество оперативной памяти, ~ 60 ГБ), однако, похоже, он не обновляет next_file.
Может кто-нибудь подсказать, почему это не работает? И, кроме того, есть ли лучший способ достичь этого результата?
Спасибо
Ответ №1:
DataSource().get()
похоже, это ваша первая проблема: это означает, что вы всегда создаете новый экземпляр класса DataSource и загружаете только первый файл, потому что вы никогда больше не вызываете тот же экземпляр объекта DataSource, чтобы перейти к следующему файлу. Может быть, вы имеете в виду сделать что-то вроде:
datasource = DataSource()
while datasource.not_done():
datasource.get()
Было бы полезно поделиться полным кодом, и предпочтительно на repl.it или где-нибудь, где это может быть выполнено.
Кроме того, если вы хотите повысить производительность, возможно, стоит изучить модуль многопроцессорной обработки, поскольку Python блокирует некоторые операции с помощью глобальной блокировки интерпретатора (GIL), так что одновременно выполняется только один поток, даже если у вас несколько ядер процессора. Это может не быть проблемой, хотя в вашем случае, поскольку чтение с диска, вероятно, является узким местом, я бы предположил, что Python снимает блокировку при выполнении базового собственного кода для чтения из файловой системы.
Мне также интересно, как вы могли бы использовать asyncio для солений.. Я предполагаю, что вы сначала считываете содержимое файла в память из файла, а затем разархивируете, когда это будет сделано, выполняя другую обработку во время загрузки. Похоже, что это могло бы хорошо работать.
Наконец, я бы добавил отладочные распечатки, чтобы посмотреть, что происходит.
Обновление: Следующая проблема, похоже, заключается в том, что вы неправильно используете генератор get_next_file. Там вы каждый раз создаете новый генератор с помощью self.get_next_file(), поэтому вы загружаете только первый файл. Вы должны создать генератор только один раз, а затем вызвать next() на нем. Может быть, это помогает понять, также на повторном:
def get_next_file():
for filename in ['a', 'b', 'c']:
yield filename
for n in get_next_file():
print(n)
print("---")
print(next(get_next_file()))
print(next(get_next_file()))
print(next(get_next_file()))
print("---")
gen = get_next_file()
print(gen)
print(next(gen))
print(next(gen))
print(next(gen))
Вывод:
a
b
c
---
a
a
a
---
<generator object get_next_file at 0x7ff4757f6cf0>
a
b
c
https://repl.it/@ToniAlatalo/PythonYieldNext#main.py
Опять же, отладочные распечатки помогут вам увидеть, что происходит, какой файл вы загружаете, когда и т.д.
Комментарии:
1. Извиняюсь, я не был ясен. Я не создаю каждый раз новый источник данных, я просто использовал это в качестве примера. Фактический код, с которым я тестирую, таков:
d = DataSource(FOLDER_LOCATION) print(d.get()[0]["timestamp"]) time.sleep(60) print(d.get()[0]["timestamp"])
. Я взгляну на многопроцессорную обработку и посмотрю, работает ли это, и если нет, я загружу полный код. Спасибо за ваше предложение.2. Хорошо, ну, это было действительно плохо, чтобы поместить код в вопрос, который вы на самом деле не запускаете. Я думаю, что нашел еще одну ошибку, см. Обновление в ответе. Вам не нужна многопроцессорная обработка, если вы обрабатываете данные по одному, загрузка в фоновом режиме с помощью асинхронности или потоков подходит для этого (я бы, вероятно, использовал async). Я думаю, что многопроцессорная обработка действительно необходима в этом случае, только если вы не можете обновить асинхронную загрузку во время обработки или если вы хотите обрабатывать несколько файлов последовательно, используя несколько ядер.