Использование нескольких процессов на основе значения последовательно

#python #optimization #parallel-processing #multiprocessing #cpu

#python #оптимизация #параллельная обработка #многопроцессорная обработка #процессор

Вопрос:

прошло много времени с тех пор, как я использовал многопроцессорную обработку python. У меня есть задача, которая могла бы извлечь из этого пользу, или я ошибаюсь в этом предположении. Я могу предоставить прокси только для моего варианта использования.

По сути, я моделирую сценарий ввода в реальном времени. Если исходными данными является видео уличной сцены, моими входными данными будут состояния однозначно идентифицированных и отслеживаемых транспортных средств от кадра к кадру, например state_0: v_id=0; x=0; y=1; vx=2.5; vy=-3 .

Эти состояния будут приходить по одному, а не кадр за кадром. Итак, если приведенный выше пример был из кадра 1, если транспортное средство 0 отслеживается в кадре 2 и забирается новое транспортное средство, я получу:

state_1: v_id=0; x=2.5; y=-2; vx=3; vy=-2.5
state_2: v_id=1; x=5.6; y=3; vx=-1; vy=0

Нет информации о фрейме, просто состояние за состоянием.

Состояния представлены State объектом с пользовательским итератором, который возвращает v_id , [x, y, vx, vy] .

Я хочу провести некоторый анализ этих состояний, используя предыдущую информацию для данного транспортного средства. Вот еще несколько прокси-кодов:

 self.analysis = {}

def estimate(self):

    for v_id, state in self.State:

        if v_id not in self.analysis.keys():
            self.analysis[v_id] = state

        state_estimate = self.do_something(self.analysis[v_id], state)
        self.analysis[vid].append(state_estimate)
  

То, что у меня работает, но я хочу знать, могу ли я использовать многопроцессорную обработку для ее улучшения. Моя идея заключалась в том, могу ли я отправить все состояния v_id=0 в процесс 1, все состояния v_id=1 в процесс 2 и т.д.

Поскольку do_something() функция занимает так много времени, я подумал, что было бы быстрее, если State итератор передает состояния нескольким процессам.

Я считаю, что итератор будет узким местом, потому что я не позволяю себе «заглядывать вперед» в любое будущее состояние. Но могу ли я настроить 16 процессов и на основе v_id текущего состояния выполнить анализ оценки состояния, выполняемый данным процессом? Уместна ли здесь многопроцессорность?

Комментарии:

1. Это почти полностью будет зависеть от того, что do_something есть. Если это связано с процессором, то многопроцессорная может помочь. Если он в основном ожидает новых состояний (или обычно связан с вводом-выводом), то это, вероятно, не сильно улучшит производительность. Но вам придется либо включить подробную информацию об этом методе, либо просто профилировать / тестировать код.

2. Для упрощения, do_something — это предиктор машинного обучения, который использует прошлые прогнозы и текущее состояние для создания нового прогноза. Я думаю, что это связано с процессором? Помогает ли это? Извините, что я не могу дать фактический код.

3. Похоже, вам придется попробовать это, чтобы знать наверняка. Это всегда компромисс. Новый процесс не является бесплатным, и поэтому вы должны сбалансировать создание и использование ресурсов тех, которые приносят какую-либо пользу для вычислений. Если вычисление занимает значительно больше времени, чем требуется для поступления новых данных, то это очень хорошо может помочь в распараллеливании. Я бы рекомендовал вам попробовать это! Профилируйте код, распараллеливайте его, а затем снова профилируйте 🙂

4. Но как мне это сделать? Я профилировал код, поэтому знаю, что do_something это временная нагрузка. Я просто недостаточно знаком с многопроцессорной обработкой. Где мне создать процессы в этом коде? Как мне передать им правильные данные?

5. Взгляните на multiprocessing документы . Ваш базовый подход к отправке данных работникам на основе идентификаторов звучит неплохо (при условии, что каждый идентификатор одинаково вероятен). Я не могу написать код для вас, но кратко: создайте очередь для каждого процесса; добавляйте новые состояния в очередь; рабочие запускают бесконечный цикл, ожидая новых данных в своей очереди, выполняют их обработку и отправляют или сохраняют результаты, как вам нужно.

Ответ №1:

Я знаю, что это не сработает, но это что-то.

Я борюсь с тем, как передавать задачи работникам, если все они выполняют один и тот же вызываемый объект, но с разными v_id s и state s.

первое редактирование: я думаю, что это немного ближе к цели, у меня есть очередь для каждого процесса. state s добавляются в очереди на основе v_id . Все еще пытаюсь вычислить работника.

второе редактирование: может быть, мне нужны как входные, так и выходные очереди? И, может быть, мне вообще не нужно отслеживать процессы? Пытаюсь основать все это на примерах.

Текущее решение:

 self.analysis = {}

self.queues = {}
self.n_workers = multiprocessing.cpu_count()
for worker in range(self.n_workers):
    q_i = multiprocessing.Queue()
    q_o = multiprocessing.Queue()
    multiprocessing.Process(target=self.worker, args=(q_i, q_o), name=str(worker)).start()
    self.queues[v_id] = {'input': q_i, 'output': q_o}

def worker(self, input_queue, output_queue):

    ??? Maybe something here takes the place of self.analysis ???

def loop_states(self):

    for v_id, state in self.State:

        if v_id not in self.analysis.keys():
            self.analysis[v_id] = state

        queue = self.queues[v_id % self.n_workers]['input']
        queue.put(self.estimate, (v_id, state))

def estimate(self, v_id, state):

    state_estimate = self.do_something(self.analysis[v_id], state)
    self.analysis[vid].append(state_estimate)

  

Комментарии:

1. Здесь вы на правильном пути. worker Функция действительно должна включать в себя любую функцию, которую вам нужно распараллелить. Фактически это должен быть цикл while, который получает элемент из входной очереди, вызывает do_something этот элемент, а затем отправляет результаты обратно в выходную очередь.