我在Python中调用一个函数,我知道这个函数可能会暂停,并迫使我重新启动脚本。

我怎么调用这个函数或者我把它包装在什么里面,这样如果它花费超过5秒脚本就会取消它并做其他事情?


当前回答

在@piro答案的基础上,您可以构建一个contextmanager。这允许非常易读的代码,将在成功运行后禁用警报信号(sets signal.alarm(0))

from contextlib import contextmanager
import signal
import time

@contextmanager
def timeout(duration):
    def timeout_handler(signum, frame):
        raise TimeoutError(f'block timedout after {duration} seconds')
    signal.signal(signal.SIGALRM, timeout_handler)
    signal.alarm(duration)
    try:
        yield
    finally:
        signal.alarm(0)

def sleeper(duration):
    time.sleep(duration)
    print('finished')

使用示例:

In [19]: with timeout(2):
    ...:     sleeper(1)
    ...:     
finished

In [20]: with timeout(2):
    ...:     sleeper(3)
    ...:         
---------------------------------------------------------------------------
Exception                                 Traceback (most recent call last)
<ipython-input-20-66c78858116f> in <module>()
      1 with timeout(2):
----> 2     sleeper(3)
      3 

<ipython-input-7-a75b966bf7ac> in sleeper(t)
      1 def sleeper(t):
----> 2     time.sleep(t)
      3     print('finished')
      4 

<ipython-input-18-533b9e684466> in timeout_handler(signum, frame)
      2 def timeout(duration):
      3     def timeout_handler(signum, frame):
----> 4         raise Exception(f'block timedout after {duration} seconds')
      5     signal.signal(signal.SIGALRM, timeout_handler)
      6     signal.alarm(duration)

Exception: block timedout after 2 seconds

其他回答

在pypi上找到的stopit包似乎可以很好地处理超时。

我喜欢@stopit。Threading_timeoutable装饰器,它向被装饰的函数添加了一个超时参数,该参数执行您所期望的操作,它将停止该函数。

在pypi上查看:https://pypi.python.org/pypi/stopit

你可以使用多处理。过程来做到这一点。

Code

import multiprocessing
import time

# bar
def bar():
    for i in range(100):
        print "Tick"
        time.sleep(1)

if __name__ == '__main__':
    # Start bar as a process
    p = multiprocessing.Process(target=bar)
    p.start()

    # Wait for 10 seconds or until process finishes
    p.join(10)

    # If thread is still active
    if p.is_alive():
        print "running... let's kill it..."

        # Terminate - may not work if process is stuck for good
        p.terminate()
        # OR Kill - will work for sure, no chance for process to finish nicely however
        # p.kill()

        p.join()

我在搜索单元测试的超时调用时遇到了这个线程。我没有在答案或第三方包中找到任何简单的东西,所以我写了下面的装饰器,你可以直接放入代码中:

import multiprocessing.pool
import functools

def timeout(max_timeout):
    """Timeout decorator, parameter in seconds."""
    def timeout_decorator(item):
        """Wrap the original function."""
        @functools.wraps(item)
        def func_wrapper(*args, **kwargs):
            """Closure for function."""
            pool = multiprocessing.pool.ThreadPool(processes=1)
            async_result = pool.apply_async(item, args, kwargs)
            # raises a TimeoutError if execution exceeds max_timeout
            return async_result.get(max_timeout)
        return func_wrapper
    return timeout_decorator

然后就像这样简单地超时测试或任何你喜欢的函数:

@timeout(5.0)  # if execution takes longer than 5 seconds, raise a TimeoutError
def test_base_regression(self):
    ...

下面是一个简单的例子,运行一个带有timeout的方法,并在成功时检索它的值。

import multiprocessing
import time

ret = {"foo": False}


def worker(queue):
    """worker function"""

    ret = queue.get()

    time.sleep(1)

    ret["foo"] = True
    queue.put(ret)


if __name__ == "__main__":
    queue = multiprocessing.Queue()
    queue.put(ret)

    p = multiprocessing.Process(target=worker, args=(queue,))
    p.start()
    p.join(timeout=10)

    if p.exitcode is None:
        print("The worker timed out.")
    else:
        print(f"The worker completed and returned: {queue.get()}")

如果工作没有完成,我打算杀死进程,使用线程和进程来实现这一点。

from concurrent.futures import ThreadPoolExecutor

from time import sleep
import multiprocessing


# test case 1
def worker_1(a,b,c):
    for _ in range(2):
        print('very time consuming sleep')
        sleep(1)

    return a+b+c

# test case 2
def worker_2(in_name):
    for _ in range(10):
        print('very time consuming sleep')
        sleep(1)

    return 'hello '+in_name

作为上下文管理器的实际类

class FuncTimer():
    def __init__(self,fn,args,runtime):
        self.fn = fn
        self.args = args
        self.queue = multiprocessing.Queue()
        self.runtime = runtime
        self.process = multiprocessing.Process(target=self.thread_caller)

    def thread_caller(self):
        with ThreadPoolExecutor() as executor:
            future = executor.submit(self.fn, *self.args)
            self.queue.put(future.result())

    def  __enter__(self):
        return self

    def start_run(self):
        self.process.start()
        self.process.join(timeout=self.runtime)
        if self.process.exitcode is None:
            self.process.kill()
        if self.process.exitcode is None:
            out_res = None
            print('killed premature')
        else:
            out_res = self.queue.get()
        return out_res


    def __exit__(self, exc_type, exc_value, exc_traceback):
        self.process.kill()

如何使用

print('testing case 1') 
with FuncTimer(fn=worker_1,args=(1,2,3),runtime = 5) as fp: 
    res = fp.start_run()
    print(res)

print('testing case 2')
with FuncTimer(fn=worker_2,args=('ram',),runtime = 5) as fp: 
    res = fp.start_run()
    print(res)