Многопроцессорная обработка Python с PyCUDA

#python #cuda #parallel-processing #multiprocessing #pycuda

#python #cuda #параллельная обработка #многопроцессорная обработка #pycuda

Вопрос:

У меня проблема, которую я хочу разделить на нескольких устройствах CUDA, но я подозреваю, что моя текущая системная архитектура сдерживает меня;

То, что я настроил, — это класс GPU с функциями, которые выполняют операции на GPU (странно, что). Эти операции выполнены в стиле

 for iteration in range(maxval):
    result[iteration]=gpuinstance.gpufunction(arguments,iteration)
  

Я предполагал, что будет N gpuinstances для N устройств, но я недостаточно разбираюсь в многопроцессорной обработке, чтобы увидеть простейший способ применения этого, чтобы каждое устройство назначалось асинхронно, и, как ни странно, немногие из примеров, с которыми я столкнулся, давали конкретные демонстрации сопоставления результатов после обработки.

Кто-нибудь может дать мне какие-либо указания в этой области?

ОБНОВЛЕНИЕ Спасибо, Калоян, за ваше руководство в области многопроцессорной обработки; если бы CUDA не была конкретно камнем преткновения, я бы отметил вас как ответившего. Извините.

Прежде чем играть с этой реализацией, класс gpuinstance инициировал устройство CUDA с помощью import pycuda.autoinit Но, похоже, это не сработало, выдавая invalid context ошибки, как только каждый поток (с правильной областью видимости) выполнял команду cuda. Затем я попробовал инициализацию вручную в __init__ конструкторе класса с помощью…

 pycuda.driver.init()
self.mydev=pycuda.driver.Device(devid) #this is passed at instantiation of class
self.ctx=self.mydev.make_context()
self.ctx.push()    
  

Мое предположение здесь заключается в том, что контекст сохраняется между созданием списка gpuinstances и их использованием потоками, поэтому каждое устройство прекрасно вписывается в свой собственный контекст.

(Я также внедрил деструктор для pop/detach очистки)

Проблема в том, invalid context что исключения по-прежнему появляются, как только поток пытается коснуться CUDA.

Есть идеи, ребята? И спасибо, что зашли так далеко. Автоматическое повышение голосов для людей, которые вкладывают «банан» в свой ответ! 😛

Комментарии:

1. Является gpuinstance.gpufunction(arguments,iteration) асинхронным или блокирует выполнение?

Ответ №1:

Сначала вам нужно разложить все ваши бананы по полочкам на стороне CUDA, а затем подумать о лучшем способе сделать это на Python [бесстыдный представитель, я знаю].

Модель CUDA с несколькими графическими процессорами довольно проста до версии 4.0 — каждый графический процессор имеет свой собственный контекст, и каждый контекст должен быть установлен другим потоком хоста. Итак, идея псевдокода заключается:

  1. Приложение запускается, процесс использует API для определения количества используемых графических процессоров (остерегайтесь таких вещей, как режим вычисления в Linux)
  2. Приложение запускает новый поток хоста для каждого графического процессора, передавая идентификатор графического процессора. Каждый поток неявно / явно вызывает эквивалент cuCtxCreate(), передавая идентификатор GPU, который ему был присвоен
  3. Прибыль!

В Python это может выглядеть примерно так:

 import threading
from pycuda import driver

class gpuThread(threading.Thread):
    def __init__(self, gpuid):
        threading.Thread.__init__(self)
        self.ctx  = driver.Device(gpuid).make_context()
        self.device = self.ctx.get_device()

    def run(self):
        print "%s has device %s, api version %s"  
             % (self.getName(), self.device.name(), self.ctx.get_api_version())
        # Profit!

    def join(self):
        self.ctx.detach()
        threading.Thread.join(self)

driver.init()
ngpus = driver.Device.count()
for i in range(ngpus):
    t = gpuThread(i)
    t.start()
    t.join()
  

Это предполагает, что безопасно просто установить контекст без какой-либо предварительной проверки устройства. В идеале вы должны проверить режим вычисления, чтобы убедиться, что это безопасно, а затем использовать обработчик исключений в случае, если устройство занято. Но, надеюсь, это дает основную идею.

Комментарии:

1. @talonmies как всегда, спасибо, но быстрый запрос: если я правильно понимаю, каждый поток «создается», выполняется и объединяется в строку. Не приводит ли это к последовательному выполнению? Я предполагаю, что простое решение — разбить t.join() s на отдельный цикл.

2. @Andrew Bolter: Да, я думаю, все методы start должны вызываться в цикле, а все объединения — позже. Мне тоже было немного интересно узнать о глобальной блокировке интерпретатора в этой ситуации… Должен признаться, я использовал mpi4py для своего многопроцессорного процессора python, у меня есть платформа pthreads, которую я использую и для мультипроцессорного процессора, но обычно только с C / C и Fortran.

3. @Andrew Bolter: Я только что добавил немного инструментария в модифицированную версию того кода, который я опубликовал, и я начинаю задаваться вопросом о разумности использования потоковой обработки python для этого. Я бы не хотел делать ставку на правильность того, что я опубликовал на этом этапе….

4. Я подозреваю, что собираюсь реорганизовать проблему с целью перехода на MPI, но мне кажется, что это должно быть более тривиальным. Кроме того, чтобы обойти недостатки потоковой обработки, я также рассматривал вместо этого многопроцессорную обработку.

5. Кроме того, я не совсем понимаю ваш комментарий «до 4.0», поскольку, насколько я понял, предыдущая операция с несколькими устройствами, соответствующая контексту, все еще поддерживалась?

Ответ №2:

Что вам нужно, так это многопоточная реализация map встроенной функции. Вот одна из реализаций. Это, с небольшой модификацией в соответствии с вашими конкретными потребностями, вы получаете:

 import threading

def cuda_map(args_list, gpu_instances):

    result = [None] * len(args_list)

    def task_wrapper(gpu_instance, task_indices):
        for i in task_indices:
            result[i] = gpu_instance.gpufunction(args_list[i])

    threads = [threading.Thread(
                    target=task_wrapper, 
                    args=(gpu_i, list(xrange(len(args_list)))[i::len(gpu_instances)])
              ) for i, gpu_i in enumerate(gpu_instances)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()

    return result
  

Это более или менее то же самое, что у вас есть выше, с большой разницей в том, что вы не тратите время на ожидание каждого отдельного завершения gpufunction .

Комментарии:

1. Спасибо за ваш комментарий, и он помог мне найти решение, но он столкнулся с проблемами, связанными с CUDA, в отношении контекстов устройств. Обновляю вопрос, чтобы отразить это сейчас