Многопроцессорность в Python: обработка нескольких рабочих потоков

#python #python-3.x #multithreading #python-multiprocessing

#python #python-3.x #многопоточность #python-многопроцессорность

Вопрос:

В моем коде мне нужно иметь несколько экземпляров рабочих потоков, запущенных в программе Python. Сначала я создаю несколько экземпляров рабочего потока (скажем, 10), а затем добавляю их в пул. Всякий раз, когда клиент запрашивает службу, поток должен быть вызван и зарезервирован для клиента. После завершения задачи поток должен быть добавлен обратно в пул.

На данный момент я написал следующий код. Но я не уверен в том, как вечно запускать потоки в пуле (они должны находиться в режиме ожидания, когда находятся внутри пула), вызывать и получать сервис всякий раз, когда это необходимо, и добавлять их обратно в пул (должны снова перейти в режим ожидания) после обработки. Будем признательны за любую помощь.

 PRED = Queue(10)

class Worker(threading.Thread):
    def __init__(self, threadID, name):
        threading.Thread.__init__(self)
        self.threadID  =threadID
        self.name = name

    def run(self):
        print("starting "   self.name   " thread")
        while True:
            ??
        print("Exiting "   self.name   " thread")


def work():
    print("working")
    time.sleep(3)
  
  • Допустим, рабочие потоки находятся в очереди PRED.
  • work() — это метод, который я должен вызывать для обслуживания клиента.

Ответ №1:

Вот что я извлек из документации Python

подробнее:https://docs.python.org/3/library/queue.html#queue .Очередь.Присоединиться

Убедитесь, что вы хорошо прочитали это, есть несколько интересных опций, таких как создание очереди приоритетов или первый вход первым выходом, или последний вход первым выходом.

 import queue
import threading
import time


# The queue for tasks
q = queue.Queue()


# Worker, handles each task
def worker():
    while True:
        item = q.get()
        if item is None:
            break
        print("Working on", item)
        time.sleep(1)
        q.task_done()


def start_workers(worker_pool=1000):
    threads = []
    for i in range(worker_pool):
        t = threading.Thread(target=worker)
        t.start()
        threads.append(t)
    return threads


def stop_workers(threads):
    # stop workers
    for i in threads:
        q.put(None)
    for t in threads:
        t.join()


def create_queue(task_items):
    for item in task_items:
        q.put(item)


if __name__ == "__main__":
    # Dummy tasks
    tasks = [item for item in range(1000)]

    # Start up your workers
    workers = start_workers(worker_pool=10)
    create_queue(tasks)

    # Blocks until all tasks are complete
    q.join()

    stop_workers(workers)
  

Комментарии:

1. Хороший пример! Вам следует подумать о том, чтобы сделать их демонами на случай, если они не будут завершены должным образом (т.daemon = True). По моему опыту, это также помогает назначать рабочих для отладки.

Ответ №2:

Используйте очередь задач для отправки задач своим работникам. Заставьте ваших рабочих прослушивать очередь задач и ждать, если она пуста. Когда рабочий получает задачу из очереди, он должен выполнить ее, а затем вернуться к опросу очереди. Довольно стандартный рабочий шаблон.

Когда я говорю «задача», я имею в виду, что вы можете поместить фактический метод в очередь. Рабочий может просто взять его и выполнить.

Комментарии:

1. Да. Этот шаблон в порядке. Но не могли бы вы предоставить небольшой код о том, как его архивировать. Я не совсем понимаю, как выполнять часть прослушивания в Python

2. Это здорово. Спасибо