Python: нет улучшения производительности при многопроцессорной обработке.Pool() с использованием apply_async

#python #python-3.x #performance #multiprocessing #pool

#python #python-3.x #Производительность #многопроцессорная обработка #Бассейн

Вопрос:

я извлекаю и декодирую большой файл трассировки с несколькими миллионами сообщений. Данные сообщения декодируются с использованием файла .dbc и пакета cantools.

Из-за размера списка с сообщениями я попытался декодировать их параллельно, используя модуль многопроцессорной обработки. Поскольку я не могу повысить производительность, используя различные методы, реализующие многопроцессорную обработку.Очередь () и многопроцессорная обработка.Manager () я пришел после некоторых исследований к этой реализации с использованием многопроцессорной обработки.Класс Pool() .

Насколько я понимаю, это должно быть наиболее подходящим решением. Но это также даже замедляет процесс декодирования, и я не понимаю, почему.

Минимальный код для декодирования сообщений с помощью Pool():

 import cantools
import multiprocessing as mp

CORES_USED = 4


def decode_multiplepool(self, msgs):
    """ tries to decode multiple msgs on every can decoding interface """

    pool = mp.Pool(processes=CORES_USED)
    starts = [int(core * len(msgs) / CORES_USED) if core else 0 for core in range(CORES_USED)]
    ends = [int((core 1) * len(msgs) / CORES_USED) for core in range(CORES_USED)]
    results = [pool.apply_async(self._aux_decode_list, args=(msgs[starts[core]:ends[core]])) for core in range(CORES_USED)]

    msgs = []
    for elem in results:
        msgs.extend(elem.get())
    return msgs
    
def _aux_decode_list(self, msgs, mask):
    """ helper function for decoding messages in different processes 
    
    for msg in msgs:
        msg.msg_data = self.decode_msg(msg.msg_id, msg.raw_data)
    return msgs
 

Моя реализация разбивает исходный большой список сообщений на более мелкие части, декодирует каждую часть в другом процессе и объединяет список обратно.

Для сравнения, это очень простая реализация путем перебора полного списка:

 def decode_msgs_serial(self, msgs):
    """ tries to decode multiple msgs on every can decoding interface """
        
    for msg in msgs:
        msg.msg_data = self.decode_msg(msg.msg_id, msg.raw_data)
    return msgs
 

В результате последовательная реализация работает быстрее. Пробовал на 5 файлах трассировки немного разного размера. Все время указано в секундах.

 C:workspaceTrace_Decoder>main.py

Modes.ORIGINAL - this took 55.905028104782104
Modes.ORIGINAL - this took 53.10664439201355
Modes.ORIGINAL - this took 69.6441662311554
Modes.ORIGINAL - this took 55.500239849090576
Modes.ORIGINAL - this took 67.86816716194153

Modes.MULTICORE - this took 81.50722122192383
Modes.MULTICORE - this took 82.05325841903687
Modes.MULTICORE - this took 74.6193528175354
Modes.MULTICORE - this took 65.481980323791504
Modes.MULTICORE - this took 70.18386173248291
 

Любая помощь очень ценится, так как я думаю, что это очень интересная тема для изучения!