Использование ProcessPoolExecutor для веб-очистки: как вернуть данные в очередь и результаты?

#python #python-3.x #web-scraping #concurrent.futures

#python #python-3.x #веб-очистка #concurrent.futures

Вопрос:

Я написал программу для обхода одного веб-сайта и очистки определенных данных. Я хотел бы ускорить его выполнение с помощью ProcessingPoolExecutor . Однако у меня возникают проблемы с пониманием того, как я могу преобразовать однопоточный в параллельный.

В частности, при создании задания (через ProcessPoolExecutor.submit() ) могу ли я передать класс / объект и аргументы вместо функции и аргументов?

И, если да, то как вернуть данные из этих заданий в очередь для отслеживания посещенных страниц И структуры для хранения очищенного содержимого?

Я использовал это как отправную точку, а также просматривал документы Queue и concurrent.futures (честно говоря, последнее немного перебирает мои мысли). Я также немного погуглил / Youtubed / SO’ed, но безрезультатно.

 from queue import Queue, Empty
from concurrent.futures import ProcessPoolExecutor


class Scraper:
    """
    Scrapes a single url
    """

    def __init__(self, url):
        self.url = url # url of page to scrape
        self.internal_urls = None
        self.content = None
        self.scrape()

    def scrape(self):
        """
        Method(s) to request a page, scrape links from that page
        to other pages, and finally scrape actual content from the current page
        """
        # assume that code in this method would yield urls linked in current page
        self.internal_urls = set(scraped_urls)

        # and that code in this method would scrape a bit of actual content
        self.content = {'content1': content1, 'content2': content2, 'etc': etc}


class CrawlManager:
    """
    Manages a multiprocess crawl and scrape of a single site
    """

    def __init__(self, seed_url):
        self.seed_url = seed_url
        self.pool = ProcessPoolExecutor(max_workers=10)
        self.processed_urls = set([])
        self.queued_urls = Queue()
        self.queued_urls.put(self.seed_url)
        self.data = {}

    def crawl(self):
        while True:
            try:
                # get a url from the queue
                target_url = self.queued_urls.get(timeout=60)

                # check that the url hasn't already been processed
                if target_url not in self.processed_urls:
                    # add url to the processed list
                    self.processed_urls.add(target_url)
                    print(f'Processing url {target_url}')

                    # passing an object to the
                    # ProcessPoolExecutor... can this be done?
                    job = self.pool.submit(Scraper, target_url)

                    """
                    How do I 1) return the data from each 
                    Scraper instance into self.data?
                    and 2) put scraped links to self.queued_urls?
                    """

            except Empty:
                print("All done.")
            except Exception as e:
                print(e)


if __name__ == '__main__':
    crawler = CrawlManager('www.mywebsite.com')
    crawler.crawl()

  

Комментарии:

1. HTTP-запрос / ответ — это цикл, в котором доминирует сетевой ввод-вывод. Возможно, вы захотите рассмотреть возможность использования многопоточного или асинхронного ввода-вывода, а не многопроцессорной обработки.

2. @BradSolomon, спасибо! The article Очевидно, я пропустил контекстные требования.

Ответ №1:

Для всех, кто сталкивается с этой страницей, я смог понять это сам.

По совету @brad-solomon я переключился с ProcessPoolExecutor на ThreadPoolExecutor управление параллельными аспектами этого скрипта (подробнее см. Его комментарий).

W.r.t. первоначальный вопрос, ключ заключался в том, чтобы использовать add_done_callback метод ThreadPoolExecutor в сочетании с модификацией Scraper.scrape и новым методом CrawlManager.proc_scraper_results , как показано ниже:

 from queue import Queue, Empty
from concurrent.futures import ThreadPoolExecutor


class Scraper:
    """
    Scrapes a single url
    """

    def __init__(self, url):
        self.url = url # url of page to scrape
        self.internal_urls = None
        self.content = None
        self.scrape()

    def scrape(self):
        """
        Method(s) to request a page, scrape links from that page
        to other pages, and finally scrape actual content from the current page
        """
        # assume that code in this method would yield urls linked in current page
        self.internal_urls = set(scraped_urls)

        # and that code in this method would scrape a bit of actual content
        self.content = {'content1': content1, 'content2': content2, 'etc': etc}

        # these three items will be passed to the callback
        # function with in a future object
        return self.internal_urls, self.url, self.content


class CrawlManager:
    """
    Manages a multiprocess crawl and scrape of a single website
    """

    def __init__(self, seed_url):
        self.seed_url = seed_url
        self.pool = ThreadPoolExecutor(max_workers=10)
        self.processed_urls = set([])
        self.queued_urls = Queue()
        self.queued_urls.put(self.seed_url)
        self.data = {}


    def proc_scraper_results(self, future):
        # get the items of interest from the future object
        internal_urls, url, content = future._result[0], future._result[1], future._result[2]

        # assign scraped data/content
        self.data[url] = content

        # also add scraped links to queue if they
        # aren't already queued or already processed
        for link_url in internal_urls:
            if link_url not in self.to_crawl.queue and link_url not in self.processed_urls:
                self.to_crawl.put(link_url)


    def crawl(self):
        while True:
            try:
                # get a url from the queue
                target_url = self.queued_urls.get(timeout=60)

                # check that the url hasn't already been processed
                if target_url not in self.processed_urls:
                    # add url to the processed list
                    self.processed_urls.add(target_url)
                    print(f'Processing url {target_url}')

                    # add a job to the ThreadPoolExecutor (note, unlike original question, we pass a method, not an object)
                    job = self.pool.submit(Scraper(target_url).scrape)

                    # to add_done_callback we send another function, this one from CrawlManager
                    # when this function is itself called, it will be pass a `future` object
                    job.add_done_callback(self.proc_scraper_results)

            except Empty:
                print("All done.")
            except Exception as e:
                print(e)


if __name__ == '__main__':
    crawler = CrawlManager('www.mywebsite.com')
    crawler.crawl()
  

Результатом этого является очень значительное сокращение продолжительности этой программы.