#python #python-3.x #multiprocessing #python-3.7
#python #python-3.x #многопроцессорная обработка #python-3.7
Вопрос:
Я пытаюсь суммировать размеры всех файлов в каталоге, включая рекурсивные подкаталоги. Соответствующая функция ( self._count
) работает полностью нормально, если я просто вызову ее один раз. Но для больших объемов файлов я хочу использовать multiprocessing
, чтобы ускорить работу программы. Вот соответствующие части кода.
self._sum_dict
суммирует значения одних и тех же ключей данных dicts.
self._get_file_type
возвращает категорию (ключ stats
), в которую должен быть помещен файл.
self._categories
содержит список всех возможных категорий.
number_of_threats
указывает количество рабочих, которое должно использоваться.
path
содержит путь к каталогу, указанному в первом предложении.
import os
from multiprocessing import Pool
def _count(self, path):
stats = dict.fromkeys(self._categories, 0)
try:
dir_list = os.listdir(path)
except:
# I do some warning here, but removed it for SSCCE
return stats
for element in dir_list:
new_path = os.path.join(path, element)
if os.path.isdir(new_path):
add_stats = self._count(new_path)
stats = self._sum_dicts([stats, add_stats])
else:
file_type = self._get_file_type(element)
try:
size = os.path.getsize(new_path)
except Exception as e:
# I do some warning here, but removed it for SSCCE
continue
stats[file_type] = size
return stats
files = []
dirs = []
for e in dir_list:
new_name = os.path.join(path, e)
if os.path.isdir(new_name):
dirs.append(new_name)
else:
files.append(new_name)
with Pool(processes=number_of_threats) as pool:
res = pool.map(self._count, dirs)
self._stats = self._sum_dicts(res)
Я знаю, что этот код не будет учитывать файлы path
, но это то, что я могу легко добавить. При выполнении кода я получаю следующее исключение.
Exception has occurred: TypeError
cannot serialize '_io.TextIOWrapper' object
...
line ... in ...
res = pool.map(self._count, dirs)
Я обнаружил, что это исключение может возникать при совместном использовании ресурсов между процессами, с которыми, насколько я вижу, я работаю только stats = dict.fromkeys(self._categories, 0)
. Но замена этой строки жестко заданными значениями не решит проблему. Даже установка точки останова в этой строке мне не поможет, потому что она не достигнута.
У кого-нибудь есть идея, в чем причина этой проблемы и как я могу это исправить?
Ответ №1:
Проблема в том, что вы пытаетесь передать «self». Если у self есть объект Stream, он не может быть сериализован.
Попробуйте переместить многопроцессорный код за пределы класса.
Многопроцессорная обработка Python запускает новый интерпретатор, и если вы пытаетесь получить доступ к общему коду, который нельзя обработать (или сериализовать), он завершается неудачей. Обычно он не падает там, где вы думаете, что он разбился… но при попытке получить объект. Я преобразовал код с использованием потоков в многопроцессорную обработку, и у меня было много странных ошибок, даже если я не отправлял и не использовал эти объекты, но я использовал их родительский (self)
Комментарии:
1. Спасибо за ответ! Я попытался поместить код для
_count
функции и всех вызываемых в ней функций за пределы класса, эффективно сделав их обычными функциями. Но теперь у меня проблема в том, что я не знаю, как предоставить дополнительные параметры этим функциям, например список всех возможных категорий. Знаете ли вы, как этого можно достичь?2. Это еще одна проблема.. вы должны использовать менеджер из многопроцессорного amd Manager. очередь / список или другие… я не слишком вникал в это… но блокировка и очередь оттуда работают. U просто передает это как параметр при добавлении задания в пул.
3. Это если вы хотите изменить эти элементы, если не просто указать их в качестве параметров, и python сделает копию