下面的Python代码可以运行任意命令并返回其标准输出数据,或者在退出码非零时引发异常:
# Start cmd as a child process and capture its output through a pipe.
proc = subprocess.Popen(
cmd,
stderr=subprocess.STDOUT, # Merge stdout and stderr
stdout=subprocess.PIPE, # Capture the merged stream so it can be read back
shell=True) # cmd is handed to the system shell for interpretation
communicate 用于等待进程退出:
# Blocks until the process exits and returns everything it wrote.
# stderrdata is None when stderr was redirected into stdout (as in the
# Popen call above) instead of being given its own pipe.
stdoutdata, stderrdata = proc.communicate()
subprocess 模块不支持超时(即杀死运行时间超过 X 秒的进程的能力),因此 communicate 可能会运行很长时间,甚至永远不返回。
在Windows和Linux上运行的Python程序中实现超时的最简单方法是什么?
我遇到过这样一个问题:如果一个多线程子进程的运行时间超过给定的超时时长,我希望终止它。我曾想在Popen()中设置超时,但这不起作用。后来我意识到Popen().wait()等价于call(),于是想到在.wait(timeout=xxx)方法中设置超时,最终奏效了。因此,我是这样解决的:
import os
import sys
import signal
import subprocess
from multiprocessing import Pool
# Number of worker processes created for the multiprocessing Pool.
cores_for_parallelization = 4
# Maximum time each subprocess may run before it is terminated.
timeout_time = 15 # seconds
def main():
jobs = [...YOUR_JOB_LIST...]
with Pool(cores_for_parallelization) as p:
p.map(run_parallel_jobs, jobs)
def run_parallel_jobs(args):
    """Run the worker script with ``args`` and stop it if it runs too long.

    The script is started as a subprocess and given ``timeout_time`` seconds
    (module-level setting) to finish. If it does not, it is terminated.

    Args:
        args: Iterable of extra command-line arguments appended after the
            interpreter and script paths.
    """
    # Define the arguments including the paths
    initial_terminal_command = 'C:\\Python34\\python.exe'  # Python executable
    function_to_start = 'C:\\temp\\xyz.py'  # The multithreading script
    final_list = [initial_terminal_command, function_to_start]
    final_list.extend(args)

    subp = subprocess.Popen(final_list)  # starts the process

    # Wait for the process to finish within the timeout; otherwise stop it.
    try:
        subp.wait(timeout=timeout_time)
    except subprocess.TimeoutExpired:
        # Popen.terminate() sends SIGTERM on POSIX and calls TerminateProcess
        # on Windows, so no platform switch is needed. Comparing sys.platform
        # with 'linux2' is unreliable: on Python 3.3+ the value is 'linux'.
        subp.terminate()
        try:
            # Reap the child so it does not linger as a zombie.
            subp.wait(timeout=5)
        except subprocess.TimeoutExpired:
            # The child ignored the polite request; force it down.
            subp.kill()
            subp.wait()
# Run the pool only when executed as a script. The guard also keeps
# multiprocessing workers from re-running main() when they import the module.
if __name__ == "__main__":
    main()
我根据其中几个回答整理并实现了下面的代码。它在Windows上可用;由于这是一个社区维基,我想我也把自己的代码分享出来:
class Command(threading.Thread):
    """Run a command in a background thread, terminating it after a timeout.

    After the thread has finished (``join()``), ``timed_out`` tells whether
    the process had to be terminated and ``process`` holds the Popen object.

    Args:
        cmd: Command passed to ``subprocess.Popen``.
        outFile: File object that receives the command's stdout.
        errFile: File object that receives the command's stderr.
        timeout: Maximum run time in seconds (fractions are honoured).
    """

    # Seconds between checks of whether the process has finished. A short
    # interval lets quick commands return promptly instead of always costing
    # a full second.
    _POLL_INTERVAL = 0.05

    def __init__(self, cmd, outFile, errFile, timeout):
        threading.Thread.__init__(self)
        self.cmd = cmd
        self.process = None
        self.outFile = outFile
        self.errFile = errFile
        self.timed_out = False
        self.timeout = timeout

    def run(self):
        """Start the process and poll it until it exits or the deadline passes."""
        self.process = subprocess.Popen(self.cmd, stdout=self.outFile,
                                        stderr=self.errFile)
        # Prefer a monotonic clock where available so wall-clock adjustments
        # cannot stretch or shorten the timeout.
        clock = getattr(time, 'monotonic', time.time)
        deadline = clock() + self.timeout
        while self.process.poll() is None:
            remaining = deadline - clock()
            if remaining <= 0:
                break
            time.sleep(min(self._POLL_INTERVAL, remaining))

        # Decide from the process state, not from the elapsed time: a process
        # that exited just before the deadline has not timed out.
        if self.process.poll() is None:
            self.process.terminate()
            self.timed_out = True
        else:
            self.timed_out = False
