从 python 子进程获取输出并向其发出命令

发布于 2024-12-18 00:40:10 字数 3045 浏览 0 评论 0 原文

我试图从子进程获取输出,然后根据前面的输出向该进程发出命令。当程序需要进一步输入时,我需要多次执行此操作。 (如果可能的话,我还需要能够隐藏子进程命令提示符)。

我认为这将是一项简单的任务,因为我在 2003 年的帖子中看到过这个问题,现在已经快到 2012 年了,而且它似乎是一个非常普遍的需求,而且看起来它确实应该成为任何编程语言的基本部分。显然我错了,不知何故,差不多 9 年后,仍然没有标准方法以稳定、非破坏性、独立于平台的方式完成这项任务!

我不太了解文件 i/o 和缓冲或线程,所以我更喜欢一个尽可能简单的解决方案。如果有一个与 python 3.x 兼容的模块可以完成此任务,我将非常愿意下载它。我意识到有多个问题基本上询问同一件事,但我尚未找到解决我试图完成的简单任务的答案。

这是我迄今为止基于各种来源的代码;但我完全不知道下一步该做什么。我所有的尝试都以失败告终,有些尝试使用了 100% 的 CPU(基本上什么也不做)并且不会退出。

import subprocess
from subprocess import Popen, PIPE
p = Popen(r'C:\postgis_testing\shellcomm.bat',stdin=PIPE,stdout=PIPE,stderr=subprocess.STDOUT shell=True)
stdout,stdin = p.communicate(b'command string')

如果我的问题不清楚,我将发布示例批处理文件的文本,该文件演示了需要向子进程发送多个命令的情况(如果您键入不正确的命令字符串,程序将循环)。

@echo off
:looper
set INPUT=
set /P INPUT=Type the correct command string:
if "%INPUT%" == "command string" (echo you are correct) else (goto looper)

如果有人能帮助我,我将非常感激,我相信许多其他人也会如此!

这里编辑是使用 eryksun 的代码(下一篇文章)的功能代码:

import subprocess
import threading
import time
import sys

try: 
    import queue
except ImportError:
    import Queue as queue

def read_stdout(stdout, q, p):
    it = iter(lambda: stdout.read(1), b'')
    for c in it:
        q.put(c)
        if stdout.closed:
            break

_encoding = getattr(sys.stdout, 'encoding', 'latin-1')
def get_stdout(q, encoding=_encoding):
    out = []
    while 1:
        try:
            out.append(q.get(timeout=0.2))
        except queue.Empty:
            break
    return b''.join(out).rstrip().decode(encoding)

def printout(q):
    outdata = get_stdout(q)
    if outdata:
        print('Output: %s' % outdata)

if __name__ == '__main__':
    #setup
    p = subprocess.Popen(['shellcomm.bat'], stdin=subprocess.PIPE, 
                     stdout=subprocess.PIPE, stderr=subprocess.PIPE, 
                     bufsize=0, shell=True) # I put shell=True to hide prompt
    q = queue.Queue()
    encoding = getattr(sys.stdin, 'encoding', 'utf-8')

    #for reading stdout
    t = threading.Thread(target=read_stdout, args=(p.stdout, q, p))
    t.daemon = True
    t.start()

    #command loop
    while p.poll() is None:
        printout(q)
        cmd = input('Input: ')
        cmd = (cmd + '\n').encode(encoding)
        p.stdin.write(cmd)
        time.sleep(0.1) # I added this to give some time to check for closure (otherwise it doesn't work)

    #tear down
    for n in range(4):
        rc = p.poll()
        if rc is not None:
            break
        time.sleep(0.25)
    else:
        p.terminate()
        rc = p.poll()
        if rc is None:
            rc = 1

    printout(q)
    print('Return Code: %d' % rc)

但是,当从命令提示符运行脚本时,会发生以下情况:

C:\Users\username>python C:\postgis_testing\shellcomm7.py
Input: sth
Traceback (most recent call last):
File "C:\postgis_testing\shellcomm7.py", line 51, in <module>
    p.stdin.write(cmd)
IOError: [Errno 22] Invalid argument

从命令提示符运行时,程序似乎会关闭。有什么想法吗?

I am trying to get output from a subprocess and then give commands to that process based on the preceding output. I need to do this a variable number of times, when the program needs further input. (I also need to be able to hide the subprocess command prompt if possible).

I figured this would be an easy task given that I have seen this problem being discussed in posts from 2003 and it is nearly 2012 and it appears to be a pretty common need and really seems like it should be a basic part of any programming language. Apparently I was wrong and somehow almost 9 years later there is still no standard way of accomplishing this task in a stable, non-destructive, platform independent way!

I don't really understand much about file i/o and buffering or threading so I would prefer a solution that is as simple as possible. If there is a module that accomplishes this that is compatible with python 3.x, I would be very willing to download it. I realize that there are multiple questions that ask basically the same thing, but I have yet to find an answer that addresses the simple task that I am trying to accomplish.

