我尝试了python请求库文档中提供的示例。
使用async.map(rs),我获得了响应代码,但我想获得所请求的每个页面的内容。例如,这是行不通的:
out = async.map(rs)
print out[0].content
我尝试了python请求库文档中提供的示例。
使用async.map(rs),我获得了响应代码,但我想获得所请求的每个页面的内容。例如,这是行不通的:
out = async.map(rs)
print out[0].content
当前回答
我对发布的大多数答案都有很多问题——他们要么使用了已弃用的库,这些库已经移植了有限的功能,要么提供了一个在执行请求时具有太多魔力的解决方案,使得错误处理变得困难。如果它们不属于上述类别之一,则它们是第三方库或已弃用。
有些解决方案完全适用于http请求,但解决方案不适用于任何其他类型的请求,这是可笑的。这里不需要高度定制的解决方案。
简单地使用python内置库asyncio就足以执行任何类型的异步请求,并为复杂的和特定于用例的错误处理提供足够的流动性。
import asyncio
loop = asyncio.get_event_loop()
def do_thing(params):
async def get_rpc_info_and_do_chores(id):
# do things
response = perform_grpc_call(id)
do_chores(response)
async def get_httpapi_info_and_do_chores(id):
# do things
response = requests.get(URL)
do_chores(response)
async_tasks = []
for element in list(params.list_of_things):
async_tasks.append(loop.create_task(get_chan_info_and_do_chores(id)))
async_tasks.append(loop.create_task(get_httpapi_info_and_do_chores(ch_id)))
loop.run_until_complete(asyncio.gather(*async_tasks))
它的工作原理很简单。您正在创建一系列希望异步发生的任务,然后请求一个循环执行这些任务并在完成时退出。不需要维护额外的库,也不缺少所需的功能。
其他回答
声明:下面的代码为每个函数创建了不同的线程。
这对于某些情况可能是有用的,因为它使用起来更简单。但要知道,它不是异步的,但使用多线程会给人一种异步的错觉,尽管decorator建议这样做。
可以使用以下装饰器在函数执行完成后给出回调,回调必须处理函数返回的数据。
请注意,在函数被修饰后,它将返回一个Future对象。
import asyncio
## Decorator implementation of async runner !!
def run_async(callback, loop=None):
if loop is None:
loop = asyncio.get_event_loop()
def inner(func):
def wrapper(*args, **kwargs):
def __exec():
out = func(*args, **kwargs)
callback(out)
return out
return loop.run_in_executor(None, __exec)
return wrapper
return inner
实现示例:
urls = ["https://google.com", "https://facebook.com", "https://apple.com", "https://netflix.com"]
loaded_urls = [] # OPTIONAL, used for showing realtime, which urls are loaded !!
def _callback(resp):
print(resp.url)
print(resp)
loaded_urls.append((resp.url, resp)) # OPTIONAL, used for showing realtime, which urls are loaded !!
# Must provide a callback function, callback func will be executed after the func completes execution
# Callback function will accept the value returned by the function.
@run_async(_callback)
def get(url):
return requests.get(url)
for url in urls:
get(url)
如果你想看到实时加载的url,你可以在最后添加以下代码:
while True:
print(loaded_urls)
if len(loaded_urls) == len(urls):
break
你可以使用httpx。
import httpx
async def get_async(url):
async with httpx.AsyncClient() as client:
return await client.get(url)
urls = ["http://google.com", "http://wikipedia.org"]
# Note that you need an async context to use `await`.
await asyncio.gather(*map(get_async, urls))
如果你想要一个函数式语法,gamla库将其包装到get_async中。
然后你就可以
await gamla.map(gamla.get_async(10))(["http://google.com", "http://wikipedia.org"])
10是超时时间,单位是秒。
(声明:我是作者)
我测试了两个请求——未来请求和请求请求。Grequests速度更快,但会带来猴子补丁和依赖关系的其他问题。请求-期货比请求慢几倍。我决定编写自己的请求,并简单地将请求包装到ThreadPoolExecutor中,它几乎和grequest一样快,但没有外部依赖。
import requests
import concurrent.futures
def get_urls():
return ["url1","url2"]
def load_url(url, timeout):
return requests.get(url, timeout = timeout)
with concurrent.futures.ThreadPoolExecutor(max_workers=20) as executor:
future_to_url = {executor.submit(load_url, url, 10): url for url in get_urls()}
for future in concurrent.futures.as_completed(future_to_url):
url = future_to_url[future]
try:
data = future.result()
except Exception as exc:
resp_err = resp_err + 1
else:
resp_ok = resp_ok + 1
Async现在是一个独立的模块:grequests。
请看这里:https://github.com/kennethreitz/grequests
还有:通过Python发送多个HTTP请求的理想方法?
安装:
$ pip install grequests
用法:
建立一个堆栈:
import grequests
urls = [
'http://www.heroku.com',
'http://tablib.org',
'http://httpbin.org',
'http://python-requests.org',
'http://kennethreitz.com'
]
rs = (grequests.get(u) for u in urls)
发送堆栈
grequests.map(rs)
结果如下所示
[<Response [200]>, <Response [200]>, <Response [200]>, <Response [200]>, <Response [200]>]
grequest似乎没有设置并发请求的限制,即当多个请求被发送到同一个服务器时。
上面的答案都没有帮助我,因为他们假设你有一个预定义的请求列表,而在我的情况下,我需要能够侦听请求和异步响应(类似于它在nodejs中的工作方式)。
def handle_finished_request(r, **kwargs):
print(r)
# while True:
def main():
while True:
address = listen_to_new_msg() # based on your server
# schedule async requests and run 'handle_finished_request' on response
req = grequests.get(address, timeout=1, hooks=dict(response=handle_finished_request))
job = grequests.send(req) # does not block! for more info see https://stackoverflow.com/a/16016635/10577976
main()
handle_finished_request回调函数将在收到响应时被调用。注意:由于某些原因,超时(或无响应)在这里不会触发错误
这个简单的循环可以触发异步请求,类似于它在nodejs服务器中的工作方式