然后在另一个类或文件中这样使用:
# Run command ``c`` through command.Command and collect its output.
outFile = tempfile.SpooledTemporaryFile()
errFile = tempfile.SpooledTemporaryFile()
try:
    executor = command.Command(c, outFile, errFile, timeout)
    executor.daemon = True
    executor.start()
    executor.join()
    if executor.timed_out:
        out = 'timed out'
        # Bind err on this path too so later code can use it unconditionally.
        err = ''
    else:
        # Rewind before reading: the child wrote through these files.
        outFile.seek(0)
        errFile.seek(0)
        out = outFile.read()
        err = errFile.read()
finally:
    # Release the temporary files even if starting the command failed.
    outFile.close()
    errFile.close()
This solution kills the process tree in case of shell=True, passes parameters to the process (or not), has a timeout, and gets the stdout, stderr and return code of the call back (it uses psutil for kill_proc_tree). It is based on several solutions posted on SO, including jcollado's, and is posted in response to comments by Anson and jradice on jcollado's answer. Tested on Windows Server 2012 and Ubuntu 14.04. Please note that on Ubuntu you need to change the parent.children(...) call to parent.get_children(...) (the name this method has in psutil releases older than 2.0).
def kill_proc_tree(pid, including_parent=True):
    """Kill every descendant of process ``pid`` and, optionally, ``pid`` itself.

    Args:
        pid: Process ID at the root of the tree.
        including_parent: When true, the root process is killed as well.

    Raises:
        psutil.NoSuchProcess: If ``pid`` does not exist when looked up.
    """
    parent = psutil.Process(pid)
    children = parent.children(recursive=True)
    for child in children:
        try:
            child.kill()
        except psutil.NoSuchProcess:
            # The child exited between being listed and being killed;
            # keep going so the rest of the tree is still cleaned up.
            pass
    psutil.wait_procs(children, timeout=5)
    if including_parent:
        try:
            parent.kill()
            parent.wait(5)
        except psutil.NoSuchProcess:
            # The parent is already gone, which is the desired outcome.
            pass
def run_with_timeout(cmd, current_dir, cmd_parms, timeout):
    """Run ``cmd`` through the shell and kill its process tree on timeout.

    Args:
        cmd: Command line, executed with ``shell=True``.
        current_dir: Working directory for the command.
        cmd_parms: Data sent to the command's stdin; ``""`` sends nothing.
        timeout: Seconds to wait before the process tree is killed.

    Returns:
        A ``(returncode, out, err)`` tuple. After a timeout the values are
        whatever the killed process left behind.

    Raises:
        OSError: If the process cannot be started (for example, when
            ``current_dir`` does not exist).
    """
    # Start the process here rather than in the worker thread so its pid is
    # known by the time the timeout handling needs it.
    process = subprocess.Popen(cmd, cwd=current_dir, shell=True,
                               stdout=subprocess.PIPE, stdin=subprocess.PIPE,
                               stderr=subprocess.PIPE)
    result = {}

    def target():
        # wait for the process to terminate
        if cmd_parms == "":
            out, err = process.communicate()
        else:
            out, err = process.communicate(cmd_parms)
        # Hand the output back to the caller through the shared dict.
        result['out'] = out
        result['err'] = err

    thread = Thread(target=target)
    thread.start()
    thread.join(timeout)
    if thread.is_alive():
        # Kill only the tree rooted at the spawned shell. Killing every child
        # of the current process would also take down unrelated subprocesses.
        try:
            kill_proc_tree(process.pid)
        except psutil.NoSuchProcess:
            # The process finished in the instant before it could be killed.
            pass
        thread.join()
    return process.returncode, result.get('out'), result.get('err')
