#python #multithreading #server #queue #pool
#python #многопоточность #сервер #очередь #Бассейн
Вопрос:
Итак, я пишу бесплатный сервер задач python для Autodesk Maya, который содержит очередь из x числа «рабочих». В любой момент сервер может принять «задачу» и поместить эту задачу в очередь, через которую проходят рабочие.
Из очереди каждый рабочий получает «taskDict», который представляет собой словарь, отправляемый на сервер, в котором указывается, где находится файл Maya, и какой код запускать, когда мы открываем приложение Maya без головы (mayapy.exe / standalone)
Я переписывал это много раз, сначала используя свою собственную систему очередей, но потом решил использовать python. Далее, используя пул, используя очередь.Очередь, используя mp.Manager.Очередь и пул и т. Д. Мне трудно найти какие-либо примеры простого многопоточного сервера, который получает информацию и запускает поток, но использует очередь, когда получает слишком много запросов.
Я просто принципиально не понимаю, как размещать информацию в очереди, и чтобы mp.pool перемещался по очереди, запуская процессы apply_async, которые используют эти данные, и сообщая очереди, когда она завершена.
Вот текущее состояние кода:
import tempfile
import os
import subprocess
import threading
import multiprocessing as mp
import socket
import sys
from PySide import QtGui, QtCore
import serverUtils
selfDirectory = os.path.dirname(__file__)
uiFile = selfDirectory '/server.ui'
if os.path.isfile(uiFile):
form_class, base_class = serverUtils.loadUiType(uiFile)
else:
print('Cannot find UI file: ' uiFile)
def show():
global mayaTaskServerWindow
try:
mayaTaskServerWindow.close()
except:
pass
mayaTaskServerWindow = mayaTaskServer()
mayaTaskServerWindow.show()
return mayaTaskServerWindow
class MayaTaskServer(base_class, form_class):
refreshSignal = QtCore.Signal()
def __init__(self):
super(MayaTaskServer, self).__init__()
self.setupUi(self)
self.mainJobServer = None
self.mpPool = None
self.manager = None
self.q = None
self.workerDict = {}
self.refreshSignal.connect(self.refreshTree)
self.startLocalCoresBTN.clicked.connect(self.startLocalCoresFn)
self.killLocalCoresBTN.clicked.connect(self.killLocalCoresFn)
self.jobTree.setContextMenuPolicy(QtCore.Qt.CustomContextMenu)
self.jobTree.customContextMenuRequested.connect(self.openMenu)
self.startJobServer(6006)
self.startQueue()
# set the default temp folder
filepath = os.path.realpath(__file__)
self.localTempFolderEDT.setText(filepath.replace(filepath.split('\')[-1], ''))
## JOB SYSTEM
####################################################################
class MayaWorker(object):
def __init__(self, host, port, cpuID):
self.host = host
self.port = port
self.location = None
self.cpuID = cpuID
self.location = self.host
self.busy = False
self.task = None
self.taskHistory = {}
def runTask(self, task):
print 'starting task - ', self.task['description']
self.busy = True
serverUtils.spawnMaya(task)
win.refreshSignal.emit()
def taskComplete(self, arg):
self.busy = False
self.task = None
self.mayaFile = None
win.refreshSignal.emit()
def bootUpLocalWorkers(self, numProcs):
self.mpPool = mp.Pool(processes=numProcs)
for i in range(0, numProcs):
mw = self.MayaWorker('localhost', 6006, i)
win.mpPool.apply_async(mw, args=(win.q))
win.workerDict['CPU_' str(i)] = mw
## USER INTERFACE
####################################################################
#UI code here you don't care about
## JOB LISTENER / SERVER / QUEUE
####################################################################
class JobServer(threading.Thread):
def __init__(self, port):
threading.Thread.__init__(self)
self.server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self.server_socket.bind(('localhost', port))
self.server_socket.listen(5)
self.port = port
self.running = True
self.mpPool = None
def addToQueue(self, task):
#add to queue
win.q.put(task, timeout=1000)
#update UI
wid1 = QtGui.QTreeWidgetItem()
wid1.setText(0, str(task))
win.queTree.addTopLevelItem(wid1)
def run(self, debug=1):
print 'Starting Task Server @' socket.gethostbyname(socket.gethostname()) ':' str(self.port)
while self.running:
client_socket, address = self.server_socket.accept()
ip = str(address[0])
data = client_socket.recv(512)
if 'runTask' in data:
taskDict = eval(data.split(' >> ')[-1])
print 'SERVER>> Received task:', str(taskDict)
self.addToQueue(taskDict)
class TaskQueueServer(threading.Thread):
def __init__(self):
q = self.q_in
while True:
if self.q_in:
worker = win.findLocalWorker()
if worker:
taskDict = self.q_in[0]
worker.task = taskDict
worker.startTask()
self.q_in.pop[0]
def startJobServer(self, port):
self.mainJobServer = self.JobServer(port)
self.mainJobServer.start()
def startQueue(self):
self.manager = mp.Manager()
self.q = self.manager.Queue()
if __name__ == "__main__":
app = QtGui.QApplication(sys.argv)
win = MayaTaskServer()
win.show()
sys.exit(app.exec_())
Ответ №1:
Итак, вот как я это сделал. Очень простое, прагматичное решение.
У меня есть метод под названием «finaLocalWorker», вы можете видеть, что рабочий класс может быть помечен как «занятый». Если рабочий не занят, ему отправляется входящая задача.
Если все работники заняты, то входящая задача добавляется в простой список под названием «self.q».
Когда рабочий завершает задачу, mpPool.apply_async выполняет обратный вызов, который запускает метод taskComplete. Этот метод говорит: «если self.q, возьмите элемент [0] из списка и извлеките (удалите) его. Иначе пометьте себя как не занятого «.
Это позволяет переполнять входящие запросы, такие как пакет из 500 анимаций, которые будут поставлены в очередь в списке задач, но при этом сервер по-прежнему может некоторое время не получать задач и немедленно работать над любой поступающей задачей.
Я выложу окончательный код на github.