#python #python-3.x #web-scraping #concurrent.futures
#python #python-3.x #веб-очистка #concurrent.futures
Вопрос:
Я написал программу для обхода одного веб-сайта и очистки определенных данных. Я хотел бы ускорить его выполнение с помощью ProcessingPoolExecutor
. Однако у меня возникают проблемы с пониманием того, как я могу преобразовать однопоточный в параллельный.
В частности, при создании задания (через ProcessPoolExecutor.submit()
) могу ли я передать класс / объект и аргументы вместо функции и аргументов?
И, если да, то как вернуть данные из этих заданий в очередь для отслеживания посещенных страниц И структуры для хранения очищенного содержимого?
Я использовал это как отправную точку, а также просматривал документы Queue и concurrent.futures (честно говоря, последнее немного перебирает мои мысли). Я также немного погуглил / Youtubed / SO’ed, но безрезультатно.
from queue import Queue, Empty
from concurrent.futures import ProcessPoolExecutor
class Scraper:
"""
Scrapes a single url
"""
def __init__(self, url):
self.url = url # url of page to scrape
self.internal_urls = None
self.content = None
self.scrape()
def scrape(self):
"""
Method(s) to request a page, scrape links from that page
to other pages, and finally scrape actual content from the current page
"""
# assume that code in this method would yield urls linked in current page
self.internal_urls = set(scraped_urls)
# and that code in this method would scrape a bit of actual content
self.content = {'content1': content1, 'content2': content2, 'etc': etc}
class CrawlManager:
"""
Manages a multiprocess crawl and scrape of a single site
"""
def __init__(self, seed_url):
self.seed_url = seed_url
self.pool = ProcessPoolExecutor(max_workers=10)
self.processed_urls = set([])
self.queued_urls = Queue()
self.queued_urls.put(self.seed_url)
self.data = {}
def crawl(self):
while True:
try:
# get a url from the queue
target_url = self.queued_urls.get(timeout=60)
# check that the url hasn't already been processed
if target_url not in self.processed_urls:
# add url to the processed list
self.processed_urls.add(target_url)
print(f'Processing url {target_url}')
# passing an object to the
# ProcessPoolExecutor... can this be done?
job = self.pool.submit(Scraper, target_url)
"""
How do I 1) return the data from each
Scraper instance into self.data?
and 2) put scraped links to self.queued_urls?
"""
except Empty:
print("All done.")
except Exception as e:
print(e)
if __name__ == '__main__':
crawler = CrawlManager('www.mywebsite.com')
crawler.crawl()
Комментарии:
1. HTTP-запрос / ответ — это цикл, в котором доминирует сетевой ввод-вывод. Возможно, вы захотите рассмотреть возможность использования многопоточного или асинхронного ввода-вывода, а не многопроцессорной обработки.
2. @BradSolomon, спасибо! The article Очевидно, я пропустил контекстные требования.
Ответ №1:
Для всех, кто сталкивается с этой страницей, я смог понять это сам.
По совету @brad-solomon я переключился с ProcessPoolExecutor
на ThreadPoolExecutor
управление параллельными аспектами этого скрипта (подробнее см. Его комментарий).
W.r.t. первоначальный вопрос, ключ заключался в том, чтобы использовать add_done_callback
метод ThreadPoolExecutor
в сочетании с модификацией Scraper.scrape
и новым методом CrawlManager.proc_scraper_results
, как показано ниже:
from queue import Queue, Empty
from concurrent.futures import ThreadPoolExecutor
class Scraper:
"""
Scrapes a single url
"""
def __init__(self, url):
self.url = url # url of page to scrape
self.internal_urls = None
self.content = None
self.scrape()
def scrape(self):
"""
Method(s) to request a page, scrape links from that page
to other pages, and finally scrape actual content from the current page
"""
# assume that code in this method would yield urls linked in current page
self.internal_urls = set(scraped_urls)
# and that code in this method would scrape a bit of actual content
self.content = {'content1': content1, 'content2': content2, 'etc': etc}
# these three items will be passed to the callback
# function with in a future object
return self.internal_urls, self.url, self.content
class CrawlManager:
"""
Manages a multiprocess crawl and scrape of a single website
"""
def __init__(self, seed_url):
self.seed_url = seed_url
self.pool = ThreadPoolExecutor(max_workers=10)
self.processed_urls = set([])
self.queued_urls = Queue()
self.queued_urls.put(self.seed_url)
self.data = {}
def proc_scraper_results(self, future):
# get the items of interest from the future object
internal_urls, url, content = future._result[0], future._result[1], future._result[2]
# assign scraped data/content
self.data[url] = content
# also add scraped links to queue if they
# aren't already queued or already processed
for link_url in internal_urls:
if link_url not in self.to_crawl.queue and link_url not in self.processed_urls:
self.to_crawl.put(link_url)
def crawl(self):
while True:
try:
# get a url from the queue
target_url = self.queued_urls.get(timeout=60)
# check that the url hasn't already been processed
if target_url not in self.processed_urls:
# add url to the processed list
self.processed_urls.add(target_url)
print(f'Processing url {target_url}')
# add a job to the ThreadPoolExecutor (note, unlike original question, we pass a method, not an object)
job = self.pool.submit(Scraper(target_url).scrape)
# to add_done_callback we send another function, this one from CrawlManager
# when this function is itself called, it will be pass a `future` object
job.add_done_callback(self.proc_scraper_results)
except Empty:
print("All done.")
except Exception as e:
print(e)
if __name__ == '__main__':
crawler = CrawlManager('www.mywebsite.com')
crawler.crawl()
Результатом этого является очень значительное сокращение продолжительности этой программы.