有一个想法是子类化Popen类并用一些简单的方法装饰器来扩展它。我们叫它ExpirablePopen吧。
from logging import error
from subprocess import Popen
from threading import Event
from threading import Thread
class ExpirablePopen(Popen):
    """Popen whose wait() and communicate() kill the process after a timeout.

    Accepts every ``Popen`` argument plus the keyword ``timeout`` (seconds).
    A timeout of 0, the default, disables expiry and behaves like plain Popen.
    The countdown starts at the first call to ``wait()`` or ``communicate()``.
    """

    def __init__(self, *args, **kwargs):
        # Set our own attributes first and strip ``timeout`` from kwargs,
        # because Popen.__init__ does not accept it here.
        self.timeout = kwargs.pop('timeout', 0)
        self.timer = None
        self.done = Event()
        Popen.__init__(self, *args, **kwargs)

    def __tkill(self):
        """Timer thread body: kill the process unless it finishes in time."""
        timeout = self.timeout
        if not self.done.wait(timeout):
            error('Terminating process {} by timeout of {} secs.'.format(self.pid, timeout))
            try:
                self.kill()
            except OSError:
                # The process may have exited just before the kill; there is
                # nothing left to do in that case.
                pass

    def expirable(func):
        """Wrap a blocking Popen method so it starts the expiry timer."""
        from functools import wraps

        @wraps(func)  # keep the wrapped method's name and docstring
        def wrapper(self, *args, **kwargs):
            # zero timeout means call of parent method
            if self.timeout == 0:
                return func(self, *args, **kwargs)

            # if timer is None, need to start it
            if self.timer is None:
                self.timer = thr = Thread(target=self.__tkill)
                thr.daemon = True
                thr.start()

            result = func(self, *args, **kwargs)
            # Signal the timer only on success: if func raised, the process
            # may still be running and must remain subject to the timeout.
            self.done.set()

            return result
        return wrapper

    wait = expirable(Popen.wait)
    communicate = expirable(Popen.communicate)
if __name__ == '__main__':
    from subprocess import PIPE
    # Pass the command as an argument list: without shell=True a single
    # string is taken as the program name on POSIX and fails to start.
    # Calling print with parentheses works on both Python 2 and Python 3.
    print(ExpirablePopen(['ssh', '-T', 'git@bitbucket.org'], stdout=PIPE, timeout=1).communicate())
下面是Alex Martelli的解决方案,整理成了一个能正确终止进程的模块。其他方法行不通,因为它们没有使用proc.communicate()。因此,如果某个进程产生大量输出,它会填满输出缓冲区,然后一直阻塞,直到你从中读取一些内容为止。
from os import kill
from signal import alarm, signal, SIGALRM, SIGKILL
from subprocess import PIPE, Popen
def run(args, cwd = None, shell = False, kill_tree = True, timeout = -1, env = None):
    '''
    Run a command with a timeout after which it will be forcibly
    killed.

    The timeout is implemented with SIGALRM, so this works only on POSIX
    systems and only when called from the main thread.

    args, cwd, shell and env are passed to Popen unchanged.
    kill_tree: also kill the command's child processes on timeout.
    timeout: whole seconds to wait; -1 (the default) waits indefinitely.

    Returns (returncode, stdout, stderr), or (-9, '', '') when the command
    was killed because the timeout expired.
    '''
    class Alarm(Exception):
        pass

    def alarm_handler(signum, frame):
        raise Alarm

    p = Popen(args, shell = shell, cwd = cwd, stdout = PIPE, stderr = PIPE, env = env)
    use_alarm = timeout != -1
    previous_handler = None
    if use_alarm:
        # signal() returns the handler being replaced, so it can be put back.
        previous_handler = signal(SIGALRM, alarm_handler)
        alarm(timeout)
    try:
        try:
            stdout, stderr = p.communicate()
        finally:
            if use_alarm:
                # Always cancel the pending alarm and restore the caller's
                # handler so no state leaks out of this function.
                alarm(0)
                # None means the old handler was not installed from Python
                # and cannot be re-installed through signal().
                if previous_handler is not None:
                    signal(SIGALRM, previous_handler)
    except Alarm:
        pids = [p.pid]
        if kill_tree:
            pids.extend(get_process_children(p.pid))
        for pid in pids:
            # process might have died before getting to this line
            # so wrap to avoid OSError: no such process
            try:
                kill(pid, SIGKILL)
            except OSError:
                pass
        # Reap the killed child and release its pipes; otherwise it stays
        # behind as a zombie holding open file descriptors.
        p.wait()
        for stream in (p.stdout, p.stderr):
            if stream is not None:
                stream.close()
        return -9, '', ''
    return p.returncode, stdout, stderr
def get_process_children(pid):
    """Return the PIDs that ``ps`` lists as having ``pid`` as their parent.

    Relies on the ``--ppid`` and ``--no-headers`` options of ``ps``. When the
    command prints nothing on stdout the result is an empty list.
    """
    command = 'ps --no-headers -o pid --ppid %d' % pid
    ps = Popen(command, shell = True, stdout = PIPE, stderr = PIPE)
    listing, _ = ps.communicate()
    children = []
    for field in listing.split():
        children.append(int(field))
    return children
if __name__ == '__main__':
    # Calling print with parentheses works on both Python 2 and Python 3;
    # the bare print statement is a SyntaxError on Python 3.
    print(run('find /', shell = True, timeout = 3))
    print(run('find', shell = True))