I am opening a file which has 100,000 URL's. I need to send an HTTP request to each URL and print the status code. I am using Python 2.6, and so far looked at the many confusing ways Python implements threading/concurrency. I have even looked at the python concurrence library, but cannot figure out how to write this program correctly. Has anyone come across a similar problem? I guess generally I need to know how to perform thousands of tasks in Python as fast as possible - I suppose that means 'concurrently'.
当前回答
下面是一个“异步”解决方案,它不使用asyncio,而是使用asyncio使用的低级机制(在Linux上):select()。(或者asyncio可能使用poll或epoll,但这是类似的原理。)
它是对PyCurl示例的稍微修改版本。
(为了简单起见,它多次请求相同的URL,但您可以轻松地修改它以检索一系列不同的URL。)
(另一个轻微的修改可以使这个检索相同的URL作为一个无限循环。提示:将while url和句柄更改为while句柄,将while nprocessed<nurls更改为while 1。)
import pycurl,io,gzip,signal, time, random
signal.signal(signal.SIGPIPE, signal.SIG_IGN) # NOTE! We should ignore SIGPIPE when using pycurl.NOSIGNAL - see the libcurl tutorial for more info
NCONNS = 2 # Number of concurrent GET requests
url = 'example.com'
urls = [url for i in range(0x7*NCONNS)] # Copy the same URL over and over
# Check args
nurls = len(urls)
NCONNS = min(NCONNS, nurls)
print("\x1b[32m%s \x1b[0m(compiled against 0x%x)" % (pycurl.version, pycurl.COMPILE_LIBCURL_VERSION_NUM))
print(f'\x1b[37m{nurls} \x1b[91m@ \x1b[92m{NCONNS}\x1b[0m')
# Pre-allocate a list of curl objects
m = pycurl.CurlMulti()
m.handles = []
for i in range(NCONNS):
c = pycurl.Curl()
c.setopt(pycurl.FOLLOWLOCATION, 1)
c.setopt(pycurl.MAXREDIRS, 5)
c.setopt(pycurl.CONNECTTIMEOUT, 30)
c.setopt(pycurl.TIMEOUT, 300)
c.setopt(pycurl.NOSIGNAL, 1)
m.handles.append(c)
handles = m.handles # MUST make a copy?!
nprocessed = 0
while nprocessed<nurls:
while urls and handles: # If there is an url to process and a free curl object, add to multi stack
url = urls.pop(0)
c = handles.pop()
c.buf = io.BytesIO()
c.url = url # store some info
c.t0 = time.perf_counter()
c.setopt(pycurl.URL, c.url)
c.setopt(pycurl.WRITEDATA, c.buf)
c.setopt(pycurl.HTTPHEADER, [f'user-agent: {random.randint(0,(1<<256)-1):x}', 'accept-encoding: gzip, deflate', 'connection: keep-alive', 'keep-alive: timeout=10, max=1000'])
m.add_handle(c)
while 1: # Run the internal curl state machine for the multi stack
ret, num_handles = m.perform()
if ret!=pycurl.E_CALL_MULTI_PERFORM: break
while 1: # Check for curl objects which have terminated, and add them to the handles
nq, ok_list, ko_list = m.info_read()
for c in ok_list:
m.remove_handle(c)
t1 = time.perf_counter()
reply = gzip.decompress(c.buf.getvalue())
print(f'\x1b[33mGET \x1b[32m{t1-c.t0:.3f} \x1b[37m{len(reply):9,} \x1b[0m{reply[:32]}...') # \x1b[35m{psutil.Process(os.getpid()).memory_info().rss:,} \x1b[0mbytes')
handles.append(c)
for c, errno, errmsg in ko_list:
m.remove_handle(c)
print('\x1b[31mFAIL {c.url} {errno} {errmsg}')
handles.append(c)
nprocessed = nprocessed + len(ok_list) + len(ko_list)
if nq==0: break
m.select(1.0) # Currently no more I/O is pending, could do something in the meantime (display a progress bar, etc.). We just call select() to sleep until some more data is available.
for c in m.handles:
c.close()
m.close()
其他回答
这个扭曲的异步web客户端运行得相当快。
#!/usr/bin/python2.7
from twisted.internet import reactor
from twisted.internet.defer import Deferred, DeferredList, DeferredLock
from twisted.internet.defer import inlineCallbacks
from twisted.web.client import Agent, HTTPConnectionPool
from twisted.web.http_headers import Headers
from pprint import pprint
from collections import defaultdict
from urlparse import urlparse
from random import randrange
import fileinput
pool = HTTPConnectionPool(reactor)
pool.maxPersistentPerHost = 16
agent = Agent(reactor, pool)
locks = defaultdict(DeferredLock)
codes = {}
def getLock(url, simultaneous = 1):
return locks[urlparse(url).netloc, randrange(simultaneous)]
@inlineCallbacks
def getMapping(url):
# Limit ourselves to 4 simultaneous connections per host
# Tweak this number, but it should be no larger than pool.maxPersistentPerHost
lock = getLock(url,4)
yield lock.acquire()
try:
resp = yield agent.request('HEAD', url)
codes[url] = resp.code
except Exception as e:
codes[url] = str(e)
finally:
lock.release()
dl = DeferredList(getMapping(url.strip()) for url in fileinput.input())
dl.addCallback(lambda _: reactor.stop())
reactor.run()
pprint(codes)
pip install requests-threads
使用实例使用async/await - send 100个并发请求
from requests_threads import AsyncSession
session = AsyncSession(n=100)
async def _main():
rs = []
for _ in range(100):
rs.append(await session.get('http://httpbin.org/get'))
print(rs)
if __name__ == '__main__':
session.run(_main)
此示例仅适用于Python 3。您还可以提供自己的asyncio事件循环!
使用实例Twisted
from twisted.internet.defer import inlineCallbacks
from twisted.internet.task import react
from requests_threads import AsyncSession
session = AsyncSession(n=100)
@inlineCallbacks
def main(reactor):
responses = []
for i in range(100):
responses.append(session.get('http://httpbin.org/get'))
for response in responses:
r = yield response
print(r)
if __name__ == '__main__':
react(main)
这个例子在Python 2和Python 3上都可以运行。
也许这对我的回购有帮助,一个基本的例子, 用python编写快速异步HTTP请求
Twistedless解决方案:
from urlparse import urlparse
from threading import Thread
import httplib, sys
from Queue import Queue
concurrent = 200
def doWork():
while True:
url = q.get()
status, url = getStatus(url)
doSomethingWithResult(status, url)
q.task_done()
def getStatus(ourl):
try:
url = urlparse(ourl)
conn = httplib.HTTPConnection(url.netloc)
conn.request("HEAD", url.path)
res = conn.getresponse()
return res.status, ourl
except:
return "error", ourl
def doSomethingWithResult(status, url):
print status, url
q = Queue(concurrent * 2)
for i in range(concurrent):
t = Thread(target=doWork)
t.daemon = True
t.start()
try:
for url in open('urllist.txt'):
q.put(url.strip())
q.join()
except KeyboardInterrupt:
sys.exit(1)
这个方案比twisted方案稍微快一点,并且使用更少的CPU。
线程绝对不是这里的答案。它们将提供进程和内核瓶颈,以及吞吐量限制,如果总体目标是“最快的方式”,这些限制是不可接受的。
稍微扭曲一点,它的异步HTTP客户端会给你更好的结果。
一个使用tornado的异步网络库解决方案
from tornado import ioloop, httpclient
i = 0
def handle_request(response):
print(response.code)
global i
i -= 1
if i == 0:
ioloop.IOLoop.instance().stop()
http_client = httpclient.AsyncHTTPClient()
for url in open('urls.txt'):
i += 1
http_client.fetch(url.strip(), handle_request, method='HEAD')
ioloop.IOLoop.instance().start()
这段代码使用非阻塞网络I/O,没有任何限制。它可以扩展到数万个打开的连接。它将在单个线程中运行,但比任何线程解决方案都要快。签出非阻塞I/O
推荐文章
- 证书验证失败:无法获得本地颁发者证书
- 当使用pip3安装包时,“Python中的ssl模块不可用”
- 无法切换Python与pyenv
- Python if not == vs if !=
- 如何从scikit-learn决策树中提取决策规则?
- 为什么在Mac OS X v10.9 (Mavericks)的终端中apt-get功能不起作用?
- 将旋转的xtick标签与各自的xtick对齐
- 为什么元组可以包含可变项?
- 如何合并字典的字典?
- 如何创建类属性?
- 不区分大小写的“in”
- 在Python中获取迭代器中的元素个数
- 解析日期字符串并更改格式
- 使用try和。Python中的if
- 如何在Python中获得所有直接子目录