Многопоточный сервер python (mp.Pool) с очередью задач

#python #multithreading #server #queue #pool

#python #многопоточность #сервер #очередь #Бассейн

Вопрос:

Итак, я пишу бесплатный сервер задач python для Autodesk Maya, который содержит очередь из x числа «рабочих». В любой момент сервер может принять «задачу» и поместить эту задачу в очередь, через которую проходят рабочие.

Из очереди каждый рабочий получает «taskDict», который представляет собой словарь, отправляемый на сервер, в котором указывается, где находится файл Maya, и какой код запускать, когда мы открываем приложение Maya без головы (mayapy.exe / standalone)

Я переписывал это много раз, сначала используя свою собственную систему очередей, но потом решил использовать python. Далее, используя пул, используя очередь.Очередь, используя mp.Manager.Очередь и пул и т. Д. Мне трудно найти какие-либо примеры простого многопоточного сервера, который получает информацию и запускает поток, но использует очередь, когда получает слишком много запросов.

Я просто принципиально не понимаю, как размещать информацию в очереди, и чтобы mp.pool перемещался по очереди, запуская процессы apply_async, которые используют эти данные, и сообщая очереди, когда она завершена.

Вот текущее состояние кода:

 import tempfile
import os
import subprocess
import threading
import multiprocessing as mp
import socket
import sys

from PySide import QtGui, QtCore

import serverUtils

selfDirectory = os.path.dirname(__file__)
uiFile = selfDirectory   '/server.ui'
if os.path.isfile(uiFile):
    form_class, base_class = serverUtils.loadUiType(uiFile)
else:
    print('Cannot find UI file: '   uiFile)


def show():
    global mayaTaskServerWindow
    try:
        mayaTaskServerWindow.close()
    except:
        pass

        mayaTaskServerWindow = mayaTaskServer()
        mayaTaskServerWindow.show()
    return mayaTaskServerWindow

class MayaTaskServer(base_class, form_class):
    refreshSignal = QtCore.Signal()

    def __init__(self):
        super(MayaTaskServer, self).__init__()

        self.setupUi(self)

        self.mainJobServer = None
        self.mpPool = None
        self.manager = None
        self.q = None

        self.workerDict = {}

        self.refreshSignal.connect(self.refreshTree)
        self.startLocalCoresBTN.clicked.connect(self.startLocalCoresFn)
        self.killLocalCoresBTN.clicked.connect(self.killLocalCoresFn)
        self.jobTree.setContextMenuPolicy(QtCore.Qt.CustomContextMenu)
        self.jobTree.customContextMenuRequested.connect(self.openMenu)

        self.startJobServer(6006)
        self.startQueue()

        # set the default temp folder
        filepath = os.path.realpath(__file__)
        self.localTempFolderEDT.setText(filepath.replace(filepath.split('\')[-1], ''))

    ## JOB SYSTEM
    ####################################################################

    class MayaWorker(object):
        def __init__(self, host, port, cpuID):
            self.host = host
            self.port = port
            self.location = None
            self.cpuID = cpuID

            self.location = self.host

            self.busy = False
            self.task = None
            self.taskHistory = {}

        def runTask(self, task):
            print 'starting task - ', self.task['description']
            self.busy = True
            serverUtils.spawnMaya(task)
            win.refreshSignal.emit()

        def taskComplete(self, arg):
            self.busy = False
            self.task = None
            self.mayaFile = None
            win.refreshSignal.emit()

    def bootUpLocalWorkers(self, numProcs):
        self.mpPool = mp.Pool(processes=numProcs)
        for i in range(0, numProcs):
            mw = self.MayaWorker('localhost', 6006, i)
            win.mpPool.apply_async(mw, args=(win.q))
            win.workerDict['CPU_'   str(i)] = mw

    ## USER INTERFACE
    ####################################################################

    #UI code here you don't care about

    ## JOB LISTENER / SERVER / QUEUE
    ####################################################################
    class JobServer(threading.Thread):
        def __init__(self, port):
            threading.Thread.__init__(self)
            self.server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
            self.server_socket.bind(('localhost', port))
            self.server_socket.listen(5)

            self.port = port
            self.running = True

            self.mpPool = None

        def addToQueue(self, task):
            #add to queue
            win.q.put(task, timeout=1000)

            #update UI
            wid1 = QtGui.QTreeWidgetItem()
            wid1.setText(0, str(task))
            win.queTree.addTopLevelItem(wid1)

        def run(self, debug=1):
            print 'Starting Task Server @'   socket.gethostbyname(socket.gethostname())   ':'   str(self.port)
            while self.running:
                client_socket, address = self.server_socket.accept()
                ip = str(address[0])
                data = client_socket.recv(512)
                if 'runTask' in data:
                    taskDict = eval(data.split(' >> ')[-1])
                    print 'SERVER>> Received task:', str(taskDict)
                    self.addToQueue(taskDict)

    class TaskQueueServer(threading.Thread):
        def __init__(self):
            q = self.q_in
            while True:
                if self.q_in:
                    worker = win.findLocalWorker()
                    if worker:
                        taskDict = self.q_in[0]
                        worker.task = taskDict
                        worker.startTask()
                        self.q_in.pop[0]


    def startJobServer(self, port):
        self.mainJobServer = self.JobServer(port)
        self.mainJobServer.start()

    def startQueue(self):
        self.manager = mp.Manager()
        self.q = self.manager.Queue()


if __name__ == "__main__":
    app = QtGui.QApplication(sys.argv)
    win = MayaTaskServer()
    win.show()
    sys.exit(app.exec_())
 

Ответ №1:

Итак, вот как я это сделал. Очень простое, прагматичное решение.

У меня есть метод под названием «finaLocalWorker», вы можете видеть, что рабочий класс может быть помечен как «занятый». Если рабочий не занят, ему отправляется входящая задача.

Если все работники заняты, то входящая задача добавляется в простой список под названием «self.q».

Когда рабочий завершает задачу, mpPool.apply_async выполняет обратный вызов, который запускает метод taskComplete. Этот метод говорит: «если self.q, возьмите элемент [0] из списка и извлеките (удалите) его. Иначе пометьте себя как не занятого «.

Это позволяет переполнять входящие запросы, такие как пакет из 500 анимаций, которые будут поставлены в очередь в списке задач, но при этом сервер по-прежнему может некоторое время не получать задач и немедленно работать над любой поступающей задачей.

Я выложу окончательный код на github.