我在Python中调用一个函数,我知道这个函数可能会暂停,并迫使我重新启动脚本。
我怎么调用这个函数或者我把它包装在什么里面,这样如果它花费超过5秒脚本就会取消它并做其他事情?
我在Python中调用一个函数,我知道这个函数可能会暂停,并迫使我重新启动脚本。
我怎么调用这个函数或者我把它包装在什么里面,这样如果它花费超过5秒脚本就会取消它并做其他事情?
当前回答
我怎么调用函数或者我怎么包装它,如果它超过5秒脚本取消它?
我发布了一个要点,用装饰器和threading.Timer解决了这个问题。下面是它的分类。
导入和设置兼容性
它是用Python 2和3测试的。它也应该在Unix/Linux和Windows下工作。
首先是进口。这些尝试保持代码的一致性,而不管Python版本:
from __future__ import print_function
import sys
import threading
from time import sleep
try:
import thread
except ImportError:
import _thread as thread
使用版本独立代码:
try:
range, _print = xrange, print
def print(*args, **kwargs):
flush = kwargs.pop('flush', False)
_print(*args, **kwargs)
if flush:
kwargs.get('file', sys.stdout).flush()
except NameError:
pass
现在我们已经从标准库导入了我们的功能。
exit_after装饰
接下来,我们需要一个函数来终止子线程的main():
def quit_function(fn_name):
# print to stderr, unbuffered in Python 2.
print('{0} took too long'.format(fn_name), file=sys.stderr)
sys.stderr.flush() # Python 3 stderr is likely buffered.
thread.interrupt_main() # raises KeyboardInterrupt
这是decorator本身:
def exit_after(s):
'''
use as decorator to exit process if
function takes longer than s seconds
'''
def outer(fn):
def inner(*args, **kwargs):
timer = threading.Timer(s, quit_function, args=[fn.__name__])
timer.start()
try:
result = fn(*args, **kwargs)
finally:
timer.cancel()
return result
return inner
return outer
使用
下面这个用法直接回答了你关于5秒后退出的问题!:
@exit_after(5)
def countdown(n):
print('countdown started', flush=True)
for i in range(n, -1, -1):
print(i, end=', ', flush=True)
sleep(1)
print('countdown finished')
演示:
>>> countdown(3)
countdown started
3, 2, 1, 0, countdown finished
>>> countdown(10)
countdown started
10, 9, 8, 7, 6, countdown took too long
Traceback (most recent call last):
File "<stdin>", line 1, in <module>
File "<stdin>", line 11, in inner
File "<stdin>", line 6, in countdown
KeyboardInterrupt
第二个函数调用将不会结束,相反,进程应该退出并返回一个跟踪!
KeyboardInterrupt并不总是停止一个睡眠线程
注意,在Windows上的Python 2中,睡眠并不总是被键盘中断中断,例如:
@exit_after(1)
def sleep10():
sleep(10)
print('slept 10 seconds')
>>> sleep10()
sleep10 took too long # Note that it hangs here about 9 more seconds
Traceback (most recent call last):
File "<stdin>", line 1, in <module>
File "<stdin>", line 11, in inner
File "<stdin>", line 3, in sleep10
KeyboardInterrupt
它也不可能中断扩展中运行的代码,除非它显式地检查PyErr_CheckSignals(),参见忽略Cython, Python和KeyboardInterrupt
在任何情况下,我都会避免让线程休眠超过一秒钟——这在处理器时间上是一eon。
我怎么调用这个函数或者我把它包装在什么里面,这样如果它花费超过5秒脚本就会取消它并做其他事情?
要捕获它并做其他事情,你可以捕获KeyboardInterrupt。
>>> try:
... countdown(10)
... except KeyboardInterrupt:
... print('do something else')
...
countdown started
10, 9, 8, 7, 6, countdown took too long
do something else
其他回答
有很多建议,但没有一个是使用并发的。期货,我认为这是最清晰的处理方式。
from concurrent.futures import ProcessPoolExecutor
# Warning: this does not terminate function if timeout
def timeout_five(fnc, *args, **kwargs):
with ProcessPoolExecutor() as p:
f = p.submit(fnc, *args, **kwargs)
return f.result(timeout=5)
超级简单的阅读和维护。
我们创建一个池,提交一个进程,然后等待5秒,然后引发一个TimeoutError,你可以根据需要捕获和处理它。
本机为python 3.2+,并反向移植到2.7 (pip install futures)。
线程和进程之间的切换非常简单,只需将ProcessPoolExecutor替换为ThreadPoolExecutor。
如果您想在超时时终止进程,我建议您查看Pebble。
突出了
引发TimeoutError使用异常在超时时发出警报-可以很容易地修改 跨平台:Windows和Mac OS X 兼容性:Python 3.6+(我也在Python 2.7上进行了测试,它可以在很小的语法调整下工作)
有关平行地图的完整解释和扩展,请参见https://flipdazed.github.io/blog/quant%20dev/parallel-functions-with-timeouts
最小的例子
>>> @killer_call(timeout=4)
... def bar(x):
... import time
... time.sleep(x)
... return x
>>> bar(10)
Traceback (most recent call last):
...
__main__.TimeoutError: function 'bar' timed out after 4s
正如预期的那样
>>> bar(2)
2
完整代码
import multiprocessing as mp
import multiprocessing.queues as mpq
import functools
import dill
from typing import Tuple, Callable, Dict, Optional, Iterable, List, Any
class TimeoutError(Exception):
def __init__(self, func: Callable, timeout: int):
self.t = timeout
self.fname = func.__name__
def __str__(self):
return f"function '{self.fname}' timed out after {self.t}s"
def _lemmiwinks(func: Callable, args: Tuple, kwargs: Dict[str, Any], q: mp.Queue):
"""lemmiwinks crawls into the unknown"""
q.put(dill.loads(func)(*args, **kwargs))
def killer_call(func: Callable = None, timeout: int = 10) -> Callable:
"""
Single function call with a timeout
Args:
func: the function
timeout: The timeout in seconds
"""
if not isinstance(timeout, int):
raise ValueError(f'timeout needs to be an int. Got: {timeout}')
if func is None:
return functools.partial(killer_call, timeout=timeout)
@functools.wraps(killer_call)
def _inners(*args, **kwargs) -> Any:
q_worker = mp.Queue()
proc = mp.Process(target=_lemmiwinks, args=(dill.dumps(func), args, kwargs, q_worker))
proc.start()
try:
return q_worker.get(timeout=timeout)
except mpq.Empty:
raise TimeoutError(func, timeout)
finally:
try:
proc.terminate()
except:
pass
return _inners
if __name__ == '__main__':
@killer_call(timeout=4)
def bar(x):
import time
time.sleep(x)
return x
print(bar(2))
bar(10)
笔记
由于dill的工作方式,您需要在函数内部导入。
这也意味着如果目标函数中有导入,这些函数可能与doctest不兼容。你将会遇到__import__未找到的问题。
我怎么调用函数或者我怎么包装它,如果它超过5秒脚本取消它?
我发布了一个要点,用装饰器和threading.Timer解决了这个问题。下面是它的分类。
导入和设置兼容性
它是用Python 2和3测试的。它也应该在Unix/Linux和Windows下工作。
首先是进口。这些尝试保持代码的一致性,而不管Python版本:
from __future__ import print_function
import sys
import threading
from time import sleep
try:
import thread
except ImportError:
import _thread as thread
使用版本独立代码:
try:
range, _print = xrange, print
def print(*args, **kwargs):
flush = kwargs.pop('flush', False)
_print(*args, **kwargs)
if flush:
kwargs.get('file', sys.stdout).flush()
except NameError:
pass
现在我们已经从标准库导入了我们的功能。
exit_after装饰
接下来,我们需要一个函数来终止子线程的main():
def quit_function(fn_name):
# print to stderr, unbuffered in Python 2.
print('{0} took too long'.format(fn_name), file=sys.stderr)
sys.stderr.flush() # Python 3 stderr is likely buffered.
thread.interrupt_main() # raises KeyboardInterrupt
这是decorator本身:
def exit_after(s):
'''
use as decorator to exit process if
function takes longer than s seconds
'''
def outer(fn):
def inner(*args, **kwargs):
timer = threading.Timer(s, quit_function, args=[fn.__name__])
timer.start()
try:
result = fn(*args, **kwargs)
finally:
timer.cancel()
return result
return inner
return outer
使用
下面这个用法直接回答了你关于5秒后退出的问题!:
@exit_after(5)
def countdown(n):
print('countdown started', flush=True)
for i in range(n, -1, -1):
print(i, end=', ', flush=True)
sleep(1)
print('countdown finished')
演示:
>>> countdown(3)
countdown started
3, 2, 1, 0, countdown finished
>>> countdown(10)
countdown started
10, 9, 8, 7, 6, countdown took too long
Traceback (most recent call last):
File "<stdin>", line 1, in <module>
File "<stdin>", line 11, in inner
File "<stdin>", line 6, in countdown
KeyboardInterrupt
第二个函数调用将不会结束,相反,进程应该退出并返回一个跟踪!
KeyboardInterrupt并不总是停止一个睡眠线程
注意,在Windows上的Python 2中,睡眠并不总是被键盘中断中断,例如:
@exit_after(1)
def sleep10():
sleep(10)
print('slept 10 seconds')
>>> sleep10()
sleep10 took too long # Note that it hangs here about 9 more seconds
Traceback (most recent call last):
File "<stdin>", line 1, in <module>
File "<stdin>", line 11, in inner
File "<stdin>", line 3, in sleep10
KeyboardInterrupt
它也不可能中断扩展中运行的代码,除非它显式地检查PyErr_CheckSignals(),参见忽略Cython, Python和KeyboardInterrupt
在任何情况下,我都会避免让线程休眠超过一秒钟——这在处理器时间上是一eon。
我怎么调用这个函数或者我把它包装在什么里面,这样如果它花费超过5秒脚本就会取消它并做其他事情?
要捕获它并做其他事情,你可以捕获KeyboardInterrupt。
>>> try:
... countdown(10)
... except KeyboardInterrupt:
... print('do something else')
...
countdown started
10, 9, 8, 7, 6, countdown took too long
do something else
在pypi上找到的stopit包似乎可以很好地处理超时。
我喜欢@stopit。Threading_timeoutable装饰器,它向被装饰的函数添加了一个超时参数,该参数执行您所期望的操作,它将停止该函数。
在pypi上查看:https://pypi.python.org/pypi/stopit
下面是一个POSIX版本,它结合了前面的许多答案来提供以下特性:
子进程阻塞执行。 timeout函数在类成员函数上的使用。 严格要求终止时间。
下面是代码和一些测试用例:
import threading
import signal
import os
import time
class TerminateExecution(Exception):
"""
Exception to indicate that execution has exceeded the preset running time.
"""
def quit_function(pid):
# Killing all subprocesses
os.setpgrp()
os.killpg(0, signal.SIGTERM)
# Killing the main thread
os.kill(pid, signal.SIGTERM)
def handle_term(signum, frame):
raise TerminateExecution()
def invoke_with_timeout(timeout, fn, *args, **kwargs):
# Setting a sigterm handler and initiating a timer
old_handler = signal.signal(signal.SIGTERM, handle_term)
timer = threading.Timer(timeout, quit_function, args=[os.getpid()])
terminate = False
# Executing the function
timer.start()
try:
result = fn(*args, **kwargs)
except TerminateExecution:
terminate = True
finally:
# Restoring original handler and cancel timer
signal.signal(signal.SIGTERM, old_handler)
timer.cancel()
if terminate:
raise BaseException("xxx")
return result
### Test cases
def countdown(n):
print('countdown started', flush=True)
for i in range(n, -1, -1):
print(i, end=', ', flush=True)
time.sleep(1)
print('countdown finished')
return 1337
def really_long_function():
time.sleep(10)
def really_long_function2():
os.system("sleep 787")
# Checking that we can run a function as expected.
assert invoke_with_timeout(3, countdown, 1) == 1337
# Testing various scenarios
t1 = time.time()
try:
print(invoke_with_timeout(1, countdown, 3))
assert(False)
except BaseException:
assert(time.time() - t1 < 1.1)
print("All good", time.time() - t1)
t1 = time.time()
try:
print(invoke_with_timeout(1, really_long_function2))
assert(False)
except BaseException:
assert(time.time() - t1 < 1.1)
print("All good", time.time() - t1)
t1 = time.time()
try:
print(invoke_with_timeout(1, really_long_function))
assert(False)
except BaseException:
assert(time.time() - t1 < 1.1)
print("All good", time.time() - t1)
# Checking that classes are referenced and not
# copied (as would be the case with multiprocessing)
class X:
def __init__(self):
self.value = 0
def set(self, v):
self.value = v
x = X()
invoke_with_timeout(2, x.set, 9)
assert x.value == 9