summaryrefslogtreecommitdiff
path: root/paramiko/channel.py
diff options
context:
space:
mode:
Diffstat (limited to 'paramiko/channel.py')
-rw-r--r--paramiko/channel.py1174
1 files changed, 1174 insertions, 0 deletions
diff --git a/paramiko/channel.py b/paramiko/channel.py
new file mode 100644
index 0000000..8a00233
--- /dev/null
+++ b/paramiko/channel.py
@@ -0,0 +1,1174 @@
+# Copyright (C) 2003-2005 Robey Pointer <robey@lag.net>
+#
+# This file is part of paramiko.
+#
+# Paramiko is free software; you can redistribute it and/or modify it under the
+# terms of the GNU Lesser General Public License as published by the Free
+# Software Foundation; either version 2.1 of the License, or (at your option)
+# any later version.
+#
+# Paramiko is distrubuted in the hope that it will be useful, but WITHOUT ANY
+# WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR
+# A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more
+# details.
+#
+# You should have received a copy of the GNU Lesser General Public License
+# along with Paramiko; if not, write to the Free Software Foundation, Inc.,
+# 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA.
+
+"""
+Abstraction for an SSH2 channel.
+"""
+
+import sys
+import time
+import threading
+import socket
+import os
+
+from paramiko.common import *
+from paramiko import util
+from paramiko.message import Message
+from paramiko.ssh_exception import SSHException
+from paramiko.file import BufferedFile
+from paramiko import pipe
+
+
+class Channel (object):
+ """
+ A secure tunnel across an SSH L{Transport}. A Channel is meant to behave
+ like a socket, and has an API that should be indistinguishable from the
+ python socket API.
+
+ Because SSH2 has a windowing kind of flow control, if you stop reading data
+ from a Channel and its buffer fills up, the server will be unable to send
+ you any more data until you read some of it. (This won't affect other
+ channels on the same transport -- all channels on a single transport are
+ flow-controlled independently.) Similarly, if the server isn't reading
+ data you send, calls to L{send} may block, unless you set a timeout. This
+ is exactly like a normal network socket, so it shouldn't be too surprising.
+ """
+
+ # lower bound on the max packet size we'll accept from the remote host
+ MIN_PACKET_SIZE = 1024
+
+ def __init__(self, chanid):
+ """
+ Create a new channel. The channel is not associated with any
+ particular session or L{Transport} until the Transport attaches it.
+ Normally you would only call this method from the constructor of a
+ subclass of L{Channel}.
+
+ @param chanid: the ID of this channel, as passed by an existing
+ L{Transport}.
+ @type chanid: int
+ """
+ self.chanid = chanid
+ self.remote_chanid = 0
+ self.transport = None
+ self.active = False
+ self.eof_received = 0
+ self.eof_sent = 0
+ self.in_buffer = ''
+ self.in_stderr_buffer = ''
+ self.timeout = None
+ self.closed = False
+ self.ultra_debug = False
+ self.lock = threading.Lock()
+ self.in_buffer_cv = threading.Condition(self.lock)
+ self.in_stderr_buffer_cv = threading.Condition(self.lock)
+ self.out_buffer_cv = threading.Condition(self.lock)
+ self.in_window_size = 0
+ self.out_window_size = 0
+ self.in_max_packet_size = 0
+ self.out_max_packet_size = 0
+ self.in_window_threshold = 0
+ self.in_window_sofar = 0
+ self.status_event = threading.Event()
+ self.name = str(chanid)
+ self.logger = util.get_logger('paramiko.chan.' + str(chanid))
+ self.pipe = None
+ self.event = threading.Event()
+ self.combine_stderr = False
+ self.exit_status = -1
+
+ def __del__(self):
+ self.close()
+
+ def __repr__(self):
+ """
+ Return a string representation of this object, for debugging.
+
+ @rtype: str
+ """
+ out = '<paramiko.Channel %d' % self.chanid
+ if self.closed:
+ out += ' (closed)'
+ elif self.active:
+ if self.eof_received:
+ out += ' (EOF received)'
+ if self.eof_sent:
+ out += ' (EOF sent)'
+ out += ' (open) window=%d' % (self.out_window_size)
+ if len(self.in_buffer) > 0:
+ out += ' in-buffer=%d' % (len(self.in_buffer),)
+ out += ' -> ' + repr(self.transport)
+ out += '>'
+ return out
+
+ def get_pty(self, term='vt100', width=80, height=24):
+ """
+ Request a pseudo-terminal from the server. This is usually used right
+ after creating a client channel, to ask the server to provide some
+ basic terminal semantics for a shell invoked with L{invoke_shell}.
+ It isn't necessary (or desirable) to call this method if you're going
+ to exectue a single command with L{exec_command}.
+
+ @param term: the terminal type to emulate (for example, C{'vt100'}).
+ @type term: str
+ @param width: width (in characters) of the terminal screen
+ @type width: int
+ @param height: height (in characters) of the terminal screen
+ @type height: int
+ @return: C{True} if the operation succeeded; C{False} if not.
+ @rtype: bool
+ """
+ if self.closed or self.eof_received or self.eof_sent or not self.active:
+ raise SSHException('Channel is not open')
+ m = Message()
+ m.add_byte(chr(MSG_CHANNEL_REQUEST))
+ m.add_int(self.remote_chanid)
+ m.add_string('pty-req')
+ m.add_boolean(True)
+ m.add_string(term)
+ m.add_int(width)
+ m.add_int(height)
+ # pixel height, width (usually useless)
+ m.add_int(0).add_int(0)
+ m.add_string('')
+ self.event.clear()
+ self.transport._send_user_message(m)
+ while True:
+ self.event.wait(0.1)
+ if self.closed:
+ return False
+ if self.event.isSet():
+ return True
+
+ def invoke_shell(self):
+ """
+ Request an interactive shell session on this channel. If the server
+ allows it, the channel will then be directly connected to the stdin,
+ stdout, and stderr of the shell.
+
+ Normally you would call L{get_pty} before this, in which case the
+ shell will operate through the pty, and the channel will be connected
+ to the stdin and stdout of the pty.
+
+ When the shell exits, the channel will be closed and can't be reused.
+ You must open a new channel if you wish to open another shell.
+
+ @return: C{True} if the operation succeeded; C{False} if not.
+ @rtype: bool
+ """
+ if self.closed or self.eof_received or self.eof_sent or not self.active:
+ raise SSHException('Channel is not open')
+ m = Message()
+ m.add_byte(chr(MSG_CHANNEL_REQUEST))
+ m.add_int(self.remote_chanid)
+ m.add_string('shell')
+ m.add_boolean(1)
+ self.event.clear()
+ self.transport._send_user_message(m)
+ while True:
+ self.event.wait(0.1)
+ if self.closed:
+ return False
+ if self.event.isSet():
+ return True
+
+ def exec_command(self, command):
+ """
+ Execute a command on the server. If the server allows it, the channel
+ will then be directly connected to the stdin, stdout, and stderr of
+ the command being executed.
+
+ When the command finishes executing, the channel will be closed and
+ can't be reused. You must open a new channel if you wish to execute
+ another command.
+
+ @param command: a shell command to execute.
+ @type command: str
+ @return: C{True} if the operation succeeded; C{False} if not.
+ @rtype: bool
+ """
+ if self.closed or self.eof_received or self.eof_sent or not self.active:
+ raise SSHException('Channel is not open')
+ m = Message()
+ m.add_byte(chr(MSG_CHANNEL_REQUEST))
+ m.add_int(self.remote_chanid)
+ m.add_string('exec')
+ m.add_boolean(1)
+ m.add_string(command)
+ self.event.clear()
+ self.transport._send_user_message(m)
+ while True:
+ self.event.wait(0.1)
+ if self.closed:
+ return False
+ if self.event.isSet():
+ return True
+
+ def invoke_subsystem(self, subsystem):
+ """
+ Request a subsystem on the server (for example, C{sftp}). If the
+ server allows it, the channel will then be directly connected to the
+ requested subsystem.
+
+ When the subsystem finishes, the channel will be closed and can't be
+ reused.
+
+ @param subsystem: name of the subsystem being requested.
+ @type subsystem: str
+ @return: C{True} if the operation succeeded; C{False} if not.
+ @rtype: bool
+ """
+ if self.closed or self.eof_received or self.eof_sent or not self.active:
+ raise SSHException('Channel is not open')
+ m = Message()
+ m.add_byte(chr(MSG_CHANNEL_REQUEST))
+ m.add_int(self.remote_chanid)
+ m.add_string('subsystem')
+ m.add_boolean(1)
+ m.add_string(subsystem)
+ self.event.clear()
+ self.transport._send_user_message(m)
+ while True:
+ self.event.wait(0.1)
+ if self.closed:
+ return False
+ if self.event.isSet():
+ return True
+
+ def resize_pty(self, width=80, height=24):
+ """
+ Resize the pseudo-terminal. This can be used to change the width and
+ height of the terminal emulation created in a previous L{get_pty} call.
+
+ @param width: new width (in characters) of the terminal screen
+ @type width: int
+ @param height: new height (in characters) of the terminal screen
+ @type height: int
+ @return: C{True} if the operation succeeded; C{False} if not.
+ @rtype: bool
+ """
+ if self.closed or self.eof_received or self.eof_sent or not self.active:
+ raise SSHException('Channel is not open')
+ m = Message()
+ m.add_byte(chr(MSG_CHANNEL_REQUEST))
+ m.add_int(self.remote_chanid)
+ m.add_string('window-change')
+ m.add_boolean(1)
+ m.add_int(width)
+ m.add_int(height)
+ m.add_int(0).add_int(0)
+ self.event.clear()
+ self.transport._send_user_message(m)
+ while True:
+ self.event.wait(0.1)
+ if self.closed:
+ return False
+ if self.event.isSet():
+ return True
+
+ def recv_exit_status(self):
+ """
+ Return the exit status from the process on the server. This is
+ mostly useful for retrieving the reults of an L{exec_command}.
+ If the command hasn't finished yet, this method will wait until
+ it does, or until the channel is closed. If no exit status is
+ provided by the server, -1 is returned.
+
+ @return: the exit code of the process on the server.
+ @rtype: int
+
+ @since: 1.2
+ """
+ while True:
+ if self.closed or self.status_event.isSet():
+ return self.exit_status
+ self.status_event.wait(0.1)
+
+ def send_exit_status(self, status):
+ """
+ Send the exit status of an executed command to the client. (This
+ really only makes sense in server mode.) Many clients expect to
+ get some sort of status code back from an executed command after
+ it completes.
+
+ @param status: the exit code of the process
+ @type status: int
+
+ @since: 1.2
+ """
+ # in many cases, the channel will not still be open here.
+ # that's fine.
+ m = Message()
+ m.add_byte(chr(MSG_CHANNEL_REQUEST))
+ m.add_int(self.remote_chanid)
+ m.add_string('exit-status')
+ m.add_boolean(0)
+ m.add_int(status)
+ self.transport._send_user_message(m)
+
+ def get_transport(self):
+ """
+ Return the L{Transport} associated with this channel.
+
+ @return: the L{Transport} that was used to create this channel.
+ @rtype: L{Transport}
+ """
+ return self.transport
+
+ def set_name(self, name):
+ """
+ Set a name for this channel. Currently it's only used to set the name
+ of the log level used for debugging. The name can be fetched with the
+ L{get_name} method.
+
+ @param name: new channel name.
+ @type name: str
+ """
+ self.name = name
+ self.logger = util.get_logger(self.transport.get_log_channel() + '.' + self.name)
+
+ def get_name(self):
+ """
+ Get the name of this channel that was previously set by L{set_name}.
+
+ @return: the name of this channel.
+ @rtype: str
+ """
+ return self.name
+
+ def get_id(self):
+ """
+ Return the ID # for this channel. The channel ID is unique across
+ a L{Transport} and usually a small number. It's also the number
+ passed to L{ServerInterface.check_channel_request} when determining
+ whether to accept a channel request in server mode.
+
+ @return: the ID of this channel.
+ @rtype: int
+
+ @since: ivysaur
+ """
+ return self.chanid
+
+ def set_combine_stderr(self, combine):
+ """
+ Set whether stderr should be combined into stdout on this channel.
+ The default is C{False}, but in some cases it may be convenient to
+ have both streams combined.
+
+ If this is C{False}, and L{exec_command} is called (or C{invoke_shell}
+ with no pty), output to stderr will not show up through the L{recv}
+ and L{recv_ready} calls. You will have to use L{recv_stderr} and
+ L{recv_stderr_ready} to get stderr output.
+
+ If this is C{True}, data will never show up via L{recv_stderr} or
+ L{recv_stderr_ready}.
+
+ @param combine: C{True} if stderr output should be combined into
+ stdout on this channel.
+ @type combine: bool
+ @return: previous setting.
+ @rtype: bool
+
+ @since: 1.1
+ """
+ data = ''
+ self.lock.acquire()
+ try:
+ old = self.combine_stderr
+ self.combine_stderr = combine
+ if combine and not old:
+ # copy old stderr buffer into primary buffer
+ data = self.in_stderr_buffer
+ self.in_stderr_buffer = ''
+ finally:
+ self.lock.release()
+ if len(data) > 0:
+ self._feed(data)
+ return old
+
+
+ ### socket API
+
+
+ def settimeout(self, timeout):
+ """
+ Set a timeout on blocking read/write operations. The C{timeout}
+ argument can be a nonnegative float expressing seconds, or C{None}. If
+ a float is given, subsequent channel read/write operations will raise
+ a timeout exception if the timeout period value has elapsed before the
+ operation has completed. Setting a timeout of C{None} disables
+ timeouts on socket operations.
+
+ C{chan.settimeout(0.0)} is equivalent to C{chan.setblocking(0)};
+ C{chan.settimeout(None)} is equivalent to C{chan.setblocking(1)}.
+
+ @param timeout: seconds to wait for a pending read/write operation
+ before raising C{socket.timeout}, or C{None} for no timeout.
+ @type timeout: float
+ """
+ self.timeout = timeout
+
+ def gettimeout(self):
+ """
+ Returns the timeout in seconds (as a float) associated with socket
+ operations, or C{None} if no timeout is set. This reflects the last
+ call to L{setblocking} or L{settimeout}.
+
+ @return: timeout in seconds, or C{None}.
+ @rtype: float
+ """
+ return self.timeout
+
+ def setblocking(self, blocking):
+ """
+ Set blocking or non-blocking mode of the channel: if C{blocking} is 0,
+ the channel is set to non-blocking mode; otherwise it's set to blocking
+ mode. Initially all channels are in blocking mode.
+
+ In non-blocking mode, if a L{recv} call doesn't find any data, or if a
+ L{send} call can't immediately dispose of the data, an error exception
+ is raised. In blocking mode, the calls block until they can proceed.
+
+ C{chan.setblocking(0)} is equivalent to C{chan.settimeout(0)};
+ C{chan.setblocking(1)} is equivalent to C{chan.settimeout(None)}.
+
+ @param blocking: 0 to set non-blocking mode; non-0 to set blocking
+ mode.
+ @type blocking: int
+ """
+ if blocking:
+ self.settimeout(None)
+ else:
+ self.settimeout(0.0)
+
+ def close(self):
+ """
+ Close the channel. All future read/write operations on the channel
+ will fail. The remote end will receive no more data (after queued data
+ is flushed). Channels are automatically closed when their L{Transport}
+ is closed or when they are garbage collected.
+ """
+ self.lock.acquire()
+ try:
+ if not self.active or self.closed:
+ return
+ msgs = self._close_internal()
+
+ # only close the pipe when the user explicitly closes the channel.
+ # otherwise they will get unpleasant surprises.
+ if self.pipe is not None:
+ self.pipe.close()
+ self.pipe = None
+ finally:
+ self.lock.release()
+ for m in msgs:
+ if m is not None:
+ self.transport._send_user_message(m)
+
+ def recv_ready(self):
+ """
+ Returns true if data is buffered and ready to be read from this
+ channel. A C{False} result does not mean that the channel has closed;
+ it means you may need to wait before more data arrives.
+
+ @return: C{True} if a L{recv} call on this channel would immediately
+ return at least one byte; C{False} otherwise.
+ @rtype: boolean
+ """
+ self.lock.acquire()
+ try:
+ if len(self.in_buffer) == 0:
+ return False
+ return True
+ finally:
+ self.lock.release()
+
+ def recv(self, nbytes):
+ """
+ Receive data from the channel. The return value is a string
+ representing the data received. The maximum amount of data to be
+ received at once is specified by C{nbytes}. If a string of length zero
+ is returned, the channel stream has closed.
+
+ @param nbytes: maximum number of bytes to read.
+ @type nbytes: int
+ @return: data.
+ @rtype: str
+
+ @raise socket.timeout: if no data is ready before the timeout set by
+ L{settimeout}.
+ """
+ out = ''
+ self.lock.acquire()
+ try:
+ if len(self.in_buffer) == 0:
+ if self.closed or self.eof_received:
+ return out
+ # should we block?
+ if self.timeout == 0.0:
+ raise socket.timeout()
+ # loop here in case we get woken up but a different thread has grabbed everything in the buffer
+ timeout = self.timeout
+ while (len(self.in_buffer) == 0) and not self.closed and not self.eof_received:
+ then = time.time()
+ self.in_buffer_cv.wait(timeout)
+ if timeout != None:
+ timeout -= time.time() - then
+ if timeout <= 0.0:
+ raise socket.timeout()
+ # something in the buffer and we have the lock
+ if len(self.in_buffer) <= nbytes:
+ out = self.in_buffer
+ self.in_buffer = ''
+ if self.pipe is not None:
+ # clear the pipe, since no more data is buffered
+ self.pipe.clear()
+ else:
+ out = self.in_buffer[:nbytes]
+ self.in_buffer = self.in_buffer[nbytes:]
+ ack = self._check_add_window(len(out))
+ finally:
+ self.lock.release()
+
+ # no need to hold the channel lock when sending this
+ if ack > 0:
+ m = Message()
+ m.add_byte(chr(MSG_CHANNEL_WINDOW_ADJUST))
+ m.add_int(self.remote_chanid)
+ m.add_int(ack)
+ self.transport._send_user_message(m)
+
+ return out
+
+ def recv_stderr_ready(self):
+ """
+ Returns true if data is buffered and ready to be read from this
+ channel's stderr stream. Only channels using L{exec_command} or
+ L{invoke_shell} without a pty will ever have data on the stderr
+ stream.
+
+ @return: C{True} if a L{recv_stderr} call on this channel would
+ immediately return at least one byte; C{False} otherwise.
+ @rtype: boolean
+
+ @since: 1.1
+ """
+ self.lock.acquire()
+ try:
+ if len(self.in_stderr_buffer) == 0:
+ return False
+ return True
+ finally:
+ self.lock.release()
+
+ def recv_stderr(self, nbytes):
+ """
+ Receive data from the channel's stderr stream. Only channels using
+ L{exec_command} or L{invoke_shell} without a pty will ever have data
+ on the stderr stream. The return value is a string representing the
+ data received. The maximum amount of data to be received at once is
+ specified by C{nbytes}. If a string of length zero is returned, the
+ channel stream has closed.
+
+ @param nbytes: maximum number of bytes to read.
+ @type nbytes: int
+ @return: data.
+ @rtype: str
+
+ @raise socket.timeout: if no data is ready before the timeout set by
+ L{settimeout}.
+
+ @since: 1.1
+ """
+ out = ''
+ self.lock.acquire()
+ try:
+ if len(self.in_stderr_buffer) == 0:
+ if self.closed or self.eof_received:
+ return out
+ # should we block?
+ if self.timeout == 0.0:
+ raise socket.timeout()
+ # loop here in case we get woken up but a different thread has grabbed everything in the buffer
+ timeout = self.timeout
+ while (len(self.in_stderr_buffer) == 0) and not self.closed and not self.eof_received:
+ then = time.time()
+ self.in_stderr_buffer_cv.wait(timeout)
+ if timeout != None:
+ timeout -= time.time() - then
+ if timeout <= 0.0:
+ raise socket.timeout()
+ # something in the buffer and we have the lock
+ if len(self.in_stderr_buffer) <= nbytes:
+ out = self.in_stderr_buffer
+ self.in_stderr_buffer = ''
+ else:
+ out = self.in_stderr_buffer[:nbytes]
+ self.in_stderr_buffer = self.in_stderr_buffer[nbytes:]
+ self._check_add_window(len(out))
+ finally:
+ self.lock.release()
+ return out
+
+ def send(self, s):
+ """
+ Send data to the channel. Returns the number of bytes sent, or 0 if
+ the channel stream is closed. Applications are responsible for
+ checking that all data has been sent: if only some of the data was
+ transmitted, the application needs to attempt delivery of the remaining
+ data.
+
+ @param s: data to send.
+ @type s: str
+ @return: number of bytes actually sent.
+ @rtype: int
+
+ @raise socket.timeout: if no data could be sent before the timeout set
+ by L{settimeout}.
+ """
+ size = len(s)
+ self.lock.acquire()
+ try:
+ size = self._wait_for_send_window(size)
+ if size == 0:
+ # eof or similar
+ return 0
+ m = Message()
+ m.add_byte(chr(MSG_CHANNEL_DATA))
+ m.add_int(self.remote_chanid)
+ m.add_string(s[:size])
+ self.transport._send_user_message(m)
+ finally:
+ self.lock.release()
+ return size
+
+ def send_stderr(self, s):
+ """
+ Send data to the channel on the "stderr" stream. This is normally
+ only used by servers to send output from shell commands -- clients
+ won't use this. Returns the number of bytes sent, or 0 if the channel
+ stream is closed. Applications are responsible for checking that all
+ data has been sent: if only some of the data was transmitted, the
+ application needs to attempt delivery of the remaining data.
+
+ @param s: data to send.
+ @type s: str
+ @return: number of bytes actually sent.
+ @rtype: int
+
+ @raise socket.timeout: if no data could be sent before the timeout set
+ by L{settimeout}.
+
+ @since: 1.1
+ """
+ size = len(s)
+ self.lock.acquire()
+ try:
+ size = self._wait_for_send_window(size)
+ if size == 0:
+ # eof or similar
+ return 0
+ m = Message()
+ m.add_byte(chr(MSG_CHANNEL_EXTENDED_DATA))
+ m.add_int(self.remote_chanid)
+ m.add_int(1)
+ m.add_string(s[:size])
+ self.transport._send_user_message(m)
+ finally:
+ self.lock.release()
+ return size
+
+ def sendall(self, s):
+ """
+ Send data to the channel, without allowing partial results. Unlike
+ L{send}, this method continues to send data from the given string until
+ either all data has been sent or an error occurs. Nothing is returned.
+
+ @param s: data to send.
+ @type s: str
+
+ @raise socket.timeout: if sending stalled for longer than the timeout
+ set by L{settimeout}.
+ @raise socket.error: if an error occured before the entire string was
+ sent.
+
+ @note: If the channel is closed while only part of the data hase been
+ sent, there is no way to determine how much data (if any) was sent.
+ This is irritating, but identically follows python's API.
+ """
+ while s:
+ if self.closed:
+ # this doesn't seem useful, but it is the documented behavior of Socket
+ raise socket.error('Socket is closed')
+ sent = self.send(s)
+ s = s[sent:]
+ return None
+
+ def sendall_stderr(self, s):
+ """
+ Send data to the channel's "stderr" stream, without allowing partial
+ results. Unlike L{send_stderr}, this method continues to send data
+ from the given string until all data has been sent or an error occurs.
+ Nothing is returned.
+
+ @param s: data to send to the client as "stderr" output.
+ @type s: str
+
+ @raise socket.timeout: if sending stalled for longer than the timeout
+ set by L{settimeout}.
+ @raise socket.error: if an error occured before the entire string was
+ sent.
+
+ @since: 1.1
+ """
+ while s:
+ if self.closed:
+ raise socket.error('Socket is closed')
+ sent = self.send_stderr(s)
+ s = s[sent:]
+ return None
+
+ def makefile(self, *params):
+ """
+ Return a file-like object associated with this channel. The optional
+ C{mode} and C{bufsize} arguments are interpreted the same way as by
+ the built-in C{file()} function in python.
+
+ @return: object which can be used for python file I/O.
+ @rtype: L{ChannelFile}
+ """
+ return ChannelFile(*([self] + list(params)))
+
+ def makefile_stderr(self, *params):
+ """
+ Return a file-like object associated with this channel's stderr
+ stream. Only channels using L{exec_command} or L{invoke_shell}
+ without a pty will ever have data on the stderr stream.
+
+ The optional C{mode} and C{bufsize} arguments are interpreted the
+ same way as by the built-in C{file()} function in python. For a
+ client, it only makes sense to open this file for reading. For a
+ server, it only makes sense to open this file for writing.
+
+ @return: object which can be used for python file I/O.
+ @rtype: L{ChannelFile}
+
+ @since: 1.1
+ """
+ return ChannelStderrFile(*([self] + list(params)))
+
+ def fileno(self):
+ """
+ Returns an OS-level file descriptor which can be used for polling, but
+ but I{not} for reading or writing). This is primaily to allow python's
+ C{select} module to work.
+
+ The first time C{fileno} is called on a channel, a pipe is created to
+ simulate real OS-level file descriptor (FD) behavior. Because of this,
+ two OS-level FDs are created, which will use up FDs faster than normal.
+ You won't notice this effect unless you open hundreds or thousands of
+ channels simultaneously, but it's still notable.
+
+ @return: an OS-level file descriptor
+ @rtype: int
+
+ @warning: This method causes channel reads to be slightly less
+ efficient.
+ """
+ self.lock.acquire()
+ try:
+ if self.pipe is not None:
+ return self.pipe.fileno()
+ # create the pipe and feed in any existing data
+ self.pipe = pipe.make_pipe()
+ if len(self.in_buffer) > 0:
+ self.pipe.set()
+ return self.pipe.fileno()
+ finally:
+ self.lock.release()
+
+ def shutdown(self, how):
+ """
+ Shut down one or both halves of the connection. If C{how} is 0,
+ further receives are disallowed. If C{how} is 1, further sends
+ are disallowed. If C{how} is 2, further sends and receives are
+ disallowed. This closes the stream in one or both directions.
+
+ @param how: 0 (stop receiving), 1 (stop sending), or 2 (stop
+ receiving and sending).
+ @type how: int
+ """
+ if (how == 0) or (how == 2):
+ # feign "read" shutdown
+ self.eof_received = 1
+ if (how == 1) or (how == 2):
+ self.lock.acquire()
+ try:
+ m = self._send_eof()
+ finally:
+ self.lock.release()
+ if m is not None:
+ self.transport._send_user_message(m)
+
+ def shutdown_read(self):
+ """
+ Shutdown the receiving side of this socket, closing the stream in
+ the incoming direction. After this call, future reads on this
+ channel will fail instantly. This is a convenience method, equivalent
+ to C{shutdown(0)}, for people who don't make it a habit to
+ memorize unix constants from the 1970s.
+
+ @since: 1.2
+ """
+ self.shutdown(0)
+
+ def shutdown_write(self):
+ """
+ Shutdown the sending side of this socket, closing the stream in
+ the outgoing direction. After this call, future writes on this
+ channel will fail instantly. This is a convenience method, equivalent
+ to C{shutdown(1)}, for people who don't make it a habit to
+ memorize unix constants from the 1970s.
+
+ @since: 1.2
+ """
+ self.shutdown(1)
+
+
+ ### calls from Transport
+
+
+ def _set_transport(self, transport):
+ self.transport = transport
+ self.logger = util.get_logger(self.transport.get_log_channel() + '.' + self.name)
+
+ def _set_window(self, window_size, max_packet_size):
+ self.in_window_size = window_size
+ self.in_max_packet_size = max_packet_size
+ # threshold of bytes we receive before we bother to send a window update
+ self.in_window_threshold = window_size // 10
+ self.in_window_sofar = 0
+ self._log(DEBUG, 'Max packet in: %d bytes' % max_packet_size)
+
+ def _set_remote_channel(self, chanid, window_size, max_packet_size):
+ self.remote_chanid = chanid
+ self.out_window_size = window_size
+ self.out_max_packet_size = max(max_packet_size, self.MIN_PACKET_SIZE)
+ self.active = 1
+ self._log(DEBUG, 'Max packet out: %d bytes' % max_packet_size)
+
+ def _request_success(self, m):
+ self._log(DEBUG, 'Sesch channel %d request ok' % self.chanid)
+ self.event.set()
+ return
+
+ def _request_failed(self, m):
+ self.lock.acquire()
+ try:
+ msgs = self._close_internal()
+ finally:
+ self.lock.release()
+ for m in msgs:
+ if m is not None:
+ self.transport._send_user_message(m)
+
+ def _feed(self, m):
+ if type(m) is str:
+ # passed from _feed_extended
+ s = m
+ else:
+ s = m.get_string()
+ self.lock.acquire()
+ try:
+ if self.ultra_debug:
+ self._log(DEBUG, 'fed %d bytes' % len(s))
+ if self.pipe is not None:
+ self.pipe.set()
+ self.in_buffer += s
+ self.in_buffer_cv.notifyAll()
+ finally:
+ self.lock.release()
+
+ def _feed_extended(self, m):
+ code = m.get_int()
+ s = m.get_string()
+ if code != 1:
+ self._log(ERROR, 'unknown extended_data type %d; discarding' % code)
+ return
+ if self.combine_stderr:
+ return self._feed(s)
+ self.lock.acquire()
+ try:
+ if self.ultra_debug:
+ self._log(DEBUG, 'fed %d stderr bytes' % len(s))
+ self.in_stderr_buffer += s
+ self.in_stderr_buffer_cv.notifyAll()
+ finally:
+ self.lock.release()
+
+ def _window_adjust(self, m):
+ nbytes = m.get_int()
+ self.lock.acquire()
+ try:
+ if self.ultra_debug:
+ self._log(DEBUG, 'window up %d' % nbytes)
+ self.out_window_size += nbytes
+ self.out_buffer_cv.notifyAll()
+ finally:
+ self.lock.release()
+
+ def _handle_request(self, m):
+ key = m.get_string()
+ want_reply = m.get_boolean()
+ server = self.transport.server_object
+ ok = False
+ if key == 'exit-status':
+ self.exit_status = m.get_int()
+ self.status_event.set()
+ ok = True
+ elif key == 'xon-xoff':
+ # ignore
+ ok = True
+ elif key == 'pty-req':
+ term = m.get_string()
+ width = m.get_int()
+ height = m.get_int()
+ pixelwidth = m.get_int()
+ pixelheight = m.get_int()
+ modes = m.get_string()
+ if server is None:
+ ok = False
+ else:
+ ok = server.check_channel_pty_request(self, term, width, height, pixelwidth,
+ pixelheight, modes)
+ elif key == 'shell':
+ if server is None:
+ ok = False
+ else:
+ ok = server.check_channel_shell_request(self)
+ elif key == 'exec':
+ cmd = m.get_string()
+ if server is None:
+ ok = False
+ else:
+ ok = server.check_channel_exec_request(self, cmd)
+ elif key == 'subsystem':
+ name = m.get_string()
+ if server is None:
+ ok = False
+ else:
+ ok = server.check_channel_subsystem_request(self, name)
+ elif key == 'window-change':
+ width = m.get_int()
+ height = m.get_int()
+ pixelwidth = m.get_int()
+ pixelheight = m.get_int()
+ if server is None:
+ ok = False
+ else:
+ ok = server.check_channel_window_change_request(self, width, height, pixelwidth,
+ pixelheight)
+ else:
+ self._log(DEBUG, 'Unhandled channel request "%s"' % key)
+ ok = False
+ if want_reply:
+ m = Message()
+ if ok:
+ m.add_byte(chr(MSG_CHANNEL_SUCCESS))
+ else:
+ m.add_byte(chr(MSG_CHANNEL_FAILURE))
+ m.add_int(self.remote_chanid)
+ self.transport._send_user_message(m)
+
+ def _handle_eof(self, m):
+ self.lock.acquire()
+ try:
+ if not self.eof_received:
+ self.eof_received = True
+ self.in_buffer_cv.notifyAll()
+ self.in_stderr_buffer_cv.notifyAll()
+ if self.pipe is not None:
+ self.pipe.set_forever()
+ finally:
+ self.lock.release()
+ self._log(DEBUG, 'EOF received')
+
+ def _handle_close(self, m):
+ self.lock.acquire()
+ try:
+ msgs = self._close_internal()
+ self.transport._unlink_channel(self.chanid)
+ finally:
+ self.lock.release()
+ for m in msgs:
+ if m is not None:
+ self.transport._send_user_message(m)
+
+
+ ### internals...
+
+
+ def _log(self, level, msg):
+ self.logger.log(level, msg)
+
+ def _set_closed(self):
+ # you are holding the lock.
+ self.closed = True
+ self.in_buffer_cv.notifyAll()
+ self.in_stderr_buffer_cv.notifyAll()
+ self.out_buffer_cv.notifyAll()
+ if self.pipe is not None:
+ self.pipe.set_forever()
+
+ def _send_eof(self):
+ # you are holding the lock.
+ if self.eof_sent:
+ return None
+ m = Message()
+ m.add_byte(chr(MSG_CHANNEL_EOF))
+ m.add_int(self.remote_chanid)
+ self.eof_sent = True
+ self._log(DEBUG, 'EOF sent')
+ return m
+
+ def _close_internal(self):
+ # you are holding the lock.
+ if not self.active or self.closed:
+ return None, None
+ m1 = self._send_eof()
+ m2 = Message()
+ m2.add_byte(chr(MSG_CHANNEL_CLOSE))
+ m2.add_int(self.remote_chanid)
+ self._set_closed()
+ # can't unlink from the Transport yet -- the remote side may still
+ # try to send meta-data (exit-status, etc)
+ return m1, m2
+
+ def _unlink(self):
+ # server connection could die before we become active: still signal the close!
+ if self.closed:
+ return
+ self.lock.acquire()
+ try:
+ self._set_closed()
+ self.transport._unlink_channel(self.chanid)
+ finally:
+ self.lock.release()
+
+ def _check_add_window(self, n):
+ # already holding the lock!
+ if self.closed or self.eof_received or not self.active:
+ return 0
+ if self.ultra_debug:
+ self._log(DEBUG, 'addwindow %d' % n)
+ self.in_window_sofar += n
+ if self.in_window_sofar <= self.in_window_threshold:
+ return 0
+ if self.ultra_debug:
+ self._log(DEBUG, 'addwindow send %d' % self.in_window_sofar)
+ out = self.in_window_sofar
+ self.in_window_sofar = 0
+ return out
+
+ def _wait_for_send_window(self, size):
+ """
+ (You are already holding the lock.)
+ Wait for the send window to open up, and allocate up to C{size} bytes
+ for transmission. If no space opens up before the timeout, a timeout
+ exception is raised. Returns the number of bytes available to send
+ (may be less than requested).
+ """
+ # you are already holding the lock
+ if self.closed or self.eof_sent:
+ return 0
+ if self.out_window_size == 0:
+ # should we block?
+ if self.timeout == 0.0:
+ raise socket.timeout()
+ # loop here in case we get woken up but a different thread has filled the buffer
+ timeout = self.timeout
+ while self.out_window_size == 0:
+ if self.closed or self.eof_sent:
+ return 0
+ then = time.time()
+ self.out_buffer_cv.wait(timeout)
+ if timeout != None:
+ timeout -= time.time() - then
+ if timeout <= 0.0:
+ raise socket.timeout()
+ # we have some window to squeeze into
+ if self.closed or self.eof_sent:
+ return 0
+ if self.out_window_size < size:
+ size = self.out_window_size
+ if self.out_max_packet_size - 64 < size:
+ size = self.out_max_packet_size - 64
+ self.out_window_size -= size
+ if self.ultra_debug:
+ self._log(DEBUG, 'window down to %d' % self.out_window_size)
+ return size
+
+
+class ChannelFile (BufferedFile):
+ """
+ A file-like wrapper around L{Channel}. A ChannelFile is created by calling
+ L{Channel.makefile}.
+
+ @bug: To correctly emulate the file object created from a socket's
+ C{makefile} method, a L{Channel} and its C{ChannelFile} should be able
+ to be closed or garbage-collected independently. Currently, closing
+ the C{ChannelFile} does nothing but flush the buffer.
+ """
+
+ def __init__(self, channel, mode = 'r', bufsize = -1):
+ self.channel = channel
+ BufferedFile.__init__(self)
+ self._set_mode(mode, bufsize)
+
+ def __repr__(self):
+ """
+ Returns a string representation of this object, for debugging.
+
+ @rtype: str
+ """
+ return '<paramiko.ChannelFile from ' + repr(self.channel) + '>'
+
+ def _read(self, size):
+ return self.channel.recv(size)
+
+ def _write(self, data):
+ self.channel.sendall(data)
+ return len(data)
+
+ seek = BufferedFile.seek
+
+
+class ChannelStderrFile (ChannelFile):
+ def __init__(self, channel, mode = 'r', bufsize = -1):
+ ChannelFile.__init__(self, channel, mode, bufsize)
+
+ def _read(self, size):
+ return self.channel.recv_stderr(size)
+
+ def _write(self, data):
+ self.channel.sendall_stderr(data)
+ return len(data)
+
+
+# vim: set shiftwidth=4 expandtab :