I am opening a file which has 100,000 URL's. I need to send an HTTP request to each URL and print the status code. I am using Python 2.6, and so far looked at the many confusing ways Python implements threading/concurrency. I have even looked at the python concurrence library, but cannot figure out how to write this program correctly. Has anyone come across a similar problem? I guess generally I need to know how to perform thousands of tasks in Python as fast as possible - I suppose that means 'concurrently'.


当前回答

Scrapy框架将快速和专业地解决您的问题。它还将缓存所有请求,以便稍后可以重新运行失败的请求。

将该脚本保存为quotes_spider.py。

# quote_spiders.py
import json
import string
import scrapy
from scrapy.crawler import CrawlerProcess
from scrapy.item import Item, Field

class TextCleaningPipeline(object):
    def _clean_text(self, text):
        text = text.replace('“', '').replace('”', '')
        table = str.maketrans({key: None for key in string.punctuation})
        clean_text = text.translate(table)
        return clean_text.lower()

    def process_item(self, item, spider):
        item['text'] = self._clean_text(item['text'])
        return item

class JsonWriterPipeline(object):
    def open_spider(self, spider):
        self.file = open(spider.settings['JSON_FILE'], 'a')

    def close_spider(self, spider):
        self.file.close()

    def process_item(self, item, spider):
        line = json.dumps(dict(item)) + "\n"
        self.file.write(line)
        return item

class QuoteItem(Item):
    text = Field()
    author = Field()
    tags = Field()
    spider = Field()

class QuoteSpider(scrapy.Spider):
    name = "quotes"

    def start_requests(self):
        urls = [
            'http://quotes.toscrape.com/page/1/',
            'http://quotes.toscrape.com/page/2/',
            # ...
        ]
        for url in urls:
            yield scrapy.Request(url=url, callback=self.parse)

    def parse(self, response):
        for quote in response.css('div.quote'):
            item = QuoteItem()
            item['text'] = quote.css('span.text::text').get()
            item['author'] = quote.css('small.author::text').get()
            item['tags'] = quote.css('div.tags a.tag::text').getall()
            item['spider'] = self.name
            yield item

if __name__ == '__main__':
    settings = dict()
    settings['USER_AGENT'] = 'Mozilla/4.0 (compatible; MSIE 7.0; Windows NT 5.1)'
    settings['HTTPCACHE_ENABLED'] = True
    settings['CONCURRENT_REQUESTS'] = 20
    settings['CONCURRENT_REQUESTS_PER_DOMAIN'] = 20
    settings['JSON_FILE'] = 'items.jl'
    settings['ITEM_PIPELINES'] = dict()
    settings['ITEM_PIPELINES']['__main__.TextCleaningPipeline'] = 800
    settings['ITEM_PIPELINES']['__main__.JsonWriterPipeline'] = 801

    process = CrawlerProcess(settings=settings)
    process.crawl(QuoteSpider)
    process.start()

紧随其后的是

$ pip install Scrapy
$ python quote_spiders.py 

为了微调scraper,相应地调整CONCURRENT_REQUESTS和CONCURRENT_REQUESTS_PER_DOMAIN设置。

其他回答

pip install requests-threads

使用实例使用async/await - send 100个并发请求

from requests_threads import AsyncSession

session = AsyncSession(n=100)

async def _main():
    rs = []
    for _ in range(100):
        rs.append(await session.get('http://httpbin.org/get'))
    print(rs)

if __name__ == '__main__':
    session.run(_main)

此示例仅适用于Python 3。您还可以提供自己的asyncio事件循环!

使用实例Twisted

from twisted.internet.defer import inlineCallbacks
from twisted.internet.task import react
from requests_threads import AsyncSession

session = AsyncSession(n=100)

@inlineCallbacks
def main(reactor):
    responses = []
    for i in range(100):
        responses.append(session.get('http://httpbin.org/get'))

    for response in responses:
        r = yield response
        print(r)

if __name__ == '__main__':
    react(main)

这个例子在Python 2和Python 3上都可以运行。

也许这对我的回购有帮助,一个基本的例子, 用python编写快速异步HTTP请求

使用线程池是一个很好的选择,这将使这相当容易。不幸的是,python并没有一个标准库来简化线程池。但这里有一个不错的图书馆,你应该开始: http://www.chrisarndt.de/projects/threadpool/

来自他们网站的代码示例:

pool = ThreadPool(poolsize)
requests = makeRequests(some_callable, list_of_args, callback)
[pool.putRequest(req) for req in requests]
pool.wait()

希望这能有所帮助。

如果您希望获得尽可能好的性能,您可能会考虑使用异步I/O而不是线程。与成千上万个操作系统线程相关的开销是不小的,Python解释器内的上下文切换甚至增加了更多的开销。线程当然可以完成工作,但我怀疑异步路由将提供更好的整体性能。

具体来说,我建议使用Twisted库中的异步web客户端(http://www.twistedmatrix.com)。它有一个公认的陡峭的学习曲线,但一旦你很好地掌握了Twisted的异步编程风格,它就很容易使用。

Twisted的异步web客户端API的HowTo可以在以下地址找到:

http://twistedmatrix.com/documents/current/web/howto/client.html

最简单的方法是使用Python的内置线程库。它们不是“真正的”/内核线程。它们有问题(比如序列化),但足够好了。你需要一个队列和线程池。这里有一个选项,但是编写自己的选项很简单。您无法并行处理所有100,000个调用,但可以同时发出100个(或左右)调用。

我发现使用tornado包是最快和最简单的方法来实现这一点:

from tornado import ioloop, httpclient, gen


def main(urls):
    """
    Asynchronously download the HTML contents of a list of URLs.
    :param urls: A list of URLs to download.
    :return: List of response objects, one for each URL.
    """

    @gen.coroutine
    def fetch_and_handle():
        httpclient.AsyncHTTPClient.configure(None, defaults=dict(user_agent='MyUserAgent'))
        http_client = httpclient.AsyncHTTPClient()
        waiter = gen.WaitIterator(*[http_client.fetch(url, raise_error=False, method='HEAD')
                                    for url in urls])
        results = []
        # Wait for the jobs to complete
        while not waiter.done():
            try:
                response = yield waiter.next()
            except httpclient.HTTPError as e:
                print(f'Non-200 HTTP response returned: {e}')
                continue
            except Exception as e:
                print(f'An unexpected error occurred querying: {e}')
                continue
            else:
                print(f'URL \'{response.request.url}\' has status code <{response.code}>')
                results.append(response)
        return results

    loop = ioloop.IOLoop.current()
    web_pages = loop.run_sync(fetch_and_handle)

    return web_pages

my_urls = ['url1.com', 'url2.com', 'url100000.com']
responses = main(my_urls)
print(responses[0])