我想在Python中每60秒重复执行一个函数(就像Objective C中的NSTimer或JS中的setTimeout)。这段代码将作为守护进程运行,有效地类似于使用cron每分钟调用python脚本,但不需要用户设置。

在这个关于用Python实现的cron的问题中,解决方案似乎只有效地使()休眠x秒。我不需要这么高级的功能,所以也许这样的东西可以工作

while True:
    # Code executed here
    time.sleep(60)

这段代码是否存在任何可预见的问题?


当前回答

下面是MestreLion对代码的更新,它可以避免随着时间的推移而漂移。

这里的RepeatedTimer类按照OP的请求每隔“间隔”秒调用给定函数;调度并不取决于函数执行的时间。我喜欢这个解决方案,因为它没有外部库依赖关系;这是纯python。

import threading 
import time

class RepeatedTimer(object):
  def __init__(self, interval, function, *args, **kwargs):
    self._timer = None
    self.interval = interval
    self.function = function
    self.args = args
    self.kwargs = kwargs
    self.is_running = False
    self.next_call = time.time()
    self.start()

  def _run(self):
    self.is_running = False
    self.start()
    self.function(*self.args, **self.kwargs)

  def start(self):
    if not self.is_running:
      self.next_call += self.interval
      self._timer = threading.Timer(self.next_call - time.time(), self._run)
      self._timer.start()
      self.is_running = True

  def stop(self):
    self._timer.cancel()
    self.is_running = False

示例用法(摘自MestreLion的回答):

from time import sleep

def hello(name):
    print "Hello %s!" % name

print "starting..."
rt = RepeatedTimer(1, hello, "World") # it auto-starts, no need of rt.start()
try:
    sleep(5) # your long-running job goes here...
finally:
    rt.stop() # better in a try/finally block to make sure the program ends!

其他回答

如果您的程序还没有事件循环,请使用sched模块,它实现了一个通用的事件调度器。

import sched, time

def do_something(scheduler): 
    # schedule the next call first
    scheduler.enter(60, 1, do_something, (scheduler,))
    print("Doing stuff...")
    # then do your stuff

my_scheduler = sched.scheduler(time.time, time.sleep)
my_scheduler.enter(60, 1, do_something, (my_scheduler,))
my_scheduler.run()

如果您已经在使用事件循环库,如asyncio、trio、tkinter、PyQt5、gobject、kivy等,则只需使用现有事件循环库的方法来调度任务。

下面是MestreLion对代码的更新,它可以避免随着时间的推移而漂移。

这里的RepeatedTimer类按照OP的请求每隔“间隔”秒调用给定函数;调度并不取决于函数执行的时间。我喜欢这个解决方案,因为它没有外部库依赖关系;这是纯python。

import threading 
import time

class RepeatedTimer(object):
  def __init__(self, interval, function, *args, **kwargs):
    self._timer = None
    self.interval = interval
    self.function = function
    self.args = args
    self.kwargs = kwargs
    self.is_running = False
    self.next_call = time.time()
    self.start()

  def _run(self):
    self.is_running = False
    self.start()
    self.function(*self.args, **self.kwargs)

  def start(self):
    if not self.is_running:
      self.next_call += self.interval
      self._timer = threading.Timer(self.next_call - time.time(), self._run)
      self._timer.start()
      self.is_running = True

  def stop(self):
    self._timer.cancel()
    self.is_running = False

示例用法(摘自MestreLion的回答):

from time import sleep

def hello(name):
    print "Hello %s!" % name

print "starting..."
rt = RepeatedTimer(1, hello, "World") # it auto-starts, no need of rt.start()
try:
    sleep(5) # your long-running job goes here...
finally:
    rt.stop() # better in a try/finally block to make sure the program ends!

如果漂移不是一个问题

import threading, time

def print_every_n_seconds(n=2):
    while True:
        print(time.ctime())
        time.sleep(n)
    
thread = threading.Thread(target=print_every_n_seconds, daemon=True)
thread.start()

异步输出。

#Tue Oct 16 17:29:40 2018
#Tue Oct 16 17:29:42 2018
#Tue Oct 16 17:29:44 2018

如果正在运行的任务需要相当多的时间,那么间隔就变成2秒+任务时间,所以如果您需要精确的调度,那么这并不适合您。

注意daemon=True标志意味着这个线程不会阻止应用程序关闭。例如,在运行测试等待此头停止后,pytest将无限期挂起的问题。

我使用Tkinter after()方法,它不会“窃取游戏”(就像之前提出的sched模块),即它允许其他东西并行运行:

import Tkinter

def do_something1():
  global n1
  n1 += 1
  if n1 == 6: # (Optional condition)
    print "* do_something1() is done *"; return
  # Do your stuff here
  # ...
  print "do_something1() "+str(n1)
  tk.after(1000, do_something1)

def do_something2(): 
  global n2
  n2 += 1
  if n2 == 6: # (Optional condition)
    print "* do_something2() is done *"; return
  # Do your stuff here
  # ...
  print "do_something2() "+str(n2)
  tk.after(500, do_something2)

tk = Tkinter.Tk(); 
n1 = 0; n2 = 0
do_something1()
do_something2()
tk.mainloop()

Do_something1()和do_something2()可以以任意的间隔速度并行运行。在这里,第2个将以两倍的速度执行。还要注意,我使用了一个简单的计数器作为终止任一函数的条件。你可以使用任何你喜欢的条件,或者不使用,如果你想让一个函数运行到程序终止(例如一个时钟)。

    ''' tracking number of times it prints'''
import threading

global timeInterval
count=0
def printit():
  threading.Timer(timeInterval, printit).start()
  print( "Hello, World!")
  global count
  count=count+1
  print(count)
printit

if __name__ == "__main__":
    timeInterval= int(input('Enter Time in Seconds:'))
    printit()