#python #python-multiprocessing #zipfile
#python #python-многопроцессорная обработка #python-zipfile
Вопрос:
Я пытаюсь прочитать необработанные данные из zip-файла. Структура этого файла:
- zipfile
- данные
- Spectral0.data
- Spectral1.data
- Спектральный […].данные
- Spectral300.data
- Заголовок
- данные
Цель состоит в том, чтобы прочитать все Spectral[…].data в 2D массив numpy (тогда как Spectral0.data будет первым столбцом). Однопоточный подход занимает много времени с момента его чтения .файл данных занимает несколько секунд.
import zipfile
import numpy as np
spectralData = np.zeros(shape = (dimY, dimX), dtype=np.int16)
archive = zipfile.ZipFile(path, 'r')
for file in range(fileCount):
spectralDataRaw = archive.read('data/Spectral' str(file) '.data')
spectralData[:,file] = np.frombuffer(spectralDataRaw, np.short)
И я подумал, что использование multiprocessing
может ускорить процесс. Итак, я прочитал несколько руководств по настройке процедуры многопроцессорной обработки. Это то, что я придумал:
import zipfile
import numpy as np
import multiprocessing
from joblib import Parallel, delayed
archive = zipfile.ZipFile(path, 'r')
numCores = multiprocessing.cpu_count()
def testMult(file):
spectralDataRaw = archive.read('data/Spectral' str(file) '.data')
return np.frombuffer(spectralDataRaw, np.short)
output = Parallel(n_jobs=numCores)(delayed(testMult)(file)for file in range(fileCount))
output = np.flipud(np.rot90(np.array(output), 1, axes = (0,2)))
Используя этот подход, я получаю следующую ошибку:
numCores = multiprocessing.cpu_count()
def testMult(file):
spectralDataRaw = archive.read('data/Spectral' str(file) '.data')
return np.frombuffer(spectralDataRaw, np.short)
output = Parallel(n_jobs=numCores)(delayed(testMult)(file)for file in range(fileCount))
output = np.flipud(np.rot90(np.array(output), 1, axes = (0,2)))
_RemoteTraceback:
"""
Traceback (most recent call last):
File "C:ProgramDataAnaconda3envsdevEnv2libsite-packagesjoblibexternalslokybackendqueues.py", line 153, in _feed
obj_ = dumps(obj, reducers=reducers)
File "C:ProgramDataAnaconda3envsdevEnv2libsite-packagesjoblibexternalslokybackendreduction.py", line 271, in dumps
dump(obj, buf, reducers=reducers, protocol=protocol)
File "C:ProgramDataAnaconda3envsdevEnv2libsite-packagesjoblibexternalslokybackendreduction.py", line 264, in dump
_LokyPickler(file, reducers=reducers, protocol=protocol).dump(obj)
File "C:ProgramDataAnaconda3envsdevEnv2libsite-packagesjoblibexternalscloudpicklecloudpickle_fast.py", line 563, in dump
return Pickler.dump(self, obj)
TypeError: cannot pickle '_io.BufferedReader' object
"""
The above exception was the direct cause of the following exception:
Traceback (most recent call last):
File "<ipython-input-94-c4b007eea8e2>", line 8, in <module>
output = Parallel(n_jobs=numCores)(delayed(testMult)(file)for file in range(fileCount))
File "C:ProgramDataAnaconda3envsdevEnv2libsite-packagesjoblibparallel.py", line 1061, in __call__
self.retrieve()
File "C:ProgramDataAnaconda3envsdevEnv2libsite-packagesjoblibparallel.py", line 940, in retrieve
self._output.extend(job.get(timeout=self.timeout))
File "C:ProgramDataAnaconda3envsdevEnv2libsite-packagesjoblib_parallel_backends.py", line 542, in wrap_future_result
return future.result(timeout=timeout)
File "C:ProgramDataAnaconda3envsdevEnv2libconcurrentfutures_base.py", line 432, in result
return self.__get_result()
File "C:ProgramDataAnaconda3envsdevEnv2libconcurrentfutures_base.py", line 388, in __get_result
raise self._exception
PicklingError: Could not pickle the task to send it to the workers.
Мой вопрос в том, как мне правильно настроить это распараллеливание. Я читал, что zipfile не является потокобезопасным, и поэтому мне может понадобиться другой подход для чтения содержимого zip в память (ОЗУ). Я бы предпочел не читать весь zip-файл в память, поскольку файл может быть довольно большим.
Я думал об использовании from numba import njit, prange
, но там возникает проблема, из-за которой zip не поддерживается numba
. Что еще я мог сделать, чтобы это сработало?