Как написать счетчик прогресса / времени для параллелизма?

#python #concurrency #multiprocessing

#python #параллелизм #многопроцессорная обработка

Вопрос:

У меня есть очень простой Clock класс, который будет записывать время, количество задач и позволяет другим частям кода вызывать его статический метод Clock.increment() , чтобы пользователи могли получать отзывы о ходе выполнения. Однако многопроцессорная обработка создает отдельные копии для каждого процесса, и даже если я инициализирую класс в основном процессе, дочерний процесс не имеет возможности получить к нему доступ. Вот мой Clock класс:

 class Counter(object):
    '''
    This counter needs to be initiated
    '''
    startTime = time.time()
    currentProgress = 0

    def __init__(self, totalTask):
        # self.startTime = time.time()
        Counter.totalTask = totalTask
        print("Counter initiated")

    def increment():
        Counter.currentProgress  = 1
        Counter.expectedTime = ((time.time() - Counter.startTime) / Counter.currentProgress)*(Counter.totalTask - Counter.currentProgress)
        print("Progress: " str(Counter.currentProgress) " / "  str(Counter.totalTask)   " : "   str(float(Counter.currentProgress) / float(Counter.totalTask)*100) "%")
        print("Expected finish in: "   str(Counter.expectedTime/3600.0)   " hrs")

    increment = staticmethod(increment)
  

Я называю это как:

 if __name__ == "__main__":

    example = [1,2,3,4,5]

    counter = Counter(len(example))

    p = Pool(processes=2)
    p.map(conprintNum, example)

def conprintNum(num):
    print(num)
    Counter.increment()
  

И это не сработает. Я не хочу вручную указывать общий номер задачи для Clock класса, но это может быть последним средством (нарушающим все правила хорошего программирования). практика). Есть ли в любом случае, что я могу заставить это работать в параллелизме? Я также пробовал одноэлементный шаблон, но на самом деле это не решает проблему.

Комментарии:

1. Здесь вам понадобится некоторая блокировка, иначе вы получите несколько действительно неприятных, недетерминированных ошибок.

Ответ №1:

Вы можете использовать a multiprocessing.BaseManager для совместного использования вашего Counter экземпляра между всеми подпроцессами:

 import time
from functools import partial
from multiprocessing import Pool
from multiprocessing.managers import BaseManager

if __name__ == "__main__":

    example = [1,2,3,4,5]

    # Create our custom manager, register the Counter object with it,
    # start it, and then create our shared Counter instance.
    m = BaseManager()
    m.register('Counter', Counter)
    m.start()
    counter = m.Counter(len(example))

    p = Pool(processes=2)

    # We create a partial so that it's easier to pass the counter instance
    # along with every value in our example iterable.
    func = partial(conprintNum, counter)

    p.map(func, example)
  

Вывод:

 Counter initiated
1
Progress: 1 / 5 : 20.0%
Expected finish in: 4.0926668379e-05 hrs
2
Progress: 2 / 5 : 40.0%
Expected finish in: 1.61524613698e-05 hrs
3
Progress: 3 / 5 : 60.0%
Expected finish in: 7.86887274848e-06 hrs
4
Progress: 4 / 5 : 80.0%
Expected finish in: 3.15326783392e-06 hrs
5
Progress: 5 / 5 : 100.0%
Expected finish in: 0.0 hrs
  

Редактировать:

Как указано в комментарии, здесь есть условие гонки, когда несколько процессов могут increment одновременно вводить метод, в результате чего выходные данные будут отображаться не так, как вы хотели бы. Это можно увидеть более четко, если мы добавим sleep вызов в increment :

 def increment():
    Counter.currentProgress  = 1
    time.sleep(random.randint(1,6)) # Artificially delay execution.
    Counter.expectedTime = ((time.time() - Counter.startTime) / Counter.currentProgress)*(Counter.totalTask - Counter.currentProgress)
    print("Progress: " str(Counter.currentProgress) " / "  str(Counter.totalTask)   " : "   str(float(Counter.currentProgress) / float(Counter.totalTask)*100) "%")
    print("Expected finish in: "   str(Counter.expectedTime/3600.0)   " hrs")
  

Теперь вы получаете вывод следующим образом:

 Counter initiated
1
2
3
4
Progress: 4 / 5 : 80.0%
Expected finish in: 7.12237589889e-05 hrs
Progress: 4 / 5 : 80.0%
Expected finish in: 7.15903441111e-05 hrs
5
Progress: 5 / 5 : 100.0%
Expected finish in: 0.0 hrs
Progress: 5 / 5 : 100.0%
Expected finish in: 0.0 hrs
Progress: 5 / 5 : 100.0%
Expected finish in: 0.0 hrs
  

Очевидно, что это нехорошо. Однако этого легко избежать, выполнив вызов increment внутри Lock блока:

 def conprintNum(counter, lock, num):
    print(num)
    with lock:
        counter.increment()

if __name__ == "__main__":

    example = [1,2,3,4,5]

    m = SyncManager()  # SyncManager, rather than BaseManager
    m.register('Counter', Counter)
    m.start()
    lock = m.Lock()  # SyncManager comes with a shared Lock implementation.
    counter = m.Counter(len(example))
    p = Pool(processes=4)
    func = partial(conprintNum, counter, lock)

    p.map(func, example)
  

Комментарии:

1. Я думал, что, поскольку increment() это однонаправленность, я вообще не столкнусь с состоянием гонки.

2. @WindDweller Посмотрите пример вывода, который я включил. currentProgress Переменная будет увеличена должным образом, но то, что она выводит впоследствии, может быть испорчено другими процессами, также увеличивающимися currentProgress или изменяющимися expectedTime до того, как они смогут print их.