我想在Python中每60秒重复执行一个函数(就像Objective C中的NSTimer或JS中的setTimeout)。这段代码将作为守护进程运行,有效地类似于使用cron每分钟调用python脚本,但不需要用户设置。

在这个关于用Python实现的cron的问题中,解决方案似乎只有效地使()休眠x秒。我不需要这么高级的功能,所以也许这样的东西可以工作

while True:
    # Code executed here
    time.sleep(60)

这段代码是否存在任何可预见的问题?


当前回答

如果您的程序还没有事件循环,请使用sched模块,它实现了一个通用的事件调度器。

import sched, time

def do_something(scheduler): 
    # schedule the next call first
    scheduler.enter(60, 1, do_something, (scheduler,))
    print("Doing stuff...")
    # then do your stuff

my_scheduler = sched.scheduler(time.time, time.sleep)
my_scheduler.enter(60, 1, do_something, (my_scheduler,))
my_scheduler.run()

如果您已经在使用事件循环库,如asyncio、trio、tkinter、PyQt5、gobject、kivy等,则只需使用现有事件循环库的方法来调度任务。

其他回答

下面是MestreLion对代码的更新,它可以避免随着时间的推移而漂移。

这里的RepeatedTimer类按照OP的请求每隔“间隔”秒调用给定函数;调度并不取决于函数执行的时间。我喜欢这个解决方案,因为它没有外部库依赖关系;这是纯python。

import threading 
import time

class RepeatedTimer(object):
  def __init__(self, interval, function, *args, **kwargs):
    self._timer = None
    self.interval = interval
    self.function = function
    self.args = args
    self.kwargs = kwargs
    self.is_running = False
    self.next_call = time.time()
    self.start()

  def _run(self):
    self.is_running = False
    self.start()
    self.function(*self.args, **self.kwargs)

  def start(self):
    if not self.is_running:
      self.next_call += self.interval
      self._timer = threading.Timer(self.next_call - time.time(), self._run)
      self._timer.start()
      self.is_running = True

  def stop(self):
    self._timer.cancel()
    self.is_running = False

示例用法(摘自MestreLion的回答):

from time import sleep

def hello(name):
    print "Hello %s!" % name

print "starting..."
rt = RepeatedTimer(1, hello, "World") # it auto-starts, no need of rt.start()
try:
    sleep(5) # your long-running job goes here...
finally:
    rt.stop() # better in a try/finally block to make sure the program ends!

一个可能的答案是:

import time
t=time.time()

while True:
    if time.time()-t>10:
        #run your task here
        t=time.time()

我最终使用了时间表模块。API很好。

import schedule
import time

def job():
    print("I'm working...")

schedule.every(10).minutes.do(job)
schedule.every().hour.do(job)
schedule.every().day.at("10:30").do(job)
schedule.every(5).to(10).minutes.do(job)
schedule.every().monday.do(job)
schedule.every().wednesday.at("13:15").do(job)
schedule.every().minute.at(":17").do(job)

while True:
    schedule.run_pending()
    time.sleep(1)

我以前也遇到过类似的问题。也许http://cronus.readthedocs.org会有帮助?

对于v0.2,下面的代码片段可以工作

import cronus.beat as beat

beat.set_rate(2) # run twice per second
while beat.true():
    # do some time consuming work here
    beat.sleep() # total loop duration would be 0.5 sec

这里是另一个不使用任何额外库的解决方案。

def delay_until(condition_fn, interval_in_sec, timeout_in_sec):
    """Delay using a boolean callable function.

    `condition_fn` is invoked every `interval_in_sec` until `timeout_in_sec`.
    It can break early if condition is met.

    Args:
        condition_fn     - a callable boolean function
        interval_in_sec  - wait time between calling `condition_fn`
        timeout_in_sec   - maximum time to run

    Returns: None
    """
    start = last_call = time.time()
    while time.time() - start < timeout_in_sec:
        if (time.time() - last_call) > interval_in_sec:
            if condition_fn() is True:
                break
            last_call = time.time()