Бесконечный цикл автоматически завершается при использовании многопоточности и очередей

#python #multithreading

#python #многопоточность

Вопрос:

Я пытаюсь запустить функцию, которая имеет бесконечный цикл (для проверки данных после нескольких секунд задержки), используя многопоточность. Поскольку я считываю данные из файла csv, я также использую очереди.

Моя текущая функция работает нормально, когда я не использую многопоточность / очереди, но когда я их использую, функция выполняется только один раз, а затем останавливается.

Вот моя функция, которая имеет бесконечный цикл. Пожалуйста, обратите внимание, что первый цикл while True предназначен для потоков (в случае, если я использую меньшее количество потоков, чем строк в csv), для функции требуется только второй цикл while True .

 def doWork(q):
    while True:
        #logging.info('Thread Started')
        row=q.get()

        url = row[0]
        target_price = row[1]
        #logging.info('line 79')

        while True:
            delay=randint(5,10)
            headers = {'User-Agent': generate_user_agent()}
            print datetime.datetime.strftime(datetime.datetime.now(), '%Y-%m-%d %H:%M:%S') ': ' 'Sleeping for '   str(delay)   ' seconds'
            #logging.info('line 81')
            eventlet.sleep(delay)
            try:
                #logging.info('line 85')
                with requests.Session() as s:
                    #logging.info('line 87')
                    with eventlet.Timeout(10, False):
                        page = s.get(url,headers=headers,proxies=proxyDict,verify=False)
                    #logging.info('line 89')
                    tree = html.fromstring(page.content)
                    #logging.info('line 91')
                    price = tree.xpath('//div[@class="a-row a-spacing-mini olpOffer"]/div[@class="a-column a-span2 olpPriceColumn"]/span[@class="a-size-large a-color-price olpOfferPrice a-text-bold"]/text()')[0]
                    title = tree.xpath('//h1/text()')[0]
                    #logging.info('line 93')
                    new_price = re.findall("[- ]?d [.]?d [eE]?[- ]?d*", price)[0]
                    #logging.info('line 95')
                    old_price = new_price
                    #logging.info('line 97')
                    #print price
                    print new_price
                    print title   'Current price:'   new_price
                    if float(new_price)<float(target_price):
                        print 'Lower price found!'
                        mydriver = webdriver.Chrome()
                        send_simple_message()
                        login(mydriver)
                        print 'Old Price: '   old_price
                        print 'New Price: '   new_price
                    else:
                        print 'Trying again'
                q.task_done()   
            except Exception as e:
                print e
                print 'Error!'
                q.task_done()
  

И вот моя функция драйвера потока;

 q = Queue(concurrent * 2)

if __name__ == "__main__":

    for i in range(concurrent):
        t = Thread(target=doWork,args=(q,))
        t.daemon = True
        t.start()
    try:
        with open('products.csv','r') as f:
            reader = csv.reader(f.read().splitlines())
            for row in reader:
                q.put((row[0],row[1]))
        q.join()
    except KeyboardInterrupt:
        sys.exit(1) 
  

Ответ №1:

Для тех, кто сталкивается с такой же проблемой, вот как я ее решил.

Я удалил q.task_done() из цикла while и поместил его за пределы цикла while. Это работает так, как задумано, но я не уверен, что это правильный подход.