Here is the code I have so far based on a variety of sources; however I have absolutely no idea what to do next. All my attempts ended in failure and some managed to use 100% of my CPU (to do basically nothing) and would not quit.

import subprocess
from subprocess import Popen, PIPE
p = Popen(r'C:\postgis_testing\shellcomm.bat',stdin=PIPE,stdout=PIPE,stderr=subprocess.STDOUT shell=True)
stdout,stdin = p.communicate(b'command string')

In case my question is unclear I am posting the text of the sample batch file that I demonstrates a situation in which it is necessary to send multiple commands to the subprocess (if you type an incorrect command string the program loops).

@echo off
:looper
set INPUT=
set /P INPUT=Type the correct command string:
if "%INPUT%" == "command string" (echo you are correct) else (goto looper)

If anyone can help me I would very much appreciate it, and I'm sure many others would as well!

EDIT here is the functional code using eryksun's code (next post) :

import subprocess
import threading
import time
import sys

try: 
    import queue
except ImportError:
    import Queue as queue

def read_stdout(stdout, q, p):
    it = iter(lambda: stdout.read(1), b'')
    for c in it:
        q.put(c)
        if stdout.closed:
            break

_encoding = getattr(sys.stdout, 'encoding', 'latin-1')
def get_stdout(q, encoding=_encoding):
    out = []
    while 1:
        try:
            out.append(q.get(timeout=0.2))
        except queue.Empty:
            break
    return b''.join(out).rstrip().decode(encoding)

def printout(q):
    outdata = get_stdout(q)
    if outdata:
        print('Output: %s' % outdata)

if __name__ == '__main__':
    #setup
    p = subprocess.Popen(['shellcomm.bat'], stdin=subprocess.PIPE, 
                     stdout=subprocess.PIPE, stderr=subprocess.PIPE, 
                     bufsize=0, shell=True) # I put shell=True to hide prompt
    q = queue.Queue()
    encoding = getattr(sys.stdin, 'encoding', 'utf-8')

    #for reading stdout
    t = threading.Thread(target=read_stdout, args=(p.stdout, q, p))
    t.daemon = True
    t.start()

    #command loop
    while p.poll() is None:
        printout(q)
        cmd = input('Input: ')
        cmd = (cmd + '\n').encode(encoding)
        p.stdin.write(cmd)
        time.sleep(0.1) # I added this to give some time to check for closure (otherwise it doesn't work)

    #tear down
    for n in range(4):
        rc = p.poll()
        if rc is not None:
            break
        time.sleep(0.25)
    else:
        p.terminate()
        rc = p.poll()
        if rc is None:
            rc = 1

    printout(q)
    print('Return Code: %d' % rc)

However when the script is run from a command prompt the following happens:

C:\Users\username>python C:\postgis_testing\shellcomm7.py
Input: sth
Traceback (most recent call last):
File "C:\postgis_testing\shellcomm7.py", line 51, in <module>
    p.stdin.write(cmd)
IOError: [Errno 22] Invalid argument

It seems that the program closes out when run from command prompt. any ideas?

如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

扫码二维码加入Web技术交流群

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。

