我想在Python中每60秒重复执行一个函数(就像Objective C中的NSTimer或JS中的setTimeout)。这段代码将作为守护进程运行,有效地类似于使用cron每分钟调用python脚本,但不需要用户设置。

在这个关于用Python实现的cron的问题中,解决方案似乎只有效地使()休眠x秒。我不需要这么高级的功能,所以也许这样的东西可以工作

while True:
    # Code executed here
    time.sleep(60)

这段代码是否存在任何可预见的问题?


当前回答

计时计数可以做到高精度(即< 1毫秒),因为它与系统时钟同步。它不会随着时间的推移而漂移,也不受代码执行时间长度的影响(当然,前提是它小于间隔时间)。

一个简单的阻塞的例子:

from timed_count import timed_count

for count in timed_count(60):
    # Execute code here exactly every 60 seconds
    ...

你可以通过在线程中运行它来让它变得不阻塞:

from threading import Thread
from timed_count import timed_count

def periodic():
    for count in timed_count(60):
        # Execute code here exactly every 60 seconds
        ...

thread = Thread(target=periodic)
thread.start()

其他回答

你可能会考虑Twisted,它是一个实现了Reactor Pattern的Python网络库。

from twisted.internet import task, reactor

timeout = 60.0 # Sixty seconds

def doWork():
    #do work here
    pass

l = task.LoopingCall(doWork)
l.start(timeout) # call every sixty seconds

reactor.run()

虽然“While True: sleep(60)”可能会工作,Twisted可能已经实现了许多你最终需要的功能(如bobince指出的守护进程化、日志记录或异常处理),并且可能是一个更健壮的解决方案

我使用Tkinter after()方法,它不会“窃取游戏”(就像之前提出的sched模块),即它允许其他东西并行运行:

import Tkinter

def do_something1():
  global n1
  n1 += 1
  if n1 == 6: # (Optional condition)
    print "* do_something1() is done *"; return
  # Do your stuff here
  # ...
  print "do_something1() "+str(n1)
  tk.after(1000, do_something1)

def do_something2(): 
  global n2
  n2 += 1
  if n2 == 6: # (Optional condition)
    print "* do_something2() is done *"; return
  # Do your stuff here
  # ...
  print "do_something2() "+str(n2)
  tk.after(500, do_something2)

tk = Tkinter.Tk(); 
n1 = 0; n2 = 0
do_something1()
do_something2()
tk.mainloop()

Do_something1()和do_something2()可以以任意的间隔速度并行运行。在这里,第2个将以两倍的速度执行。还要注意,我使用了一个简单的计数器作为终止任一函数的条件。你可以使用任何你喜欢的条件,或者不使用,如果你想让一个函数运行到程序终止(例如一个时钟)。

计时计数可以做到高精度(即< 1毫秒),因为它与系统时钟同步。它不会随着时间的推移而漂移,也不受代码执行时间长度的影响(当然,前提是它小于间隔时间)。

一个简单的阻塞的例子:

from timed_count import timed_count

for count in timed_count(60):
    # Execute code here exactly every 60 seconds
    ...

你可以通过在线程中运行它来让它变得不阻塞:

from threading import Thread
from timed_count import timed_count

def periodic():
    for count in timed_count(60):
        # Execute code here exactly every 60 seconds
        ...

thread = Thread(target=periodic)
thread.start()

一个可能的答案是:

import time
t=time.time()

while True:
    if time.time()-t>10:
        #run your task here
        t=time.time()

另一种灵活性解决方案是Apscheduler。

pip install apscheduler
from apscheduler.schedulers.background import BlockingScheduler
def print_t():
  pass

sched = BlockingScheduler()
sched.add_job(print_t, 'interval', seconds =60) #will do the print_t work for every 60 seconds

sched.start()

另外,apscheduler提供了如下所示的许多调度程序。

BlockingScheduler: use when the scheduler is the only thing running in your process BackgroundScheduler: use when you’re not using any of the frameworks below, and want the scheduler to run in the background inside your application AsyncIOScheduler: use if your application uses the asyncio module GeventScheduler: use if your application uses gevent TornadoScheduler: use if you’re building a Tornado application TwistedScheduler: use if you’re building a Twisted application QtScheduler: use if you’re building a Qt application