我如何在Python中命名一个外部命令,就好像我把它写在一个<unk>或命令中?
当前回答
我常常用下列函数用于外部命令,这特别适用于长时间运行过程,下面的方法在运行时将过程输出缩短,并返回输出,如果过程失败,则会产生一个例外。
结果是,如果这个过程是通过 poll() 方法完成的。
import subprocess,sys
def exec_long_running_proc(command, args):
cmd = "{} {}".format(command, " ".join(str(arg) if ' ' not in arg else arg.replace(' ','\ ') for arg in args))
print(cmd)
process = subprocess.Popen(cmd, shell=True, stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
# Poll process for new output until finished
while True:
nextline = process.stdout.readline().decode('UTF-8')
if nextline == '' and process.poll() is not None:
break
sys.stdout.write(nextline)
sys.stdout.flush()
output = process.communicate()[0]
exitCode = process.returncode
if (exitCode == 0):
return output
else:
raise Exception(command, exitCode, output)
你可以这样引用它:
exec_long_running_proc(command = "hive", args=["-f", hql_path])
其他回答
在Linux中,如果您想打电话给一个将独立执行的外部命令(在Python脚本结束后将继续运行),您可以使用一个简单的字符串作为任务插槽或命令中的命令。
例如,Task Spooler:
import os
os.system('ts <your-command>')
關於Task Spooler(TS)的評論:
您可以设置要运行的竞争对手过程的数量(“插槽”)与: ts -S <插槽的号码> 安装 ts 不需要管理特权. 您可以从源头下载并编写一个简单的创建,将其添加到您的路径,并完成。
这里有很多答案,但没有一个满足了我所有的需求。
我需要运行命令并捕获输出和输出代码. 我需要时间out 执行的程序,并强迫它出去,如果时间out 达到,并杀死它的所有儿童过程. 我需要它在 Windows XP 和以后, Cygwin 和 Linux 工作。
def _run(command, timeout_s=False, shell=False):
### run a process, capture the output and wait for it to finish. if timeout is specified then Kill the subprocess and its children when the timeout is reached (if parent did not detach)
## usage: _run(arg1, arg2, arg3)
# arg1: command + arguments. Always pass a string; the function will split it when needed
# arg2: (optional) timeout in seconds before force killing
# arg3: (optional) shell usage. default shell=False
## return: a list containing: exit code, output, and if timeout was reached or not
# - Tested on Python 2 and 3 on Windows XP, Windows 7, Cygwin and Linux.
# - preexec_fn=os.setsid (py2) is equivalent to start_new_session (py3) (works on Linux only), in Windows and Cygwin we use TASKKILL
# - we use stderr=subprocess.STDOUT to merge standard error and standard output
import sys, subprocess, os, signal, shlex, time
def _runPY3(command, timeout_s=None, shell=False):
# py3.3+ because: timeout was added to communicate() in py3.3.
new_session=False
if sys.platform.startswith('linux'): new_session=True
p = subprocess.Popen(command, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, start_new_session=new_session, shell=shell)
try:
out = p.communicate(timeout=timeout_s)[0].decode('utf-8')
is_timeout_reached = False
except subprocess.TimeoutExpired:
print('Timeout reached: Killing the whole process group...')
killAll(p.pid)
out = p.communicate()[0].decode('utf-8')
is_timeout_reached = True
return p.returncode, out, is_timeout_reached
def _runPY2(command, timeout_s=0, shell=False):
preexec=None
if sys.platform.startswith('linux'): preexec=os.setsid
p = subprocess.Popen(command, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, preexec_fn=preexec, shell=shell)
start_time = time.time()
is_timeout_reached = False
while timeout_s and p.poll() == None:
if time.time()-start_time >= timeout_s:
print('Timeout reached: Killing the whole process group...')
killAll(p.pid)
is_timeout_reached = True
break
time.sleep(1)
out = p.communicate()[0].decode('utf-8')
return p.returncode, out, is_timeout_reached
def killAll(ParentPid):
if sys.platform.startswith('linux'):
os.killpg(os.getpgid(ParentPid), signal.SIGTERM)
elif sys.platform.startswith('cygwin'):
# subprocess.Popen(shlex.split('bash -c "TASKKILL /F /PID $(</proc/{pid}/winpid) /T"'.format(pid=ParentPid)))
winpid=int(open("/proc/{pid}/winpid".format(pid=ParentPid)).read())
subprocess.Popen(['TASKKILL', '/F', '/PID', str(winpid), '/T'])
elif sys.platform.startswith('win32'):
subprocess.Popen(['TASKKILL', '/F', '/PID', str(ParentPid), '/T'])
# - In Windows, we never need to split the command, but in Cygwin and Linux we need to split if shell=False (default), shlex will split the command for us
if shell==False and (sys.platform.startswith('cygwin') or sys.platform.startswith('linux')):
command=shlex.split(command)
if sys.version_info >= (3, 3): # py3.3+
if timeout_s==False:
returnCode, output, is_timeout_reached = _runPY3(command, timeout_s=None, shell=shell)
else:
returnCode, output, is_timeout_reached = _runPY3(command, timeout_s=timeout_s, shell=shell)
else: # Python 2 and up to 3.2
if timeout_s==False:
returnCode, output, is_timeout_reached = _runPY2(command, timeout_s=0, shell=shell)
else:
returnCode, output, is_timeout_reached = _runPY2(command, timeout_s=timeout_s, shell=shell)
return returnCode, output, is_timeout_reached
然后用它如下:
总是将命令作为一个行(它更容易)。你不需要分开它;函数在需要时将分开它。
所以我们可以用这个类似的时间:
a=_run('cmd /c echo 11111 & echo 22222 & calc',3)
for i in a[1].splitlines(): print(i)
或者没有时间:
b=_run('cmd /c echo 11111 & echo 22222 & calc')
更多例子:
b=_run('''wmic nic where 'NetConnectionID="Local Area Connection"' get NetConnectionStatus /value''')
print(b)
c=_run('cmd /C netsh interface ip show address "Local Area Connection"')
print(c)
d=_run('printf "<%s>\n" "{foo}"')
print(d)
你也可以指定Shell=True,但在大多数情况下,这个功能是无用的。我宁愿自己选择我想要的Shell,但这里是,如果你也需要它:
# windows
e=_run('echo 11111 & echo 22222 & calc',3, shell=True)
print(e)
# Cygwin/Linux:
f=_run('printf "<%s>\n" "{foo}"', shell=True)
print(f)
為什麼我沒有使用更簡單的新方法 subprocess.run()?
因为它支持在Python 3.7+,但最后支持的Python版本在Windows XP是3.4. 而且由于这个功能的Timeout论点在Windows中是无用的,它不会杀死执行命令的儿童过程. 如果你使用 capture_output + Timeout论点,它会挂在如果有一个儿童过程仍然运行。
我用的是Python 3.6+:
import subprocess
def execute(cmd):
"""
Purpose : To execute a command and return exit status
Argument : cmd - command to execute
Return : result, exit_code
"""
process = subprocess.Popen(cmd, shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
(result, error) = process.communicate()
rc = process.wait()
if rc != 0:
print ("Error: failed to execute command: ", cmd)
print (error.rstrip().decode("utf-8"))
return result.rstrip().decode("utf-8"), serror.rstrip().decode("utf-8")
# def
我会推荐下面的“运行”方法,它会帮助我们获得标准输出、标准错误和输出状态作为词典;这个词典的呼叫者可以通过“运行”方法阅读词典返回,以了解过程的实际状态。
def run (cmd):
print "+ DEBUG exec({0})".format(cmd)
p = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, universal_newlines=True, shell=True)
(out, err) = p.communicate()
ret = p.wait()
out = filter(None, out.split('\n'))
err = filter(None, err.split('\n'))
ret = True if ret == 0 else False
return dict({'output': out, 'error': err, 'status': ret})
#end
import subprocess
p = subprocess.run(["ls", "-ltr"], capture_output=True)
print(p.stdout.decode(), p.stderr.decode())
网上尝试
推荐文章
- 证书验证失败:无法获得本地颁发者证书
- 当使用pip3安装包时,“Python中的ssl模块不可用”
- 无法切换Python与pyenv
- Python if not == vs if !=
- 如何从终端/命令行调用VS代码编辑器
- 如何从scikit-learn决策树中提取决策规则?
- 为什么在Mac OS X v10.9 (Mavericks)的终端中apt-get功能不起作用?
- 将旋转的xtick标签与各自的xtick对齐
- 为什么元组可以包含可变项?
- 如何合并字典的字典?
- 如何创建类属性?
- 不区分大小写的“in”
- 在Python中获取迭代器中的元素个数
- 解析日期字符串并更改格式
- 查看PS命令的全部输出