评论(1

南城追梦 2024-12-25 00:40:10

该演示使用专用线程从标准输出读取。如果您四处搜索,我相信您可以找到用面向对象接口编写的更完整的实现。至少我可以说这对于我在 Python 2.7.2 和 3.2.2 中提供的批处理文件都有效。

shellcomm.bat:

@echo off
echo Command Loop Test
echo.
:looper
set INPUT=
set /P INPUT=Type the correct command string:
if "%INPUT%" == "command string" (echo you are correct) else (goto looper)

这是我根据命令“错误”、“仍然错误”和“命令字符串”序列得到的输出:

Output:
Command Loop Test

Type the correct command string:
Input: wrong
Output:
Type the correct command string:
Input: still wrong
Output:
Type the correct command string:
Input: command string
Output:
you are correct

Return Code: 0

为了读取管道输出,readline有时可能会起作用,但是批处理文件中的set /P INPUT自然不会写入行结尾。因此,我使用 lambda: stdout.read(1) 一次读取一个字节(效率不高,但有效)。读取函数将数据放入队列中。主线程在写入命令后从队列中获取输出。此处对 get 调用使用超时会使其等待一小段时间,以确保程序正在等待输入。相反,您可以检查输出中的提示,以了解程序何时需要输入。

尽管如此,您不能期望这样的设置能够普遍工作,因为您尝试与之交互的控制台程序可能会在通过管道传输时缓冲其输出。在 Unix 系统中,有一些可用的实用命令,您可以将它们插入管道中以将缓冲修改为非缓冲、行缓冲或给定大小 - 例如 stdbuf。还有一些方法可以欺骗程序认为它已连接到 pty(请参阅 pexpect)。但是,如果您无法访问程序的源代码来使用 setvbuf

import subprocess
import threading
import time
import sys

if sys.version_info.major >= 3:
    import queue
else:
    import Queue as queue
    input = raw_input

def read_stdout(stdout, q):
    it = iter(lambda: stdout.read(1), b'')
    for c in it:
        q.put(c)
        if stdout.closed:
            break

_encoding = getattr(sys.stdout, 'encoding', 'latin-1')
def get_stdout(q, encoding=_encoding):
    out = []
    while 1:
        try:
            out.append(q.get(timeout=0.2))
        except queue.Empty:
            break
    return b''.join(out).rstrip().decode(encoding)

def printout(q):
    outdata = get_stdout(q)
    if outdata:
        print('Output:\n%s' % outdata)

if __name__ == '__main__':

    ARGS = ["shellcomm.bat"]   ### Modify this

    #setup
    p = subprocess.Popen(ARGS, bufsize=0, stdin=subprocess.PIPE, 
                         stdout=subprocess.PIPE, stderr=subprocess.PIPE)
    q = queue.Queue()
    encoding = getattr(sys.stdin, 'encoding', 'utf-8')

    #for reading stdout
    t = threading.Thread(target=read_stdout, args=(p.stdout, q))
    t.daemon = True
    t.start()

    #command loop
    while 1:
        printout(q)
        if p.poll() is not None or p.stdin.closed:
            break
        cmd = input('Input: ') 
        cmd = (cmd + '\n').encode(encoding)
        p.stdin.write(cmd)

    #tear down
    for n in range(4):
        rc = p.poll()
        if rc is not None:
            break
        time.sleep(0.25)
    else:
        p.terminate()
        rc = p.poll()
        if rc is None:
            rc = 1

    printout(q)
    print('\nReturn Code: %d' % rc)

This demo uses a dedicated thread to read from stdout. If you search around, I'm sure you can find a more complete implementation written up in an object oriented interface. At least I can say this is working for me with your provided batch file in both Python 2.7.2 and 3.2.2.

shellcomm.bat:

@echo off
echo Command Loop Test
echo.
:looper
set INPUT=
set /P INPUT=Type the correct command string:
if "%INPUT%" == "command string" (echo you are correct) else (goto looper)

Here's what I get for output based on the sequence of commands "wrong", "still wrong", and "command string":

Output:
Command Loop Test

Type the correct command string:
Input: wrong
Output:
Type the correct command string:
Input: still wrong
Output:
Type the correct command string:
Input: command string
Output:
you are correct

Return Code: 0

For reading the piped output, readline might work sometimes, but set /P INPUT in the batch file naturally isn't writing a line ending. So instead I used lambda: stdout.read(1) to read a byte at a time (not so efficient, but it works). The reading function puts the data on a queue. The main thread gets the output from the queue after it writes a a command. Using a timeout on the get call here makes it wait a small amount of time to ensure the program is waiting for input. Instead you could check the output for prompts to know when the program is expecting input.

All that said, you can't expect a setup like this to work universally because the console program you're trying to interact with might buffer its output when piped. In Unix systems there are some utility commands available that you can insert into a pipe to modify the buffering to be non-buffered, line-buffered, or a given size -- such as stdbuf. There are also ways to trick the program into thinking it's connected to a pty (see pexpect). However, I don't know a way around this problem on Windows if you don't have access to the program's source code to explicitly set the buffering using setvbuf.

import subprocess
import threading
import time
import sys

if sys.version_info.major >= 3:
    import queue
else:
    import Queue as queue
    input = raw_input

def read_stdout(stdout, q):
    it = iter(lambda: stdout.read(1), b'')
    for c in it:
        q.put(c)
        if stdout.closed:
            break

_encoding = getattr(sys.stdout, 'encoding', 'latin-1')
def get_stdout(q, encoding=_encoding):
    out = []
    while 1:
        try:
            out.append(q.get(timeout=0.2))
        except queue.Empty:
            break
    return b''.join(out).rstrip().decode(encoding)

def printout(q):
    outdata = get_stdout(q)
    if outdata:
        print('Output:\n%s' % outdata)

if __name__ == '__main__':

    ARGS = ["shellcomm.bat"]   ### Modify this

    #setup
    p = subprocess.Popen(ARGS, bufsize=0, stdin=subprocess.PIPE, 
                         stdout=subprocess.PIPE, stderr=subprocess.PIPE)
    q = queue.Queue()
    encoding = getattr(sys.stdin, 'encoding', 'utf-8')

    #for reading stdout
    t = threading.Thread(target=read_stdout, args=(p.stdout, q))
    t.daemon = True
    t.start()

    #command loop
    while 1:
        printout(q)
        if p.poll() is not None or p.stdin.closed:
            break
        cmd = input('Input: ') 
        cmd = (cmd + '\n').encode(encoding)
        p.stdin.write(cmd)

    #tear down
    for n in range(4):
        rc = p.poll()
        if rc is not None:
            break
        time.sleep(0.25)
    else:
        p.terminate()
        rc = p.poll()
        if rc is None:
            rc = 1

    printout(q)
    print('\nReturn Code: %d' % rc)
~没有更多了~
我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。
原文