我想在Python中每60秒重复执行一个函数(就像Objective C中的NSTimer或JS中的setTimeout)。这段代码将作为守护进程运行,有效地类似于使用cron每分钟调用python脚本,但不需要用户设置。

在这个关于用Python实现的cron的问题中,解决方案似乎只有效地使()休眠x秒。我不需要这么高级的功能,所以也许这样的东西可以工作

while True:
    # Code executed here
    time.sleep(60)

这段代码是否存在任何可预见的问题?


当前回答

例如:显示当前本地时间

import datetime
import glib
import logger

def get_local_time():
    current_time = datetime.datetime.now().strftime("%H:%M")
    logger.info("get_local_time(): %s",current_time)
    return str(current_time)

def display_local_time():
    logger.info("Current time is: %s", get_local_time())
    return True

# call every minute
glib.timeout_add(60*1000, display_local_time)

其他回答

一个可能的答案是:

import time
t=time.time()

while True:
    if time.time()-t>10:
        #run your task here
        t=time.time()

下面是MestreLion对代码的更新,它可以避免随着时间的推移而漂移。

这里的RepeatedTimer类按照OP的请求每隔“间隔”秒调用给定函数;调度并不取决于函数执行的时间。我喜欢这个解决方案,因为它没有外部库依赖关系;这是纯python。

import threading 
import time

class RepeatedTimer(object):
  def __init__(self, interval, function, *args, **kwargs):
    self._timer = None
    self.interval = interval
    self.function = function
    self.args = args
    self.kwargs = kwargs
    self.is_running = False
    self.next_call = time.time()
    self.start()

  def _run(self):
    self.is_running = False
    self.start()
    self.function(*self.args, **self.kwargs)

  def start(self):
    if not self.is_running:
      self.next_call += self.interval
      self._timer = threading.Timer(self.next_call - time.time(), self._run)
      self._timer.start()
      self.is_running = True

  def stop(self):
    self._timer.cancel()
    self.is_running = False

示例用法(摘自MestreLion的回答):

from time import sleep

def hello(name):
    print "Hello %s!" % name

print "starting..."
rt = RepeatedTimer(1, hello, "World") # it auto-starts, no need of rt.start()
try:
    sleep(5) # your long-running job goes here...
finally:
    rt.stop() # better in a try/finally block to make sure the program ends!

我以前也遇到过类似的问题。也许http://cronus.readthedocs.org会有帮助?

对于v0.2,下面的代码片段可以工作

import cronus.beat as beat

beat.set_rate(2) # run twice per second
while beat.true():
    # do some time consuming work here
    beat.sleep() # total loop duration would be 0.5 sec

另一种灵活性解决方案是Apscheduler。

pip install apscheduler
from apscheduler.schedulers.background import BlockingScheduler
def print_t():
  pass

sched = BlockingScheduler()
sched.add_job(print_t, 'interval', seconds =60) #will do the print_t work for every 60 seconds

sched.start()

另外,apscheduler提供了如下所示的许多调度程序。

BlockingScheduler: use when the scheduler is the only thing running in your process BackgroundScheduler: use when you’re not using any of the frameworks below, and want the scheduler to run in the background inside your application AsyncIOScheduler: use if your application uses the asyncio module GeventScheduler: use if your application uses gevent TornadoScheduler: use if you’re building a Tornado application TwistedScheduler: use if you’re building a Twisted application QtScheduler: use if you’re building a Qt application

像这样将你的时间循环锁定到系统时钟上:

import time
starttime = time.time()
while True:
    print("tick")
    time.sleep(60.0 - ((time.time() - starttime) % 60.0))