Почему нельзя добавить обработчик файлов с формой self.fh в методе инициализации?

#python #python-3.x #python-multiprocessing

Вопрос:

информация об ОС и python:

 uname -a
Linux debian 5.10.0-8-amd64 #1 SMP Debian 5.10.46-4 (2021-08-03) x86_64 GNU/Linux
python3 --version
Python 3.9.2
 

Вот простой класс, который может запускать многопроцессорную обработку.

 from multiprocessing.pool import Pool    

class my_mp(object):
    def __init__(self):
        self.process_num = 3
        fh = open('test.txt', 'w')
    def run_task(self,i):
        print('process {} start'.format(str(i)))
        time.sleep(2)
        print('process {} end'.format(str(i)))
    def run(self):
        pool = Pool(processes = self.process_num)
        for i in range(self.process_num):
            pool.apply_async(self.run_task,args = (i,))
        pool.close()
        pool.join()
 

Инициализируйте my_mp класс,затем запустите многопроцессорность.

 ins = my_mp()
ins.run()
process 0 start
process 1 start
process 2 start
process 0 end
process 2 end
process 1 end
 

Теперь замените fh = open('test.txt', 'w') на self.fh = open('test.txt', 'w') в my_mp классе и повторите попытку.

 ins = my_mp()
ins.run()    
 

Нет выхода!Почему процесс не запускается?

 >>> from multiprocessing.pool import Pool    
>>> 
>>> class my_mp(object):
...     def __init__(self):
...         self.process_num = 3
...         fh = open('test.txt', 'w')
...     def run_task(self,i):
...         print('process {} start'.format(str(i)))
...         time.sleep(2)
...         print('process {} end'.format(str(i)))
...     def run(self):
...         pool = Pool(processes = self.process_num)
...         for i in range(self.process_num):
...             pool.apply_async(self.run_task,args = (i,))
...         pool.close()
...         pool.join()
... 
>>> x = my_mp()
>>> x.run()
process 0 start
process 1 start
process 2 start
process 2 end
process 0 end
process 1 end
>>> class my_mp(object):
...     def __init__(self):
...         self.process_num = 3
...         self.fh = open('test.txt', 'w')
...     def run_task(self,i):
...         print('process {} start'.format(str(i)))
...         time.sleep(2)
...         print('process {} end'.format(str(i)))
...     def run(self):
...         pool = Pool(processes = self.process_num)
...         for i in range(self.process_num):
...             pool.apply_async(self.run_task,args = (i,))
...         pool.close()
...         pool.join()
... 
>>> x = my_mp()
>>> x.run()
>>> x.run()
>>> x = my_mp()
>>> class my_mp(object):
...     def __init__(self):
...         self.process_num = 3
...         fh = open('test.txt', 'w')
...         self.fh = fh
...     def run_task(self,i):
...         print('process {} start'.format(str(i)))
...         time.sleep(2)
...         print('process {} end'.format(str(i)))
...     def run(self):
...         pool = Pool(processes = self.process_num)
...         for i in range(self.process_num):
...             pool.apply_async(self.run_task,args = (i,))
...         pool.close()
...         pool.join()
... 
>>> x = my_mp()
>>> x.run()
>>> 
 

Почему нельзя добавить обработчик файла с формой self.fh в __init__ методе?Я никогда не вызывал обработчик файла __init__ , определенный в каком-либо процессе.

Ответ №1:

Проблема:

Многопроцессорная обработка Stdlib использует pickle для сериализации объектов. Все, что необходимо отправить через границу процесса, должно быть поддающимся маринованию.

Экземпляры пользовательских классов, как правило, можно выбрать, если все их атрибуты доступны для выбора — это работает путем импорта типа в подпроцесс и удаления атрибутов.

Проблема в том, что возвращаемый объект open() не поддается выборке.

 >>> class A:
...     pass
... 
>>> import pickle
>>> pickle.dumps(A())
b'x80x04x95x15x00x00x00x00x00x00x00x8cx08__main__x94x8cx01Ax94x93x94)x81x94.'
>>> class A:
...     def __init__(self):
...         self.fh = open("test.txt", "w")
... 
>>> pickle.dumps(A())
TypeError: cannot pickle '_io.TextIOWrapper' object
 

