Многопроцессорная обработка Python выполняет несколько вызовов api

#python #database #api #multiprocessing #python-multiprocessing

#python #База данных #API #многопроцессорная обработка #python-многопроцессорная обработка

Вопрос:

я пытаюсь ускорить свой код с помощью многопроцессорной обработки и одновременного выполнения нескольких вызовов api. в настоящее время я выполняю вызов api, получаю оттуда необходимые данные, а затем вставляю их в базу данных. это работает, но очень медленно. мне нужно иметь около 700-800 миллионов пользователей в базе данных, и при этой текущей скорости это займет около 200-250 дней. как я могу выполнить несколько вызовов api?

 import traceback
import requests
import json
import sys
from time import time, sleep
from multiprocessing import Process, Queue
from io import BytesIO
import imagehash
from PIL import Image
import sqlite3
from multiprocessing import Process
from multiprocessing import Pool as ThreadPool 

min = 7960265729
max = 9080098567

database_location = 'D:/Script/steam_database.db'
key = []
pool_size = 32
image_hashes = []

def queue_flusher(queue, flush_limit=80, temp = 0):
    connection = sqlite3.connect(database_location)
    cursor = connection.cursor()
    cursor.execute("CREATE TABLE IF NOT EXISTS user (id INTEGER PRIMARY KEY AUTOINCREMENT, hash TEXT, profile TEXT)")
    connection.commit()
    while True:
        if(queue.qsize() < flush_limit):
            sleep(.1)
        else:
            temp  = 80
            print(f"Flushing {flush_limit} out of queue {temp}")
            queue_input = [queue.get() for _ in range(0, flush_limit)]
            cursor = connection.cursor()
            for row in queue_input:
                if row['image'] not in image_hashes:
                    print(f"Inserting Row: {repr(row)}")
                    cursor.execute("INSERT INTO user (hash, profile) VALUES (?, ?);", (row['image'], row['profileUrl']))
                    image_hashes.append(row['image'])
            connection.commit()
    connection.close()

def databaseFiller(queue, min = 0, max = 0):
    while True:
        try:
            for i in range(min, max):
                r = requests.get(f'http://api.steampowered.com/ISteamUser/GetPlayerSummaries/v0002/?key={key[3]}amp;steamids=7656119{i}').json()
                pool = ThreadPool(8)
                all = pool.map(databaseFiller, i)
                response = r

                player = None
                steamid = None

                response = response.get('response', None)
                if response is None or not response.get('players', None):
                    continue
                player = response['players'][0]
                pfp = player.get('avatar', None)
                profileUrl = player.get('profileurl', None)
                if pfp != "https://steamcdn-a.akamaihd.net/steamcommunity/public/images/avatars/fe/fef49e7fa7e1997310d705b2a6158ff8dc1cdfeb.jpg":
                    img = requests.get(pfp)
                    img = Image.open(BytesIO(img.content))
                    image = str(imagehash.average_hash(img))
                    queue.put({'image': image, 'profileUrl': profileUrl})

        except Exception as e:
            # print(f'Received Response: {response}')
            print("Printing only the traceback above the current stack frame")
            print("".join(traceback.format_exception(sys.exc_info()[0], sys.exc_info()[1], sys.exc_info()[2])))
            print("Printing the full traceback as if we had not caught it here...")
            print(format_exception(e))


def format_exception(e):

    exception_list = traceback.format_stack()
    exception_list = exception_list[:-2]
    exception_list.extend(traceback.format_tb(sys.exc_info()[2]))
    exception_list.extend(traceback.format_exception_only(
        sys.exc_info()[0], sys.exc_info()[1]))

    exception_str = "Traceback (most recent call last):n"
    exception_str  = "".join(exception_list)
    exception_str = exception_str[:-1]

    return exception_str

if __name__ == '__main__':
    database_connection = sqlite3.connect("steam_database.db")
    data_queue = Queue()
    data_flush_process = Process(target=queue_flusher, args=([data_queue]))
    data_flush_process.start()
    total_nums = max - min
    nums_per_process = total_nums // pool_size
    for i in range(pool_size):
        new_min = min   (nums_per_process * i)
        new_max = max if i == (pool_size-1) else new_min   nums_per_process
        Process(target=databaseFiller, args=([data_queue, new_min, new_max])).start()
 

Спасибо.

Ответ №1:

Это не решит вашу проблему на 100%, но я вижу, что вы вставляете текст в файл sqlite, вам следует загрузить все это, например, в csv и использовать execute cursor.executemany вместо cursor.execute. Такая вставка выполняется быстрее. Сколько времени требуется для выполнения 1 загрузки?

Комментарии:

1. привет, спасибо за обращение. я попробую это, и я не знаю, сколько времени потребуется для загрузки, но этот текущий скрипт захватывает около 1500-2000 пользователей в минуту. есть способы сделать все это на php и получить всю базу данных steam за день, но я не знаю, как