summaryrefslogtreecommitdiff
path: root/paramiko/channel.py
diff options
context:
space:
mode:
authorJeremy T. Bouse <jbouse@debian.org>2009-11-27 16:20:12 -0500
committerJeremy T. Bouse <jbouse@debian.org>2009-11-27 16:20:12 -0500
commited280d5ac360e2af796e9bd973d7b4df89f0c449 (patch)
treece892d6ce9dad8c0ecbc9cbe73f8095195bef0b4 /paramiko/channel.py
parent176c6caf4ea7918e1698438634b237fab8456471 (diff)
downloadpython-paramiko-ed280d5ac360e2af796e9bd973d7b4df89f0c449.tar
python-paramiko-ed280d5ac360e2af796e9bd973d7b4df89f0c449.tar.gz
Imported Upstream version 1.7.4upstream/1.7.4
Diffstat (limited to 'paramiko/channel.py')
-rw-r--r--paramiko/channel.py471
1 files changed, 263 insertions, 208 deletions
diff --git a/paramiko/channel.py b/paramiko/channel.py
index 8a00233..910a03c 100644
--- a/paramiko/channel.py
+++ b/paramiko/channel.py
@@ -1,4 +1,4 @@
-# Copyright (C) 2003-2005 Robey Pointer <robey@lag.net>
+# Copyright (C) 2003-2007 Robey Pointer <robey@lag.net>
#
# This file is part of paramiko.
#
@@ -20,6 +20,7 @@
Abstraction for an SSH2 channel.
"""
+import binascii
import sys
import time
import threading
@@ -31,9 +32,14 @@ from paramiko import util
from paramiko.message import Message
from paramiko.ssh_exception import SSHException
from paramiko.file import BufferedFile
+from paramiko.buffered_pipe import BufferedPipe, PipeTimeout
from paramiko import pipe
+# lower bound on the max packet size we'll accept from the remote host
+MIN_PACKET_SIZE = 1024
+
+
class Channel (object):
"""
A secure tunnel across an SSH L{Transport}. A Channel is meant to behave
@@ -49,9 +55,6 @@ class Channel (object):
is exactly like a normal network socket, so it shouldn't be too surprising.
"""
- # lower bound on the max packet size we'll accept from the remote host
- MIN_PACKET_SIZE = 1024
-
def __init__(self, chanid):
"""
Create a new channel. The channel is not associated with any
@@ -69,14 +72,12 @@ class Channel (object):
self.active = False
self.eof_received = 0
self.eof_sent = 0
- self.in_buffer = ''
- self.in_stderr_buffer = ''
+ self.in_buffer = BufferedPipe()
+ self.in_stderr_buffer = BufferedPipe()
self.timeout = None
self.closed = False
self.ultra_debug = False
self.lock = threading.Lock()
- self.in_buffer_cv = threading.Condition(self.lock)
- self.in_stderr_buffer_cv = threading.Condition(self.lock)
self.out_buffer_cv = threading.Condition(self.lock)
self.in_window_size = 0
self.out_window_size = 0
@@ -85,15 +86,19 @@ class Channel (object):
self.in_window_threshold = 0
self.in_window_sofar = 0
self.status_event = threading.Event()
- self.name = str(chanid)
- self.logger = util.get_logger('paramiko.chan.' + str(chanid))
- self.pipe = None
+ self._name = str(chanid)
+ self.logger = util.get_logger('paramiko.transport')
+ self._pipe = None
self.event = threading.Event()
self.combine_stderr = False
self.exit_status = -1
+ self.origin_addr = None
def __del__(self):
- self.close()
+ try:
+ self.close()
+ except:
+ pass
def __repr__(self):
"""
@@ -124,14 +129,15 @@ class Channel (object):
It isn't necessary (or desirable) to call this method if you're going
to exectue a single command with L{exec_command}.
- @param term: the terminal type to emulate (for example, C{'vt100'}).
+ @param term: the terminal type to emulate (for example, C{'vt100'})
@type term: str
@param width: width (in characters) of the terminal screen
@type width: int
@param height: height (in characters) of the terminal screen
@type height: int
- @return: C{True} if the operation succeeded; C{False} if not.
- @rtype: bool
+
+ @raise SSHException: if the request was rejected or the channel was
+ closed
"""
if self.closed or self.eof_received or self.eof_sent or not self.active:
raise SSHException('Channel is not open')
@@ -148,12 +154,7 @@ class Channel (object):
m.add_string('')
self.event.clear()
self.transport._send_user_message(m)
- while True:
- self.event.wait(0.1)
- if self.closed:
- return False
- if self.event.isSet():
- return True
+ self._wait_for_event()
def invoke_shell(self):
"""
@@ -168,8 +169,8 @@ class Channel (object):
When the shell exits, the channel will be closed and can't be reused.
You must open a new channel if you wish to open another shell.
- @return: C{True} if the operation succeeded; C{False} if not.
- @rtype: bool
+ @raise SSHException: if the request was rejected or the channel was
+ closed
"""
if self.closed or self.eof_received or self.eof_sent or not self.active:
raise SSHException('Channel is not open')
@@ -180,12 +181,7 @@ class Channel (object):
m.add_boolean(1)
self.event.clear()
self.transport._send_user_message(m)
- while True:
- self.event.wait(0.1)
- if self.closed:
- return False
- if self.event.isSet():
- return True
+ self._wait_for_event()
def exec_command(self, command):
"""
@@ -199,8 +195,9 @@ class Channel (object):
@param command: a shell command to execute.
@type command: str
- @return: C{True} if the operation succeeded; C{False} if not.
- @rtype: bool
+
+ @raise SSHException: if the request was rejected or the channel was
+ closed
"""
if self.closed or self.eof_received or self.eof_sent or not self.active:
raise SSHException('Channel is not open')
@@ -208,16 +205,11 @@ class Channel (object):
m.add_byte(chr(MSG_CHANNEL_REQUEST))
m.add_int(self.remote_chanid)
m.add_string('exec')
- m.add_boolean(1)
+ m.add_boolean(True)
m.add_string(command)
self.event.clear()
self.transport._send_user_message(m)
- while True:
- self.event.wait(0.1)
- if self.closed:
- return False
- if self.event.isSet():
- return True
+ self._wait_for_event()
def invoke_subsystem(self, subsystem):
"""
@@ -230,8 +222,9 @@ class Channel (object):
@param subsystem: name of the subsystem being requested.
@type subsystem: str
- @return: C{True} if the operation succeeded; C{False} if not.
- @rtype: bool
+
+ @raise SSHException: if the request was rejected or the channel was
+ closed
"""
if self.closed or self.eof_received or self.eof_sent or not self.active:
raise SSHException('Channel is not open')
@@ -239,16 +232,11 @@ class Channel (object):
m.add_byte(chr(MSG_CHANNEL_REQUEST))
m.add_int(self.remote_chanid)
m.add_string('subsystem')
- m.add_boolean(1)
+ m.add_boolean(True)
m.add_string(subsystem)
self.event.clear()
self.transport._send_user_message(m)
- while True:
- self.event.wait(0.1)
- if self.closed:
- return False
- if self.event.isSet():
- return True
+ self._wait_for_event()
def resize_pty(self, width=80, height=24):
"""
@@ -259,8 +247,9 @@ class Channel (object):
@type width: int
@param height: new height (in characters) of the terminal screen
@type height: int
- @return: C{True} if the operation succeeded; C{False} if not.
- @rtype: bool
+
+ @raise SSHException: if the request was rejected or the channel was
+ closed
"""
if self.closed or self.eof_received or self.eof_sent or not self.active:
raise SSHException('Channel is not open')
@@ -268,19 +257,27 @@ class Channel (object):
m.add_byte(chr(MSG_CHANNEL_REQUEST))
m.add_int(self.remote_chanid)
m.add_string('window-change')
- m.add_boolean(1)
+ m.add_boolean(True)
m.add_int(width)
m.add_int(height)
m.add_int(0).add_int(0)
self.event.clear()
self.transport._send_user_message(m)
- while True:
- self.event.wait(0.1)
- if self.closed:
- return False
- if self.event.isSet():
- return True
+ self._wait_for_event()
+ def exit_status_ready(self):
+ """
+ Return true if the remote process has exited and returned an exit
+ status. You may use this to poll the process status if you don't
+ want to block in L{recv_exit_status}. Note that the server may not
+ return an exit status in some cases (like bad servers).
+
+ @return: True if L{recv_exit_status} will return immediately
+ @rtype: bool
+ @since: 1.7.3
+ """
+ return self.closed or self.status_event.isSet()
+
def recv_exit_status(self):
"""
Return the exit status from the process on the server. This is
@@ -296,8 +293,9 @@ class Channel (object):
"""
while True:
if self.closed or self.status_event.isSet():
- return self.exit_status
+ break
self.status_event.wait(0.1)
+ return self.exit_status
def send_exit_status(self, status):
"""
@@ -317,10 +315,73 @@ class Channel (object):
m.add_byte(chr(MSG_CHANNEL_REQUEST))
m.add_int(self.remote_chanid)
m.add_string('exit-status')
- m.add_boolean(0)
+ m.add_boolean(False)
m.add_int(status)
self.transport._send_user_message(m)
+
+ def request_x11(self, screen_number=0, auth_protocol=None, auth_cookie=None,
+ single_connection=False, handler=None):
+ """
+ Request an x11 session on this channel. If the server allows it,
+ further x11 requests can be made from the server to the client,
+ when an x11 application is run in a shell session.
+
+ From RFC4254::
+
+ It is RECOMMENDED that the 'x11 authentication cookie' that is
+ sent be a fake, random cookie, and that the cookie be checked and
+ replaced by the real cookie when a connection request is received.
+ If you omit the auth_cookie, a new secure random 128-bit value will be
+ generated, used, and returned. You will need to use this value to
+ verify incoming x11 requests and replace them with the actual local
+ x11 cookie (which requires some knoweldge of the x11 protocol).
+
+ If a handler is passed in, the handler is called from another thread
+ whenever a new x11 connection arrives. The default handler queues up
+ incoming x11 connections, which may be retrieved using
+ L{Transport.accept}. The handler's calling signature is::
+
+ handler(channel: Channel, (address: str, port: int))
+
+ @param screen_number: the x11 screen number (0, 10, etc)
+ @type screen_number: int
+ @param auth_protocol: the name of the X11 authentication method used;
+ if none is given, C{"MIT-MAGIC-COOKIE-1"} is used
+ @type auth_protocol: str
+ @param auth_cookie: hexadecimal string containing the x11 auth cookie;
+ if none is given, a secure random 128-bit value is generated
+ @type auth_cookie: str
+ @param single_connection: if True, only a single x11 connection will be
+ forwarded (by default, any number of x11 connections can arrive
+ over this session)
+ @type single_connection: bool
+ @param handler: an optional handler to use for incoming X11 connections
+ @type handler: function
+ @return: the auth_cookie used
+ """
+ if self.closed or self.eof_received or self.eof_sent or not self.active:
+ raise SSHException('Channel is not open')
+ if auth_protocol is None:
+ auth_protocol = 'MIT-MAGIC-COOKIE-1'
+ if auth_cookie is None:
+ auth_cookie = binascii.hexlify(self.transport.randpool.get_bytes(16))
+
+ m = Message()
+ m.add_byte(chr(MSG_CHANNEL_REQUEST))
+ m.add_int(self.remote_chanid)
+ m.add_string('x11-req')
+ m.add_boolean(True)
+ m.add_boolean(single_connection)
+ m.add_string(auth_protocol)
+ m.add_string(auth_cookie)
+ m.add_int(screen_number)
+ self.event.clear()
+ self.transport._send_user_message(m)
+ self._wait_for_event()
+ self.transport._set_x11_handler(handler)
+ return auth_cookie
+
def get_transport(self):
"""
Return the L{Transport} associated with this channel.
@@ -333,14 +394,13 @@ class Channel (object):
def set_name(self, name):
"""
Set a name for this channel. Currently it's only used to set the name
- of the log level used for debugging. The name can be fetched with the
+ of the channel in logfile entries. The name can be fetched with the
L{get_name} method.
- @param name: new channel name.
+ @param name: new channel name
@type name: str
"""
- self.name = name
- self.logger = util.get_logger(self.transport.get_log_channel() + '.' + self.name)
+ self._name = name
def get_name(self):
"""
@@ -349,7 +409,7 @@ class Channel (object):
@return: the name of this channel.
@rtype: str
"""
- return self.name
+ return self._name
def get_id(self):
"""
@@ -360,8 +420,6 @@ class Channel (object):
@return: the ID of this channel.
@rtype: int
-
- @since: ivysaur
"""
return self.chanid
@@ -394,8 +452,7 @@ class Channel (object):
self.combine_stderr = combine
if combine and not old:
# copy old stderr buffer into primary buffer
- data = self.in_stderr_buffer
- self.in_stderr_buffer = ''
+ data = self.in_stderr_buffer.empty()
finally:
self.lock.release()
if len(data) > 0:
@@ -419,7 +476,7 @@ class Channel (object):
C{chan.settimeout(None)} is equivalent to C{chan.setblocking(1)}.
@param timeout: seconds to wait for a pending read/write operation
- before raising C{socket.timeout}, or C{None} for no timeout.
+ before raising C{socket.timeout}, or C{None} for no timeout.
@type timeout: float
"""
self.timeout = timeout
@@ -439,17 +496,19 @@ class Channel (object):
"""
Set blocking or non-blocking mode of the channel: if C{blocking} is 0,
the channel is set to non-blocking mode; otherwise it's set to blocking
- mode. Initially all channels are in blocking mode.
+ mode. Initially all channels are in blocking mode.
In non-blocking mode, if a L{recv} call doesn't find any data, or if a
L{send} call can't immediately dispose of the data, an error exception
- is raised. In blocking mode, the calls block until they can proceed.
+ is raised. In blocking mode, the calls block until they can proceed. An
+ EOF condition is considered "immediate data" for L{recv}, so if the
+ channel is closed in the read direction, it will never block.
C{chan.setblocking(0)} is equivalent to C{chan.settimeout(0)};
C{chan.setblocking(1)} is equivalent to C{chan.settimeout(None)}.
@param blocking: 0 to set non-blocking mode; non-0 to set blocking
- mode.
+ mode.
@type blocking: int
"""
if blocking:
@@ -457,6 +516,18 @@ class Channel (object):
else:
self.settimeout(0.0)
+ def getpeername(self):
+ """
+ Return the address of the remote side of this Channel, if possible.
+ This is just a wrapper around C{'getpeername'} on the Transport, used
+ to provide enough of a socket-like interface to allow asyncore to work.
+ (asyncore likes to call C{'getpeername'}.)
+
+ @return: the address if the remote host, if known
+ @rtype: tuple(str, int)
+ """
+ return self.transport.getpeername()
+
def close(self):
"""
Close the channel. All future read/write operations on the channel
@@ -466,15 +537,17 @@ class Channel (object):
"""
self.lock.acquire()
try:
+ # only close the pipe when the user explicitly closes the channel.
+ # otherwise they will get unpleasant surprises. (and do it before
+ # checking self.closed, since the remote host may have already
+ # closed the connection.)
+ if self._pipe is not None:
+ self._pipe.close()
+ self._pipe = None
+
if not self.active or self.closed:
return
msgs = self._close_internal()
-
- # only close the pipe when the user explicitly closes the channel.
- # otherwise they will get unpleasant surprises.
- if self.pipe is not None:
- self.pipe.close()
- self.pipe = None
finally:
self.lock.release()
for m in msgs:
@@ -491,13 +564,7 @@ class Channel (object):
return at least one byte; C{False} otherwise.
@rtype: boolean
"""
- self.lock.acquire()
- try:
- if len(self.in_buffer) == 0:
- return False
- return True
- finally:
- self.lock.release()
+ return self.in_buffer.read_ready()
def recv(self, nbytes):
"""
@@ -514,38 +581,12 @@ class Channel (object):
@raise socket.timeout: if no data is ready before the timeout set by
L{settimeout}.
"""
- out = ''
- self.lock.acquire()
try:
- if len(self.in_buffer) == 0:
- if self.closed or self.eof_received:
- return out
- # should we block?
- if self.timeout == 0.0:
- raise socket.timeout()
- # loop here in case we get woken up but a different thread has grabbed everything in the buffer
- timeout = self.timeout
- while (len(self.in_buffer) == 0) and not self.closed and not self.eof_received:
- then = time.time()
- self.in_buffer_cv.wait(timeout)
- if timeout != None:
- timeout -= time.time() - then
- if timeout <= 0.0:
- raise socket.timeout()
- # something in the buffer and we have the lock
- if len(self.in_buffer) <= nbytes:
- out = self.in_buffer
- self.in_buffer = ''
- if self.pipe is not None:
- # clear the pipe, since no more data is buffered
- self.pipe.clear()
- else:
- out = self.in_buffer[:nbytes]
- self.in_buffer = self.in_buffer[nbytes:]
- ack = self._check_add_window(len(out))
- finally:
- self.lock.release()
+ out = self.in_buffer.read(nbytes, self.timeout)
+ except PipeTimeout, e:
+ raise socket.timeout()
+ ack = self._check_add_window(len(out))
# no need to hold the channel lock when sending this
if ack > 0:
m = Message()
@@ -569,13 +610,7 @@ class Channel (object):
@since: 1.1
"""
- self.lock.acquire()
- try:
- if len(self.in_stderr_buffer) == 0:
- return False
- return True
- finally:
- self.lock.release()
+ return self.in_stderr_buffer.read_ready()
def recv_stderr(self, nbytes):
"""
@@ -596,36 +631,43 @@ class Channel (object):
@since: 1.1
"""
- out = ''
+ try:
+ out = self.in_stderr_buffer.read(nbytes, self.timeout)
+ except PipeTimeout, e:
+ raise socket.timeout()
+
+ ack = self._check_add_window(len(out))
+ # no need to hold the channel lock when sending this
+ if ack > 0:
+ m = Message()
+ m.add_byte(chr(MSG_CHANNEL_WINDOW_ADJUST))
+ m.add_int(self.remote_chanid)
+ m.add_int(ack)
+ self.transport._send_user_message(m)
+
+ return out
+
+ def send_ready(self):
+ """
+ Returns true if data can be written to this channel without blocking.
+ This means the channel is either closed (so any write attempt would
+ return immediately) or there is at least one byte of space in the
+ outbound buffer. If there is at least one byte of space in the
+ outbound buffer, a L{send} call will succeed immediately and return
+ the number of bytes actually written.
+
+ @return: C{True} if a L{send} call on this channel would immediately
+ succeed or fail
+ @rtype: boolean
+ """
self.lock.acquire()
try:
- if len(self.in_stderr_buffer) == 0:
- if self.closed or self.eof_received:
- return out
- # should we block?
- if self.timeout == 0.0:
- raise socket.timeout()
- # loop here in case we get woken up but a different thread has grabbed everything in the buffer
- timeout = self.timeout
- while (len(self.in_stderr_buffer) == 0) and not self.closed and not self.eof_received:
- then = time.time()
- self.in_stderr_buffer_cv.wait(timeout)
- if timeout != None:
- timeout -= time.time() - then
- if timeout <= 0.0:
- raise socket.timeout()
- # something in the buffer and we have the lock
- if len(self.in_stderr_buffer) <= nbytes:
- out = self.in_stderr_buffer
- self.in_stderr_buffer = ''
- else:
- out = self.in_stderr_buffer[:nbytes]
- self.in_stderr_buffer = self.in_stderr_buffer[nbytes:]
- self._check_add_window(len(out))
+ if self.closed or self.eof_sent:
+ return True
+ return self.out_window_size > 0
finally:
self.lock.release()
- return out
-
+
def send(self, s):
"""
Send data to the channel. Returns the number of bytes sent, or 0 if
@@ -634,9 +676,9 @@ class Channel (object):
transmitted, the application needs to attempt delivery of the remaining
data.
- @param s: data to send.
+ @param s: data to send
@type s: str
- @return: number of bytes actually sent.
+ @return: number of bytes actually sent
@rtype: int
@raise socket.timeout: if no data could be sent before the timeout set
@@ -653,9 +695,11 @@ class Channel (object):
m.add_byte(chr(MSG_CHANNEL_DATA))
m.add_int(self.remote_chanid)
m.add_string(s[:size])
- self.transport._send_user_message(m)
finally:
self.lock.release()
+ # Note: We release self.lock before calling _send_user_message.
+ # Otherwise, we can deadlock during re-keying.
+ self.transport._send_user_message(m)
return size
def send_stderr(self, s):
@@ -689,9 +733,11 @@ class Channel (object):
m.add_int(self.remote_chanid)
m.add_int(1)
m.add_string(s[:size])
- self.transport._send_user_message(m)
finally:
self.lock.release()
+ # Note: We release self.lock before calling _send_user_message.
+ # Otherwise, we can deadlock during re-keying.
+ self.transport._send_user_message(m)
return size
def sendall(self, s):
@@ -776,14 +822,14 @@ class Channel (object):
def fileno(self):
"""
Returns an OS-level file descriptor which can be used for polling, but
- but I{not} for reading or writing). This is primaily to allow python's
+ but I{not} for reading or writing. This is primaily to allow python's
C{select} module to work.
The first time C{fileno} is called on a channel, a pipe is created to
simulate real OS-level file descriptor (FD) behavior. Because of this,
two OS-level FDs are created, which will use up FDs faster than normal.
- You won't notice this effect unless you open hundreds or thousands of
- channels simultaneously, but it's still notable.
+ (You won't notice this effect unless you have hundreds of channels
+ open at the same time.)
@return: an OS-level file descriptor
@rtype: int
@@ -793,13 +839,14 @@ class Channel (object):
"""
self.lock.acquire()
try:
- if self.pipe is not None:
- return self.pipe.fileno()
+ if self._pipe is not None:
+ return self._pipe.fileno()
# create the pipe and feed in any existing data
- self.pipe = pipe.make_pipe()
- if len(self.in_buffer) > 0:
- self.pipe.set()
- return self.pipe.fileno()
+ self._pipe = pipe.make_pipe()
+ p1, p2 = pipe.make_or_pipe(self._pipe)
+ self.in_buffer.set_event(p1)
+ self.in_stderr_buffer.set_event(p2)
+ return self._pipe.fileno()
finally:
self.lock.release()
@@ -856,7 +903,7 @@ class Channel (object):
def _set_transport(self, transport):
self.transport = transport
- self.logger = util.get_logger(self.transport.get_log_channel() + '.' + self.name)
+ self.logger = util.get_logger(self.transport.get_log_channel())
def _set_window(self, window_size, max_packet_size):
self.in_window_size = window_size
@@ -869,7 +916,7 @@ class Channel (object):
def _set_remote_channel(self, chanid, window_size, max_packet_size):
self.remote_chanid = chanid
self.out_window_size = window_size
- self.out_max_packet_size = max(max_packet_size, self.MIN_PACKET_SIZE)
+ self.out_max_packet_size = max(max_packet_size, MIN_PACKET_SIZE)
self.active = 1
self._log(DEBUG, 'Max packet out: %d bytes' % max_packet_size)
@@ -894,16 +941,7 @@ class Channel (object):
s = m
else:
s = m.get_string()
- self.lock.acquire()
- try:
- if self.ultra_debug:
- self._log(DEBUG, 'fed %d bytes' % len(s))
- if self.pipe is not None:
- self.pipe.set()
- self.in_buffer += s
- self.in_buffer_cv.notifyAll()
- finally:
- self.lock.release()
+ self.in_buffer.feed(s)
def _feed_extended(self, m):
code = m.get_int()
@@ -912,15 +950,9 @@ class Channel (object):
self._log(ERROR, 'unknown extended_data type %d; discarding' % code)
return
if self.combine_stderr:
- return self._feed(s)
- self.lock.acquire()
- try:
- if self.ultra_debug:
- self._log(DEBUG, 'fed %d stderr bytes' % len(s))
- self.in_stderr_buffer += s
- self.in_stderr_buffer_cv.notifyAll()
- finally:
- self.lock.release()
+ self._feed(s)
+ else:
+ self.in_stderr_buffer.feed(s)
def _window_adjust(self, m):
nbytes = m.get_int()
@@ -984,6 +1016,16 @@ class Channel (object):
else:
ok = server.check_channel_window_change_request(self, width, height, pixelwidth,
pixelheight)
+ elif key == 'x11-req':
+ single_connection = m.get_boolean()
+ auth_proto = m.get_string()
+ auth_cookie = m.get_string()
+ screen_number = m.get_int()
+ if server is None:
+ ok = False
+ else:
+ ok = server.check_channel_x11_request(self, single_connection,
+ auth_proto, auth_cookie, screen_number)
else:
self._log(DEBUG, 'Unhandled channel request "%s"' % key)
ok = False
@@ -1001,13 +1043,13 @@ class Channel (object):
try:
if not self.eof_received:
self.eof_received = True
- self.in_buffer_cv.notifyAll()
- self.in_stderr_buffer_cv.notifyAll()
- if self.pipe is not None:
- self.pipe.set_forever()
+ self.in_buffer.close()
+ self.in_stderr_buffer.close()
+ if self._pipe is not None:
+ self._pipe.set_forever()
finally:
self.lock.release()
- self._log(DEBUG, 'EOF received')
+ self._log(DEBUG, 'EOF received (%s)', self._name)
def _handle_close(self, m):
self.lock.acquire()
@@ -1024,17 +1066,29 @@ class Channel (object):
### internals...
- def _log(self, level, msg):
- self.logger.log(level, msg)
+ def _log(self, level, msg, *args):
+ self.logger.log(level, "[chan " + self._name + "] " + msg, *args)
+
+ def _wait_for_event(self):
+ while True:
+ self.event.wait(0.1)
+ if self.event.isSet():
+ return
+ if self.closed:
+ e = self.transport.get_exception()
+ if e is None:
+ e = SSHException('Channel closed.')
+ raise e
+ return
def _set_closed(self):
# you are holding the lock.
self.closed = True
- self.in_buffer_cv.notifyAll()
- self.in_stderr_buffer_cv.notifyAll()
+ self.in_buffer.close()
+ self.in_stderr_buffer.close()
self.out_buffer_cv.notifyAll()
- if self.pipe is not None:
- self.pipe.set_forever()
+ if self._pipe is not None:
+ self._pipe.set_forever()
def _send_eof(self):
# you are holding the lock.
@@ -1044,7 +1098,7 @@ class Channel (object):
m.add_byte(chr(MSG_CHANNEL_EOF))
m.add_int(self.remote_chanid)
self.eof_sent = True
- self._log(DEBUG, 'EOF sent')
+ self._log(DEBUG, 'EOF sent (%s)', self._name)
return m
def _close_internal(self):
@@ -1072,19 +1126,22 @@ class Channel (object):
self.lock.release()
def _check_add_window(self, n):
- # already holding the lock!
- if self.closed or self.eof_received or not self.active:
- return 0
- if self.ultra_debug:
- self._log(DEBUG, 'addwindow %d' % n)
- self.in_window_sofar += n
- if self.in_window_sofar <= self.in_window_threshold:
- return 0
- if self.ultra_debug:
- self._log(DEBUG, 'addwindow send %d' % self.in_window_sofar)
- out = self.in_window_sofar
- self.in_window_sofar = 0
- return out
+ self.lock.acquire()
+ try:
+ if self.closed or self.eof_received or not self.active:
+ return 0
+ if self.ultra_debug:
+ self._log(DEBUG, 'addwindow %d' % n)
+ self.in_window_sofar += n
+ if self.in_window_sofar <= self.in_window_threshold:
+ return 0
+ if self.ultra_debug:
+ self._log(DEBUG, 'addwindow send %d' % self.in_window_sofar)
+ out = self.in_window_sofar
+ self.in_window_sofar = 0
+ return out
+ finally:
+ self.lock.release()
def _wait_for_send_window(self, size):
"""
@@ -1155,8 +1212,6 @@ class ChannelFile (BufferedFile):
def _write(self, data):
self.channel.sendall(data)
return len(data)
-
- seek = BufferedFile.seek
class ChannelStderrFile (ChannelFile):