I am opening a file which has 100,000 URL's. I need to send an HTTP request to each URL and print the status code. I am using Python 2.6, and so far looked at the many confusing ways Python implements threading/concurrency. I have even looked at the python concurrence library, but cannot figure out how to write this program correctly. Has anyone come across a similar problem? I guess generally I need to know how to perform thousands of tasks in Python as fast as possible - I suppose that means 'concurrently'.


当前回答

一个解决方案:

from twisted.internet import reactor, threads
from urlparse import urlparse
import httplib
import itertools


concurrent = 200
finished=itertools.count(1)
reactor.suggestThreadPoolSize(concurrent)

def getStatus(ourl):
    url = urlparse(ourl)
    conn = httplib.HTTPConnection(url.netloc)   
    conn.request("HEAD", url.path)
    res = conn.getresponse()
    return res.status

def processResponse(response,url):
    print response, url
    processedOne()

def processError(error,url):
    print "error", url#, error
    processedOne()

def processedOne():
    if finished.next()==added:
        reactor.stop()

def addTask(url):
    req = threads.deferToThread(getStatus, url)
    req.addCallback(processResponse, url)
    req.addErrback(processError, url)   

added=0
for url in open('urllist.txt'):
    added+=1
    addTask(url.strip())

try:
    reactor.run()
except KeyboardInterrupt:
    reactor.stop()

Testtime:

[kalmi@ubi1:~] wc -l urllist.txt
10000 urllist.txt
[kalmi@ubi1:~] time python f.py > /dev/null 

real    1m10.682s
user    0m16.020s
sys 0m10.330s
[kalmi@ubi1:~] head -n 6 urllist.txt
http://www.google.com
http://www.bix.hu
http://www.godaddy.com
http://www.google.com
http://www.bix.hu
http://www.godaddy.com
[kalmi@ubi1:~] python f.py | head -n 6
200 http://www.bix.hu
200 http://www.bix.hu
200 http://www.bix.hu
200 http://www.bix.hu
200 http://www.bix.hu
200 http://www.bix.hu

Pingtime:

bix.hu is ~10 ms away from me
godaddy.com: ~170 ms
google.com: ~30 ms

其他回答

对于您的情况,线程可能会做的技巧,因为您可能会花费大部分时间等待响应。标准库中有一些有用的模块,如Queue,可能会有所帮助。

我以前做过类似的并行下载文件的事情,对我来说已经足够好了,但它不是你所说的那种规模。

如果您的任务对cpu的限制更大,您可能需要考虑multiprocessing模块,它将允许您利用更多的cpu /内核/线程(更多的进程不会相互阻塞,因为锁定是每个进程)

Twistedless解决方案:

from urlparse import urlparse
from threading import Thread
import httplib, sys
from Queue import Queue

concurrent = 200

def doWork():
    while True:
        url = q.get()
        status, url = getStatus(url)
        doSomethingWithResult(status, url)
        q.task_done()

def getStatus(ourl):
    try:
        url = urlparse(ourl)
        conn = httplib.HTTPConnection(url.netloc)   
        conn.request("HEAD", url.path)
        res = conn.getresponse()
        return res.status, ourl
    except:
        return "error", ourl

def doSomethingWithResult(status, url):
    print status, url

q = Queue(concurrent * 2)
for i in range(concurrent):
    t = Thread(target=doWork)
    t.daemon = True
    t.start()
try:
    for url in open('urllist.txt'):
        q.put(url.strip())
    q.join()
except KeyboardInterrupt:
    sys.exit(1)

这个方案比twisted方案稍微快一点,并且使用更少的CPU。

(下一个项目的自我提示)

Python 3解决方案只使用请求。它是最简单且快速的,不需要多处理或复杂的异步库。

最重要的方面是重用连接,特别是对于HTTPS (TLS需要额外的往返才能打开)。注意,连接是特定于子域的。如果在多个域上抓取多个页面,则可以对url列表进行排序,以最大化连接重用(它有效地按域进行排序)。

当给定足够的线程时,它将与任何异步代码一样快。(请求在等待响应时释放python GIL)。

[带有日志记录和错误处理的生产等级代码]

import logging
import requests
import time
from concurrent.futures import ThreadPoolExecutor, as_completed

# source: https://stackoverflow.com/a/68583332/5994461

THREAD_POOL = 16

# This is how to create a reusable connection pool with python requests.
session = requests.Session()
session.mount(
    'https://',
    requests.adapters.HTTPAdapter(pool_maxsize=THREAD_POOL,
                                  max_retries=3,
                                  pool_block=True)
)

def get(url):
    response = session.get(url)
    logging.info("request was completed in %s seconds [%s]", response.elapsed.total_seconds(), response.url)
    if response.status_code != 200:
        logging.error("request failed, error code %s [%s]", response.status_code, response.url)
    if 500 <= response.status_code < 600:
        # server is overloaded? give it a break
        time.sleep(5)
    return response

def download(urls):
    with ThreadPoolExecutor(max_workers=THREAD_POOL) as executor:
        # wrap in a list() to wait for all requests to complete
        for response in list(executor.map(get, urls)):
            if response.status_code == 200:
                print(response.content)

def main():
    logging.basicConfig(
        format='%(asctime)s.%(msecs)03d %(levelname)-8s %(message)s',
        level=logging.INFO,
        datefmt='%Y-%m-%d %H:%M:%S'
    )

    urls = [
        "https://httpstat.us/200",
        "https://httpstat.us/200",
        "https://httpstat.us/200",
        "https://httpstat.us/404",
        "https://httpstat.us/503"
    ]

    download(urls)

if __name__ == "__main__":
    main()

使用线程池是一个很好的选择,这将使这相当容易。不幸的是,python并没有一个标准库来简化线程池。但这里有一个不错的图书馆,你应该开始: http://www.chrisarndt.de/projects/threadpool/

来自他们网站的代码示例:

pool = ThreadPool(poolsize)
requests = makeRequests(some_callable, list_of_args, callback)
[pool.putRequest(req) for req in requests]
pool.wait()

希望这能有所帮助。

如果您希望获得尽可能好的性能,您可能会考虑使用异步I/O而不是线程。与成千上万个操作系统线程相关的开销是不小的,Python解释器内的上下文切换甚至增加了更多的开销。线程当然可以完成工作,但我怀疑异步路由将提供更好的整体性能。

具体来说,我建议使用Twisted库中的异步web客户端(http://www.twistedmatrix.com)。它有一个公认的陡峭的学习曲线,但一旦你很好地掌握了Twisted的异步编程风格,它就很容易使用。

Twisted的异步web客户端API的HowTo可以在以下地址找到:

http://twistedmatrix.com/documents/current/web/howto/client.html