我如何在Python中命名一个外部命令,就好像我把它写在一个<unk>或命令中?


当前回答

最多案例:

在大多数情况下,一个短片的代码,如此,是你将需要的一切:

import subprocess
import shlex

source = "test.txt"
destination = "test_copy.txt"

base = "cp {source} {destination}'"
cmd = base.format(source=source, destination=destination)
subprocess.check_call(shlex.split(cmd))

它是干净和简单的。

subprocess.check_call 以论点运行命令,等待命令完成。 shlex.split 使用 shell-like syntax 分割行 cmd

其他案例:

如果这不适用于某些特定的命令,最有可能你有一个问题与命令线的解释器. 操作系统选择了默认一个不适合你的程序类型或可能没有找到适当的一个在系统可执行的路径。

例子:

使用Unix系统上的转向操作器

input_1 = "input_1.txt"
input_2 = "input_2.txt"
output = "merged.txt"
base_command = "/bin/bash -c 'cat {input} >> {output}'"

base_command.format(input_1, output=output)
subprocess.check_call(shlex.split(base_command))

base_command.format(input_2, output=output)
subprocess.check_call(shlex.split(base_command))

正如《Python Zen: Explicit is better than implicit》中所说的那样。

因此,如果使用 Python >=3.6 函数,它会看起来像这样:

import subprocess
import shlex

def run_command(cmd_interpreter: str, command: str) -> None:
    base_command = f"{cmd_interpreter} -c '{command}'"
    subprocess.check_call(shlex.split(base_command)

其他回答

如果您正在写一个 Python shell 脚本,并且在您的系统上安装了 IPython,您可以使用 bang 预定来在 IPython 中运行一个 shell 命令:

!ls
filelist = !ls

我用的是Python 3.6+:

import subprocess
def execute(cmd):
    """
        Purpose  : To execute a command and return exit status
        Argument : cmd - command to execute
        Return   : result, exit_code
    """
    process = subprocess.Popen(cmd, shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
    (result, error) = process.communicate()
    rc = process.wait()
    if rc != 0:
        print ("Error: failed to execute command: ", cmd)
        print (error.rstrip().decode("utf-8"))
    return result.rstrip().decode("utf-8"), serror.rstrip().decode("utf-8")
# def

使用:

import subprocess

p = subprocess.Popen("df -h", shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE).communicate()[0]
print p.split("\n")

它提供良好的产量,更容易与:

['Filesystem      Size  Used Avail Use% Mounted on',
 '/dev/sda6        32G   21G   11G  67% /',
 'none            4.0K     0  4.0K   0% /sys/fs/cgroup',
 'udev            1.9G  4.0K  1.9G   1% /dev',
 'tmpfs           387M  1.4M  386M   1% /run',
 'none            5.0M     0  5.0M   0% /run/lock',
 'none            1.9G   58M  1.9G   3% /run/shm',
 'none            100M   32K  100M   1% /run/user',
 '/dev/sda5       340G  222G  100G  69% /home',
 '']

如果您需要从 Python 笔记本电脑(如 Jupyter、Zeppelin、Databricks 或 Google Cloud Datalab)中拨打一个支票命令,您只能使用! 预定。

例如,

!ls -ilF

我常常用下列函数用于外部命令,这特别适用于长时间运行过程,下面的方法在运行时将过程输出缩短,并返回输出,如果过程失败,则会产生一个例外。

结果是,如果这个过程是通过 poll() 方法完成的。

import subprocess,sys

def exec_long_running_proc(command, args):
    cmd = "{} {}".format(command, " ".join(str(arg) if ' ' not in arg else arg.replace(' ','\ ') for arg in args))
    print(cmd)
    process = subprocess.Popen(cmd, shell=True, stdout=subprocess.PIPE, stderr=subprocess.STDOUT)

    # Poll process for new output until finished
    while True:
        nextline = process.stdout.readline().decode('UTF-8')
        if nextline == '' and process.poll() is not None:
            break
        sys.stdout.write(nextline)
        sys.stdout.flush()

    output = process.communicate()[0]
    exitCode = process.returncode

    if (exitCode == 0):
        return output
    else:
        raise Exception(command, exitCode, output)

你可以这样引用它:

exec_long_running_proc(command = "hive", args=["-f", hql_path])