Многопоточность Python с запросами

#python #multithreading #request #python-multithreading

Вопрос:

у меня есть один скребок, который инициирует сеанс «запросов» и извлекает некоторые данные, используя IPV6, сейчас у меня есть список ip-адресов 10000, я подготовил его с помощью многопоточности, но он выдает ошибку. Нужна поддержка, чтобы выяснить проблему.

 import requests, queue,threading, urllib3,jso,pandas as pd, os, time, datetime,inspect
num_threads = 2
root = os.path.dirname(os.path.abspath(inspect.getfile(inspect.currentframe())))
with open (root  "/ip_list.txt") as ips:
    device_ip = list(ips)
class Writer_Worker(threading.Thread):
    def __init__(self, queue, df, *args, **kwargs):
        if not queue:
            print("Device Queue not specified")
            exit(1)
        self.out_q = queue
        self.df = df
        super().__init__(*args, **kwargs)
    def run(self):
        while True:
            try:
                device_details = self.out_q.get(timeout=3)
            except queue.Empty:
                return
            self.df[device_details[0]] = device_details
            self.out_q.task_done()
class Worker(threading.Thread):
    def __init__(self, queue, out_queue, device_password, *args, **kwargs):
        if not queue:
            print("Device Queue not specified")
            exit(1)
        self.queue = queue
        self.pas = device_password
        self.out_q = out_queue
        super().__init__(*args, **kwargs)
    def run(self):
        while True:
            try:
                device_ip = self.queue.get(timeout=3)
            except queue.Empty:
                return
            self.connect_to_device_and_process(device_ip)
            self.queue.task_done()
    def connect_to_device_and_process(self, device_ip):
        st = str("Online")
        try:
            r = requests.post("https://[" device_ip "]/?q=index.loginamp;mimosa_ajax=1", {"username":"configure", "password":self.pas}, verify=False)
        except requests.exceptions.ConnectionError:
                st = str("Offline")
                self.out_q.put([device_ip,st,"","","","","","","","","","","","","","","","","",""])
                return
        finally:
            if 'Online' in st:
                r = requests.get("https://[" device_ip "]/cgi/dashboard.php", cookies=r.cookies, verify=False)
                if "Response [401]" in str(r):
                    st2 = str("Password Error")
                    self.out_q.put([device_ip,st2,"","","","","","","","","","","","","","","","","",""])
                else:
                    data = json.loads(r.content.decode())
                    output5 = data ['config'] ['Spectrum_Power']
                    self.out_q.put([device_ip,st,output5['Auto_Power'].replace('2', 'Max Power').replace('1', 'Min Power').replace('0', 'off'),output5['AutoConfig']])
def main():
    start = time.time()
    urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
    pas = input("Enter Device Password:")
    df =pd.DataFrame(columns = ["IP","Status","Auto_Power","AutoChannel"])
    q = queue.Queue(len(device_ip))
    for ip in device_ip:
        q.put_nowait(ip)
    out_q = queue.Queue(len(device_ip))
    Writer_Worker(out_q, df).start()
    for _ in range(num_threads):
        Worker(q, out_q, pas).start()
    q.join()
    print(df)
    df.to_excel('iBridge_C5x_Audit_Report.xlsx', sheet_name='Detail', index = False)
    
if __name__ == "__main__":
    main()
 

ниже приведена ошибка при запуске скрипта, из-за которой я не могу войти на это устройство.
Любая помощь будет ощутима.

введите описание изображения здесь

Комментарии:

1. вы str2 = str("Password Error") ошибаетесь»,» Ошибка пароля » — это уже строка

2. @Theshape да, напиши, исправлю, моя главная проблема в том, как сделать его многопоточным.

Ответ №1:

Вы должны использовать пул потоков, который распределяет работу между фиксированным количеством потоков. Это основная функция Python начиная с версии 3.2.

  1. from concurrent.futures import ThreadPoolExecutor
  2. Определите функцию perform(ip) , которая выполняет запрос для одного ip-адреса
  3. Установите переменную numThreads на количество желаемых потоков
  4. Запустите исполнителя пула потоков:
 print(f'Using {numThreads} threads')
with ThreadPoolExecutor(max_workers=numThreads) as pool:
    success = all(pool.map(perform, ips))
 

Источник: https://docs.python.org/3/library/concurrent.futures.html

На этой странице вы найдете пример, еще лучше адаптированный к вашему приложению: https://docs.python.org/3/library/concurrent.futures.html#threadpoolexecutor-example

Комментарии:

1. Спасибо за помощь, но я не могу реализовать это в соответствии с моими требованиями. если возможно, вы можете изменить мой код. Заранее спасибо. Я новичок.

Ответ №2:

от нарезания импортной нити

th = Поток(цель=self.fill_imdb, args=(movies_info_part, «поток» str(количество))) th.начало()

fill_imdb-это мой метод