我在Python中调用一个函数,我知道这个函数可能会暂停,并迫使我重新启动脚本。
我怎么调用这个函数或者我把它包装在什么里面,这样如果它花费超过5秒脚本就会取消它并做其他事情?
我在Python中调用一个函数,我知道这个函数可能会暂停,并迫使我重新启动脚本。
我怎么调用这个函数或者我把它包装在什么里面,这样如果它花费超过5秒脚本就会取消它并做其他事情?
当前回答
在@piro答案的基础上,您可以构建一个contextmanager。这允许非常易读的代码,将在成功运行后禁用警报信号(sets signal.alarm(0))
from contextlib import contextmanager
import signal
import time
@contextmanager
def timeout(duration):
def timeout_handler(signum, frame):
raise TimeoutError(f'block timedout after {duration} seconds')
signal.signal(signal.SIGALRM, timeout_handler)
signal.alarm(duration)
try:
yield
finally:
signal.alarm(0)
def sleeper(duration):
time.sleep(duration)
print('finished')
使用示例:
In [19]: with timeout(2):
...: sleeper(1)
...:
finished
In [20]: with timeout(2):
...: sleeper(3)
...:
---------------------------------------------------------------------------
Exception Traceback (most recent call last)
<ipython-input-20-66c78858116f> in <module>()
1 with timeout(2):
----> 2 sleeper(3)
3
<ipython-input-7-a75b966bf7ac> in sleeper(t)
1 def sleeper(t):
----> 2 time.sleep(t)
3 print('finished')
4
<ipython-input-18-533b9e684466> in timeout_handler(signum, frame)
2 def timeout(duration):
3 def timeout_handler(signum, frame):
----> 4 raise Exception(f'block timedout after {duration} seconds')
5 signal.signal(signal.SIGALRM, timeout_handler)
6 signal.alarm(duration)
Exception: block timedout after 2 seconds
其他回答
在pypi上找到的stopit包似乎可以很好地处理超时。
我喜欢@stopit。Threading_timeoutable装饰器,它向被装饰的函数添加了一个超时参数,该参数执行您所期望的操作,它将停止该函数。
在pypi上查看:https://pypi.python.org/pypi/stopit
你可以使用多处理。过程来做到这一点。
Code
import multiprocessing
import time
# bar
def bar():
for i in range(100):
print "Tick"
time.sleep(1)
if __name__ == '__main__':
# Start bar as a process
p = multiprocessing.Process(target=bar)
p.start()
# Wait for 10 seconds or until process finishes
p.join(10)
# If thread is still active
if p.is_alive():
print "running... let's kill it..."
# Terminate - may not work if process is stuck for good
p.terminate()
# OR Kill - will work for sure, no chance for process to finish nicely however
# p.kill()
p.join()
我在搜索单元测试的超时调用时遇到了这个线程。我没有在答案或第三方包中找到任何简单的东西,所以我写了下面的装饰器,你可以直接放入代码中:
import multiprocessing.pool
import functools
def timeout(max_timeout):
"""Timeout decorator, parameter in seconds."""
def timeout_decorator(item):
"""Wrap the original function."""
@functools.wraps(item)
def func_wrapper(*args, **kwargs):
"""Closure for function."""
pool = multiprocessing.pool.ThreadPool(processes=1)
async_result = pool.apply_async(item, args, kwargs)
# raises a TimeoutError if execution exceeds max_timeout
return async_result.get(max_timeout)
return func_wrapper
return timeout_decorator
然后就像这样简单地超时测试或任何你喜欢的函数:
@timeout(5.0) # if execution takes longer than 5 seconds, raise a TimeoutError
def test_base_regression(self):
...
下面是一个简单的例子,运行一个带有timeout的方法,并在成功时检索它的值。
import multiprocessing
import time
ret = {"foo": False}
def worker(queue):
"""worker function"""
ret = queue.get()
time.sleep(1)
ret["foo"] = True
queue.put(ret)
if __name__ == "__main__":
queue = multiprocessing.Queue()
queue.put(ret)
p = multiprocessing.Process(target=worker, args=(queue,))
p.start()
p.join(timeout=10)
if p.exitcode is None:
print("The worker timed out.")
else:
print(f"The worker completed and returned: {queue.get()}")
如果工作没有完成,我打算杀死进程,使用线程和进程来实现这一点。
from concurrent.futures import ThreadPoolExecutor
from time import sleep
import multiprocessing
# test case 1
def worker_1(a,b,c):
for _ in range(2):
print('very time consuming sleep')
sleep(1)
return a+b+c
# test case 2
def worker_2(in_name):
for _ in range(10):
print('very time consuming sleep')
sleep(1)
return 'hello '+in_name
作为上下文管理器的实际类
class FuncTimer():
def __init__(self,fn,args,runtime):
self.fn = fn
self.args = args
self.queue = multiprocessing.Queue()
self.runtime = runtime
self.process = multiprocessing.Process(target=self.thread_caller)
def thread_caller(self):
with ThreadPoolExecutor() as executor:
future = executor.submit(self.fn, *self.args)
self.queue.put(future.result())
def __enter__(self):
return self
def start_run(self):
self.process.start()
self.process.join(timeout=self.runtime)
if self.process.exitcode is None:
self.process.kill()
if self.process.exitcode is None:
out_res = None
print('killed premature')
else:
out_res = self.queue.get()
return out_res
def __exit__(self, exc_type, exc_value, exc_traceback):
self.process.kill()
如何使用
print('testing case 1')
with FuncTimer(fn=worker_1,args=(1,2,3),runtime = 5) as fp:
res = fp.start_run()
print(res)
print('testing case 2')
with FuncTimer(fn=worker_2,args=('ram',),runtime = 5) as fp:
res = fp.start_run()
print(res)