Запуск многопоточного кода после Queue.task_done()

#python #mysql #queue #python-multithreading

#python #mysql #очередь #python-многопоточность

Вопрос:

В классическом «потоковом / очередном» приложении. Мне нужно выполнить дальнейшие вычисления в моей «потребительской» функции. После того, как очередь пуста, дальнейший код после urls.task_done() не выполняется.

Я импортирую рыночные данные из JSON api и импортирую их в свою базу данных MariaDB. В API каждый элемент, который я хочу получить, имеет собственный URL-адрес, поэтому я создаю очередь для всех доступных URL-адресов в функции. «Потребительская» функция обрабатывает очередь, импортируя новый набор данных или обновляя существующую запись в зависимости от уже существующих данных в моей базе данных. Я уже пытался обернуть фактический цикл while True в его собственную функцию, но у меня это не сработало.

 def create_url():
    try:
        mariadb_connection = mariadb.connect(host='host
                                             database='db',
                                             user='user',                                             
                                           password='pw')

        cursor = mariadb_connection.cursor()

        cursor.execute('SELECT type_id from tbl_items')
        item_list = cursor.fetchall()
        print("Create URL - Record retrieved successfully")

        for row in item_list:

            url = 'https://someinternet.com/type_id='   
                str(row[0])
            urls.put(url)

        return urls

    except mariadb.Error as error:
        mariadb_connection.rollback()  
        print("Failed retrieving itemtypes from tbl_items table 
        {}".format(error))

    finally:
        if mariadb_connection.is_connected():
            cursor.close()
            mariadb_connection.close()

def import(urls):
    list_mo_esi = []
    try:
        mariadb_connection = mariadb.connect(host='host',
                                             database='db',
                                             user='user',
                                             password='pw')

        cursor = mariadb_connection.cursor()

        while True:
            s = requests.Session()
            retries = Retry(total=5, backoff_factor=1, status_forcelist=[502, 503, 504])
            s.mount('https://', HTTPAdapter(max_retries=retries))
            jsonraw = s.get(urls.get())
            jsondata = ujson.loads(jsonraw.text)

            for row in jsondata:
                cursor.execute('SELECT order_id from tbl_mo WHERE order_id = %s',
                               (row['order_id'], ))
                exists_mo = cursor.fetchall()
                list_mo_esi.append(row['order_id'])

                if len(exists_mo) != 0:
                    print("updating order#", row['order_id'])
                    cursor.execute('UPDATE tbl_mo SET volume = %s, price = %s WHERE order_id = %s',
                                   (row['volume_remain'], row['price'], row['order_id'], ))
                    mariadb_connection.commit()
                else:
                        cursor.execute('INSERT INTO tbl_mo (type_id, order_id, ordertype,volume, price) VALUES (%s,%s,%s,%s,%s)',
                                       (row['type_id'], row['order_id'], row['is_buy_order'], row['volume_remain'], row['price'], ))
                        mariadb_connection.commit()

            urls.task_done()

    except mariadb.Error as error:
        mariadb_connection.rollback()  
        print("Failed retrieving itemtypes from tbl_items table {}".format(error))
  

Следующая, наконец, часть моей функции не выполняется, но должна.

     finally:
        list_mo_purge = list(set(list_mo_sql)-set(list_mo_esi))
        cursor.execute('SELECT order_id FROM tbl_mo')
        list_mo_sql = cursor.fetchall()
        print(len(list_mo_esi))
        print(len(list_mo_sql))

        if mariadb_connection.is_connected():
            cursor.close()
            mariadb_connection.close()
  

основной поток

 for i in range(num_threads):
    worker = Thread(target=import_mo, args=(urls,))
    worker.setDaemon(True)
    worker.start()

create_url()

urls.join()
  

После завершения всех задач мой рабочий перестает выполнять код сразу после urls.task_done().
Тем не менее, у меня есть еще немного кода после URL-адреса функции.task_done() мне нужно выполнить для закрытия подключения к базе данных и очистки моей базы данных от старых записей. Как я могу выполнить эту «наконец»-часть?

Комментарии:

1. Является mariadb.Error ли поднятым?

2. Нет, это не так. Поток просто останавливается нормально без сообщения об ошибке.

Ответ №1:

Вы не нарушаете время.

Вы должны сделать следующее:

 if urls.empty():
    break
  

Скорее всего, ваш import поток блокируется при urls.get()

Комментарии:

1. Это сделало свое дело. Теперь мой код выполняется после urls.task_done() . Что касается блока импорта: да, я заметил это, и теперь я вызываю create_url() перед запуском рабочих потоков. Однако теперь я получаю некоторые сообщения об ошибках при увеличении количества потоков выше 1. Придется вникать в это. Спасибо!