В первом случае пул многопроцессорной обработки все еще работает, потому fh что это всего лишь локальная переменная, и она удаляется, как только выходит за рамки, т. е. Когда __init__ метод возвращается. Но как только вы сохраните этот дескриптор в пространстве имен экземпляра с self.fh = open(...) , останется ссылка, и ее нужно будет отправить через границу процесса.

Вы можете подумать, что, поскольку вы запланировали выполнение метода только self.run_task в пуле, набор состояний из __init__ не имеет значения, но это не так. Все еще есть ссылка:

 >>> ins = my_mp()
>>> ins.run_task.__self__.__dict__
{'process_num': 3,
 'fh': <_io.TextIOWrapper name='test.txt' mode='w' encoding='UTF-8'>}
 

Обратите внимание, что вызов ins = my_mp() запускает __init__ метод в основном процессе и ins.run_task является объектом, который отправляется за границу процесса.

Решение:

Существует сторонняя библиотека, которая обеспечивает замену многопроцессорного пула stdlib pip install pathos и заменяет многопроцессорный импорт на:

 from pathos.multiprocessing import Pool
 

В pathos используется dill, более мощная библиотека сериализации, чем pickle, поэтому она может сериализовать возвращаемые объекты open() . Ваш код должен снова работать без каких-либо других изменений. Однако следует помнить, что каждый рабочий процесс не будет знать о других процессах , в которые записываются байты self.fh , поэтому, какой бы рабочий процесс ни выполнял последнюю запись, он может перезаписать данные, записанные ранее из какого-либо другого процесса.

Ответ №2:

Я провел некоторое расследование, но оно не дает полного ответа на вопрос. Я собираюсь опубликовать результаты здесь на случай, если они помогут кому-то еще.

Во-первых, если подпроцесс завершается неудачно, обратная связь отсутствует. Поэтому я добавил дополнительную строку для отображения выходных данных подпроцессов. Так и должно быть None , если ошибок не происходит. Новый кодекс:

         for i in range(3):
            res = pool.apply_async(self.run_task, args=(i,))
            print(res.get())
 

Вывод

 Traceback (most recent call last):
  File "C:/temp/LeetCode-solutions/multithreading.py", line 43, in <module>
    mp.run()
  File "C:/temp/LeetCode-solutions/multithreading.py", line 19, in run
    self.multiprocessing()
  File "C:/temp/LeetCode-solutions/multithreading.py", line 30, in multiprocessing
    print(res.get())
  File "C:Program FilesWindowsAppsPythonSoftwareFoundation.Python.3.8_3.8.2800.0_x64__qbz5n2kfra8p0libmultiprocessingpool.py", line 771, in get
    raise self._value
  File "C:Program FilesWindowsAppsPythonSoftwareFoundation.Python.3.8_3.8.2800.0_x64__qbz5n2kfra8p0libmultiprocessingpool.py", line 537, in _handle_tasks
    put(task)
  File "C:Program FilesWindowsAppsPythonSoftwareFoundation.Python.3.8_3.8.2800.0_x64__qbz5n2kfra8p0libmultiprocessingconnection.py", line 206, in send
    self._send_bytes(_ForkingPickler.dumps(obj))
  File "C:Program FilesWindowsAppsPythonSoftwareFoundation.Python.3.8_3.8.2800.0_x64__qbz5n2kfra8p0libmultiprocessingreduction.py", line 51, in dumps
    cls(buf, protocol).dump(obj)
TypeError: cannot pickle '_io.TextIOWrapper' object
 

Похоже, что программа получает объект file как часть аргумента to self.run_task . Ошибка имеет долгую историю в StackOverflow, но лучшее объяснение IMO здесь:
https://discuss.python.org/t/use-multiprocessing-module-to-handle-a-large-file-in-python/6604

Я не нашел, почему наличие атрибута, который является файловым объектом, делает все атрибуты файловых объектов класса, но я надеюсь, что это откроет какую-то тайну.

Заключительный тест: следующий код работает должным образом

 from multiprocessing.pool import Pool
import time


class MyMP(object):
    def __init__(self):
        self.process_num = 3

    def run(self):
        self.fh = open('test.txt', 'w')
        pool = Pool(processes=3)
        for i in range(3):
            res = pool.apply_async(run_task, args=(i,))
            print(res.get())
        pool.close()
        pool.join()
        self.fh.close()

def run_task(i):
    print('process {} start'.format(str(i)))
    time.sleep(2)
    print('process {} end'.format(str(i)))