Paramiko hangs when executing a large wget command

Posted 2024-12-07 20:38:33

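Hi, I am having problems executing a command that uses wget to download a 100 MB file on an Ubuntu 10 server. Shorter commands work fine. The class below shows how I use paramiko and my various attempts to get around the problem (see the different run and exec methods). In the case of exec_cmd, execution hangs on this line:

        out = self.in_buffer.read(nbytes, self.timeout)

in the recv method of paramiko's channel.py module.

The same wget command works perfectly in a shell using the normal ssh utility on a Mac.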

"""
Management of SSH connections
"""

import logging
import os
import paramiko
import socket
import time
import StringIO


class SSHClient():
    def __init__(self):
        self._ssh_client = paramiko.SSHClient()
        self._ssh_client.load_system_host_keys()
        self._ssh_client.set_missing_host_key_policy(paramiko.AutoAddPolicy())
        self.time_out = 300
        self.wait = 5

    def connect(self, hostname, user, pkey):
        retry = self.time_out
        self.hostname = hostname
        logging.info("connecting to:%s user:%s key:%s" % (hostname, user, pkey))
        while retry > 0:
            try:
                self._ssh_client.connect(hostname,
                                         username=user,
                                         key_filename=os.path.expanduser(pkey),
                                         timeout=self.time_out)
                return
            except socket.error, (value,message):
                if value == 61 or value == 111:
                    logging.warning('SSH Connection refused, will retry in 5 seconds')
                    time.sleep(self.wait)
                    retry -= self.wait
                else:
                    raise
            except paramiko.BadHostKeyException:
                logging.warning("%s has an entry in ~/.ssh/known_hosts and it doesn't match" % self.hostname)
                logging.warning('Edit that file to remove the entry and then try again')
                retry = 0
            except EOFError:
                logging.warning('Unexpected Error from SSH Connection, retry in 5 seconds')
                time.sleep(self.wait)
                retry -= self.wait
        logging.error('Could not establish SSH connection')

    def exists(self, path):
        status = self.run('[ -e %s ] || echo "FALSE"' % path)
        if status[1].startswith('FALSE'):
            return 0
        return 1

    def shell(self):
        """
        Start an interactive shell session on the remote host.
        """
        channel = self._ssh_client.invoke_shell()
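        # interactive_shell() is not defined in this module; it is presumably
        # the helper from paramiko's demos/interactive.py, imported elsewhere.
        interactive_shell(channel)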

    def run(self, command):
        """
        Execute a command on the remote host.  Return a tuple containing
        an integer status and a string containing all output from the command.
        """
        logging.info('running:%s on %s' % (command, self.hostname))
        log_fp = StringIO.StringIO()
        status = 0
        try:
            t = self._ssh_client.exec_command(command)
        except paramiko.SSHException:
            logging.error("Error executing command: " + command)
            # t is unset if exec_command failed, so return before reading it
            return (1, '')
        log_fp.write(t[1].read())
        log_fp.write(t[2].read())
        t[0].close()
        t[1].close()
        t[2].close()
        logging.info('output: %s' % log_fp.getvalue())
        return (status, log_fp.getvalue())

    def run_pty(self, command):
        """
        Execute a command on the remote host with a pseudo-terminal.
        Returns a tuple (status, output), where output is only the first
        chunk (at most 1024 bytes) that the command writes.
        """
        logging.info('running:%s on %s' % (command, self.hostname))
        channel = self._ssh_client.get_transport().open_session()
        channel.get_pty()
        status = 0
        try:
            channel.exec_command(command)
        except paramiko.SSHException:
            logging.error("Error executing command: " + command)
            status = 1
        return status, channel.recv(1024)

    def close(self):
        transport = self._ssh_client.get_transport()
        transport.close()

    def run_remote(self, cmd, check_exit_status=True, verbose=True, use_sudo=False):
        logging.info('running:%s on %s' % (cmd, self.hostname))
        ssh = self._ssh_client
        chan = ssh.get_transport().open_session()
        stdin = chan.makefile('wb')
        stdout = chan.makefile('rb')
        stderr = chan.makefile_stderr('rb')
        processed_cmd = cmd
        if use_sudo:
            processed_cmd = 'sudo -S bash -c "%s"' % cmd.replace('"', '\\"')
        chan.exec_command(processed_cmd)
        result = {
            'stdout': [],
            'stderr': [],
        }
        exit_status = chan.recv_exit_status()
        result['exit_status'] = exit_status

        def print_output():
            for line in stdout:
                result['stdout'].append(line)
                logging.info(line)
            for line in stderr:
                result['stderr'].append(line)
                logging.info(line)
        if verbose:
            print processed_cmd
            print_output()
        return exit_status,result 

    def exec_cmd(self, cmd):
        import select
        ssh = self._ssh_client
        channel = ssh.get_transport().open_session()
        END = "CMD_EPILOGqwkjidksjk58754dskhjdksjKDSL"
        cmd += ";echo " + END
        logging.info('running:%s on %s' % (cmd, self.hostname))
        channel.exec_command(cmd)
        out = ""
        buf = ""
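        # Check the accumulated output: the marker may be split across two recv() chunks
        while END not in out:
            rl, wl, xl = select.select([channel], [], [], 0.0)
            if len(rl) > 0:
                # Must be stdout
                buf = channel.recv(1024)
                if not buf:
                    break  # channel closed before the marker arrived
                logging.info(buf)
                out += buf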
        return 0, out
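A frequent cause of this kind of hang is SSH flow control. This is an assumption here and is not confirmed in this thread. wget writes its progress meter to stderr. If the caller reads only stdout, reads stdout to EOF before touching stderr, or calls `recv_exit_status()` before reading anything, the unread data can fill the channel window and stall the remote command. paramiko's documentation for `Channel.recv_exit_status` warns about this ordering.

The Python 3 sketch below drains stdout and stderr at the same time and asks for the exit status only after both are done. `read_both` and `run` are hypothetical helper names, and `client` is assumed to be an already connected `paramiko.SSHClient`.

```python
import threading


def read_both(stdout, stderr):
    """Read two file-like streams to EOF concurrently.

    stderr is drained on a helper thread while the calling thread drains
    stdout, so neither stream is left unread while the other is consumed.
    """
    result = {}

    def pump(name, stream):
        # read() with no argument blocks until the stream reaches EOF
        result[name] = stream.read()

    worker = threading.Thread(target=pump, args=("stderr", stderr))
    worker.start()
    pump("stdout", stdout)
    worker.join()
    return result["stdout"], result["stderr"]


def run(client, command):
    """Run `command` on a connected paramiko.SSHClient.

    Returns (exit_status, stdout_bytes, stderr_bytes).
    """
    _stdin, stdout, stderr = client.exec_command(command)
    out, err = read_both(stdout, stderr)
    # Both streams have reached EOF, so waiting for the exit status is safe now
    status = stdout.channel.recv_exit_status()
    return status, out, err
```

Usage, given a connected client: `status, out, err = run(client, "wget http://blah:8888/file.zip")`. A thread is used because a file object's `read()` blocks until EOF. The non-threaded alternative is a polling loop over `recv_ready()` and `recv_stderr_ready()`.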


Comments (2)

帅气尐潴 2024-12-14 20:38:33

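I had the same problem: my Python script hung when the shell script I was running over SSH on a remote host ran wget on a 400 MB file.

I found that adding a timeout (-T90) to the wget command, along with -q, fixed the problem.
Originally I had:

wget http://blah:8888/file.zip

Now I have:

wget -q -T90 http://blah:8888/file.zip

It works like a charm!

Hope it helps.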
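The thread does not say why the new flags help. One plausible explanation, which is an assumption and was not verified by the poster, is that `-q` turns off wget's progress output on stderr. That leaves far less unread data to fill the SSH channel's window.

Here is a minimal Python 3 sketch that does not depend on the command being quiet. It merges stderr into stdout with paramiko's `Channel.set_combine_stderr(True)` and reads until EOF. It also sets `Channel.settimeout()` so that a stalled read raises `socket.timeout` instead of hanging. `run_combined` and its parameters are hypothetical, and `client` is assumed to be a connected `paramiko.SSHClient`.

```python
def run_combined(client, command, timeout=90.0, chunk_size=32768):
    """Run `command` on a connected paramiko.SSHClient, reading stdout and
    stderr as a single stream.

    Returns (exit_status, output_bytes). A stall longer than `timeout`
    seconds makes recv() raise socket.timeout instead of hanging forever.
    """
    channel = client.get_transport().open_session()
    try:
        channel.set_combine_stderr(True)  # stderr data lands in the stdout buffer
        channel.settimeout(timeout)       # bound each blocking recv()
        channel.exec_command(command)
        chunks = []
        while True:
            data = channel.recv(chunk_size)
            if not data:  # empty result: the remote side sent EOF
                break
            chunks.append(data)
        # All output has been read, so no unread data is left to fill the window
        status = channel.recv_exit_status()
    finally:
        channel.close()
    return status, b"".join(chunks)
```

Merging the streams means you can no longer tell stdout from stderr. If you need them separately, read both streams concurrently instead.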


街角迷惘 2024-12-14 20:38:33
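  1. In this case, I would append the chunks to a list and join them once at the end. Why? Strings in Python are immutable, so each += can copy everything accumulated so far into a new string. Appending to a list and calling ''.join() once at the end avoids that repeated copying.
  2. Do you really need to call select on every pass through the loop? With a timeout of 0.0 it returns immediately, so the loop spins and burns CPU while it waits. My understanding is that you don't actually mind the thread blocking until data arrives. select is more or less a wrapper around the C function of the same name:

    select() and pselect() allow a program to monitor multiple file descriptors, waiting until one or more of the file descriptors become "ready" for some class of I/O operation (e.g., input possible). A file descriptor is considered ready if it is possible to perform the corresponding I/O operation (e.g., read(2)) without blocking.

  3. You are not handling socket.timeout in your code. Channel.recv raises it when a timeout has been set with settimeout and no data arrives in time.
  4. Writing to stdout or the file system can be expensive, yet you log every chunk returned by recv. Can you move the logging call out of the loop?
  5. Have you considered reading the channel's buffer yourself? The only code you technically need is:

    from paramiko.buffered_pipe import PipeTimeout

    try:
        out = channel.in_buffer.read(nbytes, channel.timeout)
    except PipeTimeout:
        # handle the timeout here (log it, retry, or give up)
        out = ''

  It isn't guaranteed to fix the hang, but it will cut out extra processing.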
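Points 1 to 4 can be combined into a reworked version of the question's `exec_cmd` loop. The following is a Python 3 sketch and is not code from either post. `read_until_marker` is a hypothetical name, and the marker is passed as bytes.

Instead of polling `select` with a zero timeout, it uses paramiko's `Channel.settimeout()`, so a blocking `recv()` raises `socket.timeout` after a bounded wait. It checks for the marker across chunk boundaries, because the marker can be split between two `recv()` calls. Like the question's code, it reads stdout only, so a command that writes heavily to stderr still needs that stream drained too.

```python
import logging
import socket

log = logging.getLogger(__name__)


def read_until_marker(channel, marker, timeout=60.0, chunk_size=32768):
    """Read from a paramiko-style channel until `marker` (bytes) is seen or EOF.

    Returns the collected output as bytes, or None if a read timed out.
    """
    channel.settimeout(timeout)  # block in recv() for at most `timeout` seconds
    chunks = []                  # point 1: collect chunks, join once at the end
    keep = len(marker) - 1       # bytes to carry over between chunks
    tail = b""
    try:
        while True:
            data = channel.recv(chunk_size)  # point 2: no select() busy loop
            if not data:  # EOF: the remote side closed the stream
                break
            chunks.append(data)
            window = tail + data  # catches a marker split across two chunks
            if marker in window:
                break
            tail = window[-keep:] if keep else b""
    except socket.timeout:  # point 3: raised by recv() once the timeout expires
        log.warning("no output for %s seconds", timeout)
        return None
    out = b"".join(chunks)
    log.info("output: %d bytes", len(out))  # point 4: log once, not per chunk
    return out
```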

