我在Python中调用一个函数,我知道这个函数可能会暂停,并迫使我重新启动脚本。

我怎么调用这个函数或者我把它包装在什么里面,这样如果它花费超过5秒脚本就会取消它并做其他事情?


当前回答

如果工作没有完成,我打算杀死进程,使用线程和进程来实现这一点。

from concurrent.futures import ThreadPoolExecutor

from time import sleep
import multiprocessing


# test case 1
def worker_1(a,b,c):
    for _ in range(2):
        print('very time consuming sleep')
        sleep(1)

    return a+b+c

# test case 2
def worker_2(in_name):
    for _ in range(10):
        print('very time consuming sleep')
        sleep(1)

    return 'hello '+in_name

作为上下文管理器的实际类

class FuncTimer():
    def __init__(self,fn,args,runtime):
        self.fn = fn
        self.args = args
        self.queue = multiprocessing.Queue()
        self.runtime = runtime
        self.process = multiprocessing.Process(target=self.thread_caller)

    def thread_caller(self):
        with ThreadPoolExecutor() as executor:
            future = executor.submit(self.fn, *self.args)
            self.queue.put(future.result())

    def  __enter__(self):
        return self

    def start_run(self):
        self.process.start()
        self.process.join(timeout=self.runtime)
        if self.process.exitcode is None:
            self.process.kill()
        if self.process.exitcode is None:
            out_res = None
            print('killed premature')
        else:
            out_res = self.queue.get()
        return out_res


    def __exit__(self, exc_type, exc_value, exc_traceback):
        self.process.kill()

如何使用

print('testing case 1') 
with FuncTimer(fn=worker_1,args=(1,2,3),runtime = 5) as fp: 
    res = fp.start_run()
    print(res)

print('testing case 2')
with FuncTimer(fn=worker_2,args=('ram',),runtime = 5) as fp: 
    res = fp.start_run()
    print(res)

其他回答

我有一个不同的建议,这是一个纯函数(与线程建议相同的API),似乎工作得很好(基于这个线程的建议)

def timeout(func, args=(), kwargs={}, timeout_duration=1, default=None):
    import signal

    class TimeoutError(Exception):
        pass

    def handler(signum, frame):
        raise TimeoutError()

    # set the timeout handler
    signal.signal(signal.SIGALRM, handler) 
    signal.alarm(timeout_duration)
    try:
        result = func(*args, **kwargs)
    except TimeoutError as exc:
        result = default
    finally:
        signal.alarm(0)

    return result

在pypi上找到的stopit包似乎可以很好地处理超时。

我喜欢@stopit。Threading_timeoutable装饰器,它向被装饰的函数添加了一个超时参数,该参数执行您所期望的操作,它将停止该函数。

在pypi上查看:https://pypi.python.org/pypi/stopit

Tim Savannah的func_timeout包对我来说工作得很好。

安装:

PIP安装func_timeout

用法:

import time
from func_timeout import func_timeout, FunctionTimedOut

def my_func(n):
    time.sleep(n)

time_to_sleep = 10

# time out after 2 seconds using kwargs
func_timeout(2, my_func, kwargs={'n' : time_to_sleep})

# time out after 2 seconds using args
func_timeout(2, my_func, args=(time_to_sleep,))

伟大的,易于使用和可靠的PyPi项目超时装饰器(https://pypi.org/project/timeout-decorator/)

安装:

pip install timeout-decorator

用法:

import time
import timeout_decorator

@timeout_decorator.timeout(5)
def mytest():
    print "Start"
    for i in range(1,10):
        time.sleep(1)
        print "%d seconds have passed" % i

if __name__ == '__main__':
    mytest()

突出了

引发TimeoutError使用异常在超时时发出警报-可以很容易地修改 跨平台:Windows和Mac OS X 兼容性:Python 3.6+(我也在Python 2.7上进行了测试,它可以在很小的语法调整下工作)

有关平行地图的完整解释和扩展,请参见https://flipdazed.github.io/blog/quant%20dev/parallel-functions-with-timeouts

最小的例子

>>> @killer_call(timeout=4)
... def bar(x):
...        import time
...        time.sleep(x)
...        return x
>>> bar(10)
Traceback (most recent call last):
  ...
__main__.TimeoutError: function 'bar' timed out after 4s

正如预期的那样

>>> bar(2)
2

完整代码

import multiprocessing as mp
import multiprocessing.queues as mpq
import functools
import dill

from typing import Tuple, Callable, Dict, Optional, Iterable, List, Any

class TimeoutError(Exception):

    def __init__(self, func: Callable, timeout: int):
        self.t = timeout
        self.fname = func.__name__

    def __str__(self):
            return f"function '{self.fname}' timed out after {self.t}s"


def _lemmiwinks(func: Callable, args: Tuple, kwargs: Dict[str, Any], q: mp.Queue):
    """lemmiwinks crawls into the unknown"""
    q.put(dill.loads(func)(*args, **kwargs))


def killer_call(func: Callable = None, timeout: int = 10) -> Callable:
    """
    Single function call with a timeout

    Args:
        func: the function
        timeout: The timeout in seconds
    """

    if not isinstance(timeout, int):
        raise ValueError(f'timeout needs to be an int. Got: {timeout}')

    if func is None:
        return functools.partial(killer_call, timeout=timeout)

    @functools.wraps(killer_call)
    def _inners(*args, **kwargs) -> Any:
        q_worker = mp.Queue()
        proc = mp.Process(target=_lemmiwinks, args=(dill.dumps(func), args, kwargs, q_worker))
        proc.start()
        try:
            return q_worker.get(timeout=timeout)
        except mpq.Empty:
            raise TimeoutError(func, timeout)
        finally:
            try:
                proc.terminate()
            except:
                pass
    return _inners

if __name__ == '__main__':
    @killer_call(timeout=4)
    def bar(x):
        import time
        time.sleep(x)
        return x

    print(bar(2))
    bar(10)

笔记

由于dill的工作方式,您需要在函数内部导入。

这也意味着如果目标函数中有导入,这些函数可能与doctest不兼容。你将会遇到__import__未找到的问题。