Последовательно разархивируйте большие файлы асинхронно

#python #multithreading #asynchronous #pickle

#python #многопоточность #асинхронный #рассортировать

Вопрос:

У меня есть каталог с выделенными списками, которые я хотел бы загружать последовательно, использовать как часть операции, а затем отбрасывать. Размер файлов составляет около 0,75 — 2 ГБ каждый при мариновании, и я могу загрузить номер в память в любой момент, хотя далеко не все из них. Каждый обработанный файл представляет собой данные за один день.

В настоящее время процесс распаковки занимает значительную часть времени выполнения программы. Мое предлагаемое решение состоит в том, чтобы загрузить первый файл и, пока выполняется операция с этим файлом, асинхронно загрузить следующий файл в списке.

Я подумал о двух способах, которыми я мог бы это сделать: 1) Потоковая обработка и 2) Асинхронная обработка. Я пробовал оба из них, но ни один, похоже, не сработал. Ниже приведена моя (предпринятая) реализация решения на основе потоков.

 import os
import threading
import pickle

class DataSource:
    def __init__(self, folder):
        self.folder = folder
        self.next_file = None

    def get(self):
        if self.next_file is None:
            self.load_file()
        data = self.next_file
    
        io_thread = threading.Thread(target=self.load_file, daemon=True)
        io_thread.start()

        return data

    def get_next_file(self):
        for filename in sorted(os.listdir(self.folder)):
            yield self.folder   filename

    def load_file(self):
        self.next_file = pickle.load(open(next(self.get_next_file()), "rb"))
  

Основная программа вызовет DataSource().get() для извлечения каждого файла. При первой загрузке load_file() загрузит файл в next_file, где он будет сохранен. Затем поток io_thread должен загружать каждый последующий файл в next_file, который при необходимости будет возвращен через get().

Запущенный поток, похоже, выполняет некоторую работу (он потребляет огромное количество оперативной памяти, ~ 60 ГБ), однако, похоже, он не обновляет next_file.

Может кто-нибудь подсказать, почему это не работает? И, кроме того, есть ли лучший способ достичь этого результата?

Спасибо

Ответ №1:

DataSource().get() похоже, это ваша первая проблема: это означает, что вы всегда создаете новый экземпляр класса DataSource и загружаете только первый файл, потому что вы никогда больше не вызываете тот же экземпляр объекта DataSource, чтобы перейти к следующему файлу. Может быть, вы имеете в виду сделать что-то вроде:

 datasource = DataSource()
while datasource.not_done():
    datasource.get() 
  

Было бы полезно поделиться полным кодом, и предпочтительно на repl.it или где-нибудь, где это может быть выполнено.

Кроме того, если вы хотите повысить производительность, возможно, стоит изучить модуль многопроцессорной обработки, поскольку Python блокирует некоторые операции с помощью глобальной блокировки интерпретатора (GIL), так что одновременно выполняется только один поток, даже если у вас несколько ядер процессора. Это может не быть проблемой, хотя в вашем случае, поскольку чтение с диска, вероятно, является узким местом, я бы предположил, что Python снимает блокировку при выполнении базового собственного кода для чтения из файловой системы.

Мне также интересно, как вы могли бы использовать asyncio для солений.. Я предполагаю, что вы сначала считываете содержимое файла в память из файла, а затем разархивируете, когда это будет сделано, выполняя другую обработку во время загрузки. Похоже, что это могло бы хорошо работать.

Наконец, я бы добавил отладочные распечатки, чтобы посмотреть, что происходит.

Обновление: Следующая проблема, похоже, заключается в том, что вы неправильно используете генератор get_next_file. Там вы каждый раз создаете новый генератор с помощью self.get_next_file(), поэтому вы загружаете только первый файл. Вы должны создать генератор только один раз, а затем вызвать next() на нем. Может быть, это помогает понять, также на повторном:

 def get_next_file():
        for filename in ['a', 'b', 'c']:
            yield filename

for n in get_next_file():
  print(n)

print("---")

print(next(get_next_file()))
print(next(get_next_file()))
print(next(get_next_file()))

print("---")

gen = get_next_file()
print(gen)
print(next(gen))
print(next(gen))
print(next(gen))
  

Вывод:

 a
b
c
---
a
a
a
---
<generator object get_next_file at 0x7ff4757f6cf0>
a
b
c
  

https://repl.it/@ToniAlatalo/PythonYieldNext#main.py

Опять же, отладочные распечатки помогут вам увидеть, что происходит, какой файл вы загружаете, когда и т.д.

Комментарии:

1. Извиняюсь, я не был ясен. Я не создаю каждый раз новый источник данных, я просто использовал это в качестве примера. Фактический код, с которым я тестирую, таков: d = DataSource(FOLDER_LOCATION) print(d.get()[0]["timestamp"]) time.sleep(60) print(d.get()[0]["timestamp"]) . Я взгляну на многопроцессорную обработку и посмотрю, работает ли это, и если нет, я загружу полный код. Спасибо за ваше предложение.

2. Хорошо, ну, это было действительно плохо, чтобы поместить код в вопрос, который вы на самом деле не запускаете. Я думаю, что нашел еще одну ошибку, см. Обновление в ответе. Вам не нужна многопроцессорная обработка, если вы обрабатываете данные по одному, загрузка в фоновом режиме с помощью асинхронности или потоков подходит для этого (я бы, вероятно, использовал async). Я думаю, что многопроцессорная обработка действительно необходима в этом случае, только если вы не можете обновить асинхронную загрузку во время обработки или если вы хотите обрабатывать несколько файлов последовательно, используя несколько ядер.