Чтение файлов с помощью ZipFile с использованием многопроцессорной обработки

#python #python-multiprocessing #zipfile

#python #python-многопроцессорная обработка #python-zipfile

Вопрос:

Я пытаюсь прочитать необработанные данные из zip-файла. Структура этого файла:

  • zipfile
    • данные
      • Spectral0.data
      • Spectral1.data
      • Спектральный […].данные
      • Spectral300.data
    • Заголовок

Цель состоит в том, чтобы прочитать все Spectral[…].data в 2D массив numpy (тогда как Spectral0.data будет первым столбцом). Однопоточный подход занимает много времени с момента его чтения .файл данных занимает несколько секунд.

 import zipfile
import numpy as np

spectralData = np.zeros(shape = (dimY, dimX), dtype=np.int16)
archive = zipfile.ZipFile(path, 'r')

for file in range(fileCount):
    spectralDataRaw = archive.read('data/Spectral'   str(file)   '.data') 
    spectralData[:,file] = np.frombuffer(spectralDataRaw, np.short)
    
  

И я подумал, что использование multiprocessing может ускорить процесс. Итак, я прочитал несколько руководств по настройке процедуры многопроцессорной обработки. Это то, что я придумал:

 import zipfile
import numpy as np
import multiprocessing
from joblib import Parallel, delayed

archive = zipfile.ZipFile(path, 'r')
numCores = multiprocessing.cpu_count()

def testMult(file):
    spectralDataRaw = archive.read('data/Spectral'   str(file)   '.data')
    return np.frombuffer(spectralDataRaw, np.short)


output = Parallel(n_jobs=numCores)(delayed(testMult)(file)for file in range(fileCount))
output = np.flipud(np.rot90(np.array(output), 1, axes = (0,2)))
  

Используя этот подход, я получаю следующую ошибку:

 numCores = multiprocessing.cpu_count()

def testMult(file):
    spectralDataRaw = archive.read('data/Spectral'   str(file)   '.data')
    return np.frombuffer(spectralDataRaw, np.short)


output = Parallel(n_jobs=numCores)(delayed(testMult)(file)for file in range(fileCount))
output = np.flipud(np.rot90(np.array(output), 1, axes = (0,2)))
_RemoteTraceback: 
"""
Traceback (most recent call last):
  File "C:ProgramDataAnaconda3envsdevEnv2libsite-packagesjoblibexternalslokybackendqueues.py", line 153, in _feed
    obj_ = dumps(obj, reducers=reducers)
  File "C:ProgramDataAnaconda3envsdevEnv2libsite-packagesjoblibexternalslokybackendreduction.py", line 271, in dumps
    dump(obj, buf, reducers=reducers, protocol=protocol)
  File "C:ProgramDataAnaconda3envsdevEnv2libsite-packagesjoblibexternalslokybackendreduction.py", line 264, in dump
    _LokyPickler(file, reducers=reducers, protocol=protocol).dump(obj)
  File "C:ProgramDataAnaconda3envsdevEnv2libsite-packagesjoblibexternalscloudpicklecloudpickle_fast.py", line 563, in dump
    return Pickler.dump(self, obj)
TypeError: cannot pickle '_io.BufferedReader' object
"""


The above exception was the direct cause of the following exception:

Traceback (most recent call last):

  File "<ipython-input-94-c4b007eea8e2>", line 8, in <module>
    output = Parallel(n_jobs=numCores)(delayed(testMult)(file)for file in range(fileCount))

  File "C:ProgramDataAnaconda3envsdevEnv2libsite-packagesjoblibparallel.py", line 1061, in __call__
    self.retrieve()

  File "C:ProgramDataAnaconda3envsdevEnv2libsite-packagesjoblibparallel.py", line 940, in retrieve
    self._output.extend(job.get(timeout=self.timeout))

  File "C:ProgramDataAnaconda3envsdevEnv2libsite-packagesjoblib_parallel_backends.py", line 542, in wrap_future_result
    return future.result(timeout=timeout)

  File "C:ProgramDataAnaconda3envsdevEnv2libconcurrentfutures_base.py", line 432, in result
    return self.__get_result()

  File "C:ProgramDataAnaconda3envsdevEnv2libconcurrentfutures_base.py", line 388, in __get_result
    raise self._exception

PicklingError: Could not pickle the task to send it to the workers.
  

Мой вопрос в том, как мне правильно настроить это распараллеливание. Я читал, что zipfile не является потокобезопасным, и поэтому мне может понадобиться другой подход для чтения содержимого zip в память (ОЗУ). Я бы предпочел не читать весь zip-файл в память, поскольку файл может быть довольно большим.

Я думал об использовании from numba import njit, prange , но там возникает проблема, из-за которой zip не поддерживается numba . Что еще я мог сделать, чтобы это сработало?