Является ли этот параллельный код, использующий процессы, не являющиеся по существу последовательными — многопроцессорная обработка Pyhon?

#python #python-3.x #multithreading #multiprocessing

#python #python-3.x #многопоточность #многопроцессорность

Вопрос:

Пытаясь понять параллелизм с многопроцессорной обработкой, я написал ниже код, который разделяет чтение и запись в Queue между несколькими процессами:

 import time
from multiprocessing import Process, Queue, Pool
class QueueFun():

    def writing_queue(self, work_tasks):
        while True:
            print("Writing to queue")
            work_tasks.put(1)
            time.sleep(.5)

    def read_queue(self, work_tasks):
        while True:
            print('Reading from queue')
            work_tasks.get()
            time.sleep(.5)


if __name__ == '__main__':
    q = QueueFun()
    work_tasks = Queue()
    processes = []


    write_processes = []
    read_processes = []
    for i in range(0, 3):
        write_processes.append(Process(target=q.writing_queue,
                                 args=(work_tasks,)))
    for i in range(0, 3):
        read_processes.append(Process(target=q.read_queue,
                                 args=(work_tasks,)))

    for p in write_processes:
        p.start()
    for p in read_processes:
        p.start()

    print('Joining write_processes')
    for p in write_processes:
        print('Joining thread' , p)
        p.join()
    print('Joining read_processes')
    for p in read_processes:
        p.join()
  

производит:

 Joining write_processes
Joining thread <Process name='Process-1' pid=2432 parent=2430 started>
Writing to queue
Reading from queue
Writing to queue
Writing to queue
Reading from queue
Reading from queue
  

Прежде чем я вручную завершу поток.

Насколько я понимаю, join() это t.join() приводит к тому, что основной поток ожидает t завершения. Следовательно, это не по существу последовательный вызов, поскольку мне нужно дождаться завершения каждого потока перед вызовом следующего потока. Я стремлюсь выполнять каждый из этих процессов параллельно, но, похоже, они выполняются последовательно? Как запускать списки write_processes и read_processes одновременно?

Обновить:

Код для print('Joining read_processes') не обрабатывается, поскольку ‘Joining read_processes’ не выводится на консоль. Но «Чтение из очереди» выводится на консоль, поэтому запускается функция read_queue . Почему код print('Joining read_processes') пропускается? Следовательно, поскольку t.join () заставляет основной поток ждать завершения t.join () блокирует основной поток, это не блокирует запуск других потоков?

Комментарии:

1. Обратите внимание, что очереди по своей сути предназначены для «упорядочивания» параллельных действий, что должно быть очевидно из их названия. Если вы хотите увидеть параллелизм, код должен фактически выполнять что-то параллельное. Это означает значительную часть независимых действий и лишь небольшую синхронизацию.

2. @MisterMiyagi в приведенном выше примере мне не нужно «print(‘Joining read_processes’) для p в read_processes: p.join()», поскольку основной поток уже заблокирован во время «для p в write_processes: цикл» и, соответственно, мне просто нужно заблокировать один из потоков, а не все 6?

3. Вам не нужно блокировать какие-либо потоки (фактически, процессы в данном случае). Вам нужен только .join поток / процесс, который, как вы знаете, завершится сам по себе . Оба writing_queue и read_queue являются бесконечными циклами, которые никогда не завершатся, нет смысла их ждать.

4. Нет, почему это должно останавливать запуск других потоков? Ваша программа запускает потоки, затем она ожидает их завершения.

Ответ №1:

Thread.join() — это блокирующий вызов (для вызывающего потока), который ожидает завершения потока, прежде чем продолжить. Таким образом, каждая задача, запущенная после join(), будет последовательной для объединенных потоков. В вашем примере это не делает ваши процессы последовательными. Ваши процессы запускаются один за другим, что нормально, но это не значит, что они не выполняются параллельно

Комментарии:

1. «Thread.join() является блокирующим вызовом для ожидания завершения всех потоков перед продолжением». Я думаю, что это неверно, как указано в документах: «Блокируйте вызывающий поток до тех пор, пока процесс, чей метод join() вызывается, не завершится или пока не наступит необязательный тайм-аут». (src: docs.python.org/2/library/multiprocessing.html ) похоже, что вызывающий поток блокирует выполнение всех других потоков до завершения вызывающего потока.

2. нет, он не блокирует потоки, он блокирует основной поток (откуда вы вызываете join() до тех пор, пока присоединенные потоки не завершатся. Я отредактировал ответ для большей ясности

3. @Adrian Как говорится в документах, join «блокирует вызывающий поток » (тот, который выполняется join() ), а не какие-либо другие.