#python #multiprocessing
#python #многопроцессорная обработка
Вопрос:
Мне нужно запускать command1 и command2 одновременно (параллельно), пока их выходные данные не совпадут. Например, цикл while в consumer должен прерваться на третьей итерации, где command1 = 9 и command2 = 9.
У меня есть приведенный выше код с использованием многопроцессорной обработки. Что я делаю не так? Любая помощь приветствуется.
РЕДАКТИРОВАТЬ: я воспользовался приведенным здесь предложением создать два пула, но вывод выполняется в бесконечном цикле. Я получаю следующий вывод
результаты совпадают
результаты совпадают
результаты совпадают
Кто-нибудь может объяснить, как здесь происходит поток данных?
import multiprocessing
def producer1(q1):
a = [1,2,3,4,5]
for i in a:
command1 = i*i
q1.put(command1)
def producer2(q2):
b = [6,7,3,4,5]
for j in b:
command2 = j*j
q2.put(command2)
def consumer(q1,q2):
result1 = q1.get()
result2= q2.get()
while True:
if (result1==result2):
print("results match")
break
q1 = multiprocessing.Queue()
q2 = multiprocessing.Queue()
for _ in range(5):
p1 = multiprocessing.Process(target=consumer, args = [q1,q2])
p1.start()
producer1(q1)
producer2(q2)
print(q1)
p1.join()
Комментарии:
1. Элементы помещаются в очередь по порядку . Таким образом, элементы в нем будут: [1, 4, 9, 16, 25, 36, 49, 9, 16, 25]. Ваш тест
consumer
не может быть выполнен успешно.2. каждая команда должна иметь собственную очередь, и тогда вы можете получить
command1 = 9
иcommand2 = 9
одновременно. Или вам придется запускать обе команды в одном цикле — и запускатьq.put(command2)
сразу послеq.put(command1)
, но таким образом вы не сможете запускать команды в отдельных процессах. Вы должны использовать два запроса.3. Это имеет смысл иметь отдельные очереди для каждой команды. Но это означает, что мне нужно иметь два разных процесса в двух разных функциях?
Ответ №1:
У вас бесконечный цикл, потому что вы читаете из очередей вне бесконечного цикла и никогда не переназначаете result1
/ result2
. Вы хотите:
def consumer(q1,q2):
while True:
result1 = q1.get() # Reading inside the loop
result2 = q2.get()
if result1 == result2:
print("results match")
break
Другая проблема (не для корректности, а просто для фактического сравнения шаблонов) заключается в том, что ваши производители запускаются один за другим, поэтому на практике producer1
всегда должны выполняться до завершения (полного заполнения очереди) перед producer2
запуском и consumer
зависают в ожидании producer2
запуска, он вообще не может начать осмысленную обработку, пока producer1
выполняетсявыполняется. Самый простой подход — поместить (по крайней мере) одного производителя в свой собственный Process
:
for _ in range(5):
p1 = multiprocessing.Process(target=consumer, args = [q1,q2])
multiprocessing.Process(target=producer1, args=(q1,), daemon=True).start()
p1.start()
producer2(q2) # May as well let main process run this
p1.join()
Создание обоих производителей daemon
Process
позволило бы вашему основному процессу замкнуться; как только потребитель будет завершен, он может завершиться, и производители автоматически умрут (при producer2
запуске в основном процессе он должен завершиться, даже если consumer
он выполнен).
И при дальнейшей проверке у вас возникает третья проблема. После первого цикла, если consumer
не найдено совпадение (для реального кода я сомневаюсь, что у вас есть жестко запрограммированные записи с гарантированными совпадениями), вы запустите второй consumer
, пока первый все еще работает, и теперь у вас есть условия гонки; первый потребитель может использовать все нечетные значения из q1
ичетные значения из q2
, а не из парных значений, в то время как второе получает другие, и даже если совпадение существует, вы его не найдете (и аналогично, может быть найдено несуществующее совпадение, если одни и те же значения встречаются непарными, но потребитель отключает синхронизацию между очередями и сопоставляет их).
На практике я бы, вероятно, предпочел запускать ваших производителей в рабочих процессах, а основной процесс действовать как единый потребитель, например:
import multiprocessing
import queue
def producer1(q1):
a = [1,2,3,4,5]
for i in a:
command1 = i * i
q1.put(command1)
def producer2(q2):
b = [6,7,3,4,5]
for j in b:
command2 = j * j
q2.put(command2)
def consumer(q1,q2):
while True:
result1 = q1.get()
result2 = q2.get()
if result1 == result2:
print("results match")
break
if __name__ == '__main__': # Guard required for correctness when start method isn't fork
q1 = multiprocessing.Queue()
q2 = multiprocessing.Queue()
for _ in range(5):
p1 = multiprocessing.Process(target=producer1, args=(q1,))
p2 = multiprocessing.Process(target=producer2, args=(q2,))
p1.start()
p2.start()
try:
consumer(q1, q2)
finally:
p1.join()
p2.join()
print(q1)
# Optionally drain queues before next run so matches from last run not
# processed by consumer
for q in (q1, q2):
try:
while True:
q.get_nowait()
except queue.Empty:
pass
Комментарии:
1. Спасибо за комментарий и объяснение ShadowRanger. Я попробовал и запустил его, он работает только тогда, когда в двух списках есть совпадение. Если нет, то скрипт запускается в бесконечный цикл. Он застревает в этом цикле while
2. @hitman99: Ну, это неизбежно в вашем текущем дизайне. Вы использовали
get
вызовы, которые не используют тайм-ауты, поэтому они блокируются навсегда, когда у них заканчиваются входные данные, и даже если они этого не сделали, внутренний цикл бесконечен, за исключением совпадения. Итак, вам нужно решить, как с этим справиться. Вы могли бы использовать тайм-ауты, поэтому, если ни одна пустая очередь не получает новый элемент за определенное время, он повышаетсяqueue.Empty
. Вы могли бы попросить производителей явно поместить объект sentinel в очередь (напримерNone
, если в противном случае значение недопустимо), когда они закончат, иconsumer
проверить наличие sentinel иbreak
его видимость.3. Дело в том, что вам нужно решить, что вы хотите сделать, чтобы исправить этот случай, и это должно быть что-то, что работает в параллельном контексте. Нет ни одного правильного ответа.