Как я могу добавить многопоточность в свой код на Python?

#python #multithreading #python-multithreading

Вопрос:

Ниже приведена моя попытка создать проверку доступности имени пользователя с прокси-серверами, пока она работает по назначению, единственное, что она медленная, я пытался реализовать потоки, но ничем не отличается, так как я не уверен, правильно ли я это делаю или нет. использовались библиотеки параллельных.фьючерсов и потоков.

Есть ли лучший способ кодирования такого рода программ или есть какие-либо другие предложения? Заранее спасибо

 import requests
import json
import ctypes
import colorama
from colorama import Fore
from datetime import datetime
import os

os.system("cls")
now = datetime.now()
current_time = now.strftime("%H:%M:%S")
colorama.init()
url = "https://link"

def grab_proxies():
    proxylist = []
    prx = open('proxy.txt','r')
    prx = prx.readlines()
    for proxy in prx:
        proxy = proxy.rstrip("n")
        proxylist.append(proxy)
    return proxylist
prlist = grab_proxies()

def grab_usernames():
    userlist = []
    users = open('userlist.txt','r')
    users = users.readlines()
    for user in users:
        user = user.rstrip("n")
        userlist.append(user)
    return userlist
ulist = grab_usernames()


found = 0
pc = 0
uc = 0
for i in range(0,len(prlist)):
    ctypes.windll.kernel32.SetConsoleTitleW(f"[# Checker] | Counter: %s - Found: %s - Current Proxy: %s - Started at: %s" % (i, found, prlist[pc], current_time))
    try:
        req = requests.post(url,headers=headers, data = {"requested_username": ulist[uc], "xsrf_token": "F0kpyvjJgeBtsOk5Gl6Jvg"},proxies={'http' : prlist[pc],'https': prlist[pc]}, timeout=2)
        response = req.json()
        #print(response,req.status_code)
        #print(response)
        #print(type(response))
        if(response['reference']['status_code'] == 'TAKEN'):
            #rd = response['errors']['username'][0]['code']
            print(f'{Fore.LIGHTBLACK_EX}[{Fore.LIGHTRED_EX}Taken{Fore.LIGHTBLACK_EX}]{Fore.LIGHTCYAN_EX} {ulist[uc]}')
            #print(ulist[uc] " Taken")
            uc =1
        elif(response['reference']['status_code'] == 'OK'):
            print(f'{Fore.LIGHTBLACK_EX}[{Fore.LIGHTGREEN_EX}Available{Fore.LIGHTBLACK_EX}]{Fore.LIGHTCYAN_EX} {ulist[uc]}')
                #print(ulist[uc] " Available")
            f = open("found.txt","a")
            f.write(ulist[uc] "n")
            f.close()
            found =1
            uc =1
        elif(response['reference']['status_code'] == 'INVALID_BEGIN'):
            print(f'{Fore.LIGHTBLACK_EX}[{Fore.LIGHTRED_EX}Invalid Username{Fore.LIGHTBLACK_EX}]{Fore.LIGHTCYAN_EX} {ulist[uc]}')
            uc =1
        elif(response['reference']['status_code'] == 'DELETED'):
            print(f'{Fore.LIGHTBLACK_EX}[{Fore.LIGHTRED_EX}Deleted{Fore.LIGHTBLACK_EX}]{Fore.LIGHTCYAN_EX} {ulist[uc]}')
            uc =1
        else:
            print(response)
    except:
        #print(prlist[pc]  " Going to next proxy")
        pc =1
        pass
#break

x = input("Finished!.. press enter to exit")
 

Комментарии:

1. Идея состоит в том, чтобы создать функцию, которая проверяет прокси-сервер и принимает в качестве параметра список прокси-серверов. Затем вы можете разделить свой прокси-список на подсписки длиной k и создать потоки, которые выполняют функцию для каждого созданного подсписка. Посмотрите на: docs.python.org/3/library/threading.html

Ответ №1:

Вы могли бы использовать https://github.com/encode/requests-async чтобы выполнять ваши запросы асинхронно