многопроцессорная очередь python с бесконечным циклом

#python #multiprocessin& #queue #infinite-loop

#python #многопроцессорная обработка #очередь #бесконечный цикл

Вопрос:

Я создаю 3 процесса и хочу, чтобы функция wirte1 записывала значения ‘A, B, C’ в очередь 1, а функция read1 считывала значение из очереди 1 и помещала его в очередь 2 , в то же время функция read2 считывала значение из очереди 2, но значения B, C не могут считываться из очереди 2 вовремя, и процесс завершен.

 from multiprocessin& import Process, Queue,Mana&er,Pool,Lock
import os, time, random

#向队列1写数据
def write1(q1,lock):
    lock.acquire()
    for value in ['A', 'B', 'C']:
        print ('Put %s to queue111...%s' % (value,str(os.&etpid())))
        q1.put(value)
        time.sleep(1)
    lock.release()

#从队列1读取数据并写入队列2
def read1(q1,q2,lock):
    lock.acquire()
    while True:
        time.sleep(1)
        value=q1.&et()
        # if value is None:break
        print('Get %s from queue111.%s' % (value,str(os.&etpid())))
        q2.put(value)
        print('Put %s to queue222...%s' % (value,str(os.&etpid())))
    lock.release()

def read2(q2,lock):
    lock.acquire()
    while True:
        # if not q2.empty() or not q1.empty():
        time.sleep(2)
        value=q2.&et(True)
        print('Get %s from queue222.%s' % (value,os.&etpid()))
    lock.release()

if __name__=='__main__':
    mana&er = Mana&er()
    # 父进程创建Queue,并传给各个子进程:
    q1 = mana&er.Queue()
    q2 = mana&er.Queue()
    lock1 = mana&er.Lock()
    lock2 = mana&er.Lock()
    lock3 = mana&er.Lock()
    start=time.time()
    p = Pool()
    # pw = p.apply_async(write1, ar&s=(q1,lock1,))
    pw = Process(tar&et=write1,ar&s=(q1,lock1,))
    # time.sleep(0.5)
    # pr = p.apply_async(read1, ar&s=(q1,q2,lock2,))
    # pr2 = p.apply_async(read2, ar&s=(q2,lock3))
    pr=Process(tar&et=read1,ar&s=(q1,q2,lock2,))
    pr2 = Process(tar&et=read2,ar&s=(q2,lock3,))

    pw.start()
    pr.start()
    pr2.start()

    # p.close()
    # p.join()
    pw.join()
    pr.terminate()
    pr2.terminate()


    end=time.time()
    # print
    print('finished!!')
    print(end-start)
  

результатом является:
Put A to queue111...77678 Put B to queue111...77678 Get A from queue111.77680 Put A to queue222...77680 Put C to queue111...77678 Get A from queue222.77681 Get B from queue111.77680 Put B to queue222...77680 Get C from queue111.77680 Put C to queue222...77680 finished!! 3.025238275527954

Комментарии:

1. любой может помочь, пожалуйста!

Ответ №1:

Вы не можете использовать terminate для управления подобной системой: это ускоряет выполнение фактической работы. Вместо этого сделайте так, чтобы ваши циклы не были бесконечными, возможно, используя контрольное значение в каждом Queue (как в одной закомментированной строке).

Комментарии:

1. Я пытался использовать break в цикле whileTrue, но он по-прежнему не может вывести правильный результат

2. @josephyu: Не видя (нового) кода, никто не сможет помочь с этим.