Сравнение результатов многопроцессорной обработки с использованием queue (Python)

#python #multiprocessing

#python #многопроцессорная обработка

Вопрос:

Мне нужно запускать command1 и command2 одновременно (параллельно), пока их выходные данные не совпадут. Например, цикл while в consumer должен прерваться на третьей итерации, где command1 = 9 и command2 = 9.
У меня есть приведенный выше код с использованием многопроцессорной обработки. Что я делаю не так? Любая помощь приветствуется.

РЕДАКТИРОВАТЬ: я воспользовался приведенным здесь предложением создать два пула, но вывод выполняется в бесконечном цикле. Я получаю следующий вывод

результаты совпадают

результаты совпадают

результаты совпадают

Кто-нибудь может объяснить, как здесь происходит поток данных?

 import multiprocessing 


def producer1(q1):
    a = [1,2,3,4,5]

    for i in a:
        command1 = i*i 
        q1.put(command1)

def producer2(q2):
    b = [6,7,3,4,5]
    
    for j in b:
        command2 = j*j 
        q2.put(command2)
        
def consumer(q1,q2):
    result1 = q1.get()
    result2= q2.get()
    while True:
        if (result1==result2):
            print("results match")
            break

q1 = multiprocessing.Queue()
q2 = multiprocessing.Queue()

for _ in range(5):
    p1 = multiprocessing.Process(target=consumer, args = [q1,q2])
    
    p1.start()
    producer1(q1)
    producer2(q2)
    print(q1)

p1.join()
 

Комментарии:

1. Элементы помещаются в очередь по порядку . Таким образом, элементы в нем будут: [1, 4, 9, 16, 25, 36, 49, 9, 16, 25]. Ваш тест consumer не может быть выполнен успешно.

2. каждая команда должна иметь собственную очередь, и тогда вы можете получить command1 = 9 и command2 = 9 одновременно. Или вам придется запускать обе команды в одном цикле — и запускать q.put(command2) сразу после q.put(command1) , но таким образом вы не сможете запускать команды в отдельных процессах. Вы должны использовать два запроса.

3. Это имеет смысл иметь отдельные очереди для каждой команды. Но это означает, что мне нужно иметь два разных процесса в двух разных функциях?

Ответ №1:

У вас бесконечный цикл, потому что вы читаете из очередей вне бесконечного цикла и никогда не переназначаете result1 / result2 . Вы хотите:

 def consumer(q1,q2):
    while True:
        result1 = q1.get()  # Reading inside the loop
        result2 = q2.get()
        if result1 == result2:
            print("results match")
            break
 

Другая проблема (не для корректности, а просто для фактического сравнения шаблонов) заключается в том, что ваши производители запускаются один за другим, поэтому на практике producer1 всегда должны выполняться до завершения (полного заполнения очереди) перед producer2 запуском и consumer зависают в ожидании producer2 запуска, он вообще не может начать осмысленную обработку, пока producer1 выполняетсявыполняется. Самый простой подход — поместить (по крайней мере) одного производителя в свой собственный Process :

 for _ in range(5):
    p1 = multiprocessing.Process(target=consumer, args = [q1,q2])
    multiprocessing.Process(target=producer1, args=(q1,), daemon=True).start()
    p1.start()

    producer2(q2)  # May as well let main process run this

p1.join()
 

Создание обоих производителей daemon Process позволило бы вашему основному процессу замкнуться; как только потребитель будет завершен, он может завершиться, и производители автоматически умрут (при producer2 запуске в основном процессе он должен завершиться, даже если consumer он выполнен).

И при дальнейшей проверке у вас возникает третья проблема. После первого цикла, если consumer не найдено совпадение (для реального кода я сомневаюсь, что у вас есть жестко запрограммированные записи с гарантированными совпадениями), вы запустите второй consumer , пока первый все еще работает, и теперь у вас есть условия гонки; первый потребитель может использовать все нечетные значения из q1 ичетные значения из q2 , а не из парных значений, в то время как второе получает другие, и даже если совпадение существует, вы его не найдете (и аналогично, может быть найдено несуществующее совпадение, если одни и те же значения встречаются непарными, но потребитель отключает синхронизацию между очередями и сопоставляет их).

На практике я бы, вероятно, предпочел запускать ваших производителей в рабочих процессах, а основной процесс действовать как единый потребитель, например:

 import multiprocessing 
import queue

def producer1(q1):
    a = [1,2,3,4,5]

    for i in a:
        command1 = i * i 
        q1.put(command1)

def producer2(q2):
    b = [6,7,3,4,5]
    
    for j in b:
        command2 = j * j 
        q2.put(command2)
        
def consumer(q1,q2):
    while True:
        result1 = q1.get()
        result2 = q2.get()
        if result1 == result2:
            print("results match")
            break


if __name__ == '__main__':  # Guard required for correctness when start method isn't fork
    q1 = multiprocessing.Queue()
    q2 = multiprocessing.Queue()

    for _ in range(5):
        p1 = multiprocessing.Process(target=producer1, args=(q1,))
        p2 = multiprocessing.Process(target=producer2, args=(q2,))
        p1.start()
        p2.start()
        try:
            consumer(q1, q2)
        finally:
            p1.join()
            p2.join()
        print(q1)
        # Optionally drain queues before next run so matches from last run not
        # processed by consumer
        for q in (q1, q2):
            try:
                while True:
                    q.get_nowait()
            except queue.Empty:
                pass
 

Комментарии:

1. Спасибо за комментарий и объяснение ShadowRanger. Я попробовал и запустил его, он работает только тогда, когда в двух списках есть совпадение. Если нет, то скрипт запускается в бесконечный цикл. Он застревает в этом цикле while

2. @hitman99: Ну, это неизбежно в вашем текущем дизайне. Вы использовали get вызовы, которые не используют тайм-ауты, поэтому они блокируются навсегда, когда у них заканчиваются входные данные, и даже если они этого не сделали, внутренний цикл бесконечен, за исключением совпадения. Итак, вам нужно решить, как с этим справиться. Вы могли бы использовать тайм-ауты, поэтому, если ни одна пустая очередь не получает новый элемент за определенное время, он повышается queue.Empty . Вы могли бы попросить производителей явно поместить объект sentinel в очередь (например None , если в противном случае значение недопустимо), когда они закончат, и consumer проверить наличие sentinel и break его видимость.

3. Дело в том, что вам нужно решить, что вы хотите сделать, чтобы исправить этот случай, и это должно быть что-то, что работает в параллельном контексте. Нет ни одного правильного ответа.