diff options
author | Jeremy T. Bouse <jbouse@debian.org> | 2009-11-27 16:20:12 -0500 |
---|---|---|
committer | Jeremy T. Bouse <jbouse@debian.org> | 2009-11-27 16:20:12 -0500 |
commit | ed280d5ac360e2af796e9bd973d7b4df89f0c449 (patch) | |
tree | ce892d6ce9dad8c0ecbc9cbe73f8095195bef0b4 /paramiko/channel.py | |
parent | 176c6caf4ea7918e1698438634b237fab8456471 (diff) | |
download | python-paramiko-ed280d5ac360e2af796e9bd973d7b4df89f0c449.tar python-paramiko-ed280d5ac360e2af796e9bd973d7b4df89f0c449.tar.gz |
Imported Upstream version 1.7.4upstream/1.7.4
Diffstat (limited to 'paramiko/channel.py')
-rw-r--r-- | paramiko/channel.py | 471 |
1 files changed, 263 insertions, 208 deletions
diff --git a/paramiko/channel.py b/paramiko/channel.py index 8a00233..910a03c 100644 --- a/paramiko/channel.py +++ b/paramiko/channel.py @@ -1,4 +1,4 @@ -# Copyright (C) 2003-2005 Robey Pointer <robey@lag.net> +# Copyright (C) 2003-2007 Robey Pointer <robey@lag.net> # # This file is part of paramiko. # @@ -20,6 +20,7 @@ Abstraction for an SSH2 channel. """ +import binascii import sys import time import threading @@ -31,9 +32,14 @@ from paramiko import util from paramiko.message import Message from paramiko.ssh_exception import SSHException from paramiko.file import BufferedFile +from paramiko.buffered_pipe import BufferedPipe, PipeTimeout from paramiko import pipe +# lower bound on the max packet size we'll accept from the remote host +MIN_PACKET_SIZE = 1024 + + class Channel (object): """ A secure tunnel across an SSH L{Transport}. A Channel is meant to behave @@ -49,9 +55,6 @@ class Channel (object): is exactly like a normal network socket, so it shouldn't be too surprising. """ - # lower bound on the max packet size we'll accept from the remote host - MIN_PACKET_SIZE = 1024 - def __init__(self, chanid): """ Create a new channel. The channel is not associated with any @@ -69,14 +72,12 @@ class Channel (object): self.active = False self.eof_received = 0 self.eof_sent = 0 - self.in_buffer = '' - self.in_stderr_buffer = '' + self.in_buffer = BufferedPipe() + self.in_stderr_buffer = BufferedPipe() self.timeout = None self.closed = False self.ultra_debug = False self.lock = threading.Lock() - self.in_buffer_cv = threading.Condition(self.lock) - self.in_stderr_buffer_cv = threading.Condition(self.lock) self.out_buffer_cv = threading.Condition(self.lock) self.in_window_size = 0 self.out_window_size = 0 @@ -85,15 +86,19 @@ class Channel (object): self.in_window_threshold = 0 self.in_window_sofar = 0 self.status_event = threading.Event() - self.name = str(chanid) - self.logger = util.get_logger('paramiko.chan.' + str(chanid)) - self.pipe = None + self._name = str(chanid) + self.logger = util.get_logger('paramiko.transport') + self._pipe = None self.event = threading.Event() self.combine_stderr = False self.exit_status = -1 + self.origin_addr = None def __del__(self): - self.close() + try: + self.close() + except: + pass def __repr__(self): """ @@ -124,14 +129,15 @@ class Channel (object): It isn't necessary (or desirable) to call this method if you're going to exectue a single command with L{exec_command}. - @param term: the terminal type to emulate (for example, C{'vt100'}). + @param term: the terminal type to emulate (for example, C{'vt100'}) @type term: str @param width: width (in characters) of the terminal screen @type width: int @param height: height (in characters) of the terminal screen @type height: int - @return: C{True} if the operation succeeded; C{False} if not. - @rtype: bool + + @raise SSHException: if the request was rejected or the channel was + closed """ if self.closed or self.eof_received or self.eof_sent or not self.active: raise SSHException('Channel is not open') @@ -148,12 +154,7 @@ class Channel (object): m.add_string('') self.event.clear() self.transport._send_user_message(m) - while True: - self.event.wait(0.1) - if self.closed: - return False - if self.event.isSet(): - return True + self._wait_for_event() def invoke_shell(self): """ @@ -168,8 +169,8 @@ class Channel (object): When the shell exits, the channel will be closed and can't be reused. You must open a new channel if you wish to open another shell. - @return: C{True} if the operation succeeded; C{False} if not. - @rtype: bool + @raise SSHException: if the request was rejected or the channel was + closed """ if self.closed or self.eof_received or self.eof_sent or not self.active: raise SSHException('Channel is not open') @@ -180,12 +181,7 @@ class Channel (object): m.add_boolean(1) self.event.clear() self.transport._send_user_message(m) - while True: - self.event.wait(0.1) - if self.closed: - return False - if self.event.isSet(): - return True + self._wait_for_event() def exec_command(self, command): """ @@ -199,8 +195,9 @@ class Channel (object): @param command: a shell command to execute. @type command: str - @return: C{True} if the operation succeeded; C{False} if not. - @rtype: bool + + @raise SSHException: if the request was rejected or the channel was + closed """ if self.closed or self.eof_received or self.eof_sent or not self.active: raise SSHException('Channel is not open') @@ -208,16 +205,11 @@ class Channel (object): m.add_byte(chr(MSG_CHANNEL_REQUEST)) m.add_int(self.remote_chanid) m.add_string('exec') - m.add_boolean(1) + m.add_boolean(True) m.add_string(command) self.event.clear() self.transport._send_user_message(m) - while True: - self.event.wait(0.1) - if self.closed: - return False - if self.event.isSet(): - return True + self._wait_for_event() def invoke_subsystem(self, subsystem): """ @@ -230,8 +222,9 @@ class Channel (object): @param subsystem: name of the subsystem being requested. @type subsystem: str - @return: C{True} if the operation succeeded; C{False} if not. - @rtype: bool + + @raise SSHException: if the request was rejected or the channel was + closed """ if self.closed or self.eof_received or self.eof_sent or not self.active: raise SSHException('Channel is not open') @@ -239,16 +232,11 @@ class Channel (object): m.add_byte(chr(MSG_CHANNEL_REQUEST)) m.add_int(self.remote_chanid) m.add_string('subsystem') - m.add_boolean(1) + m.add_boolean(True) m.add_string(subsystem) self.event.clear() self.transport._send_user_message(m) - while True: - self.event.wait(0.1) - if self.closed: - return False - if self.event.isSet(): - return True + self._wait_for_event() def resize_pty(self, width=80, height=24): """ @@ -259,8 +247,9 @@ class Channel (object): @type width: int @param height: new height (in characters) of the terminal screen @type height: int - @return: C{True} if the operation succeeded; C{False} if not. - @rtype: bool + + @raise SSHException: if the request was rejected or the channel was + closed """ if self.closed or self.eof_received or self.eof_sent or not self.active: raise SSHException('Channel is not open') @@ -268,19 +257,27 @@ class Channel (object): m.add_byte(chr(MSG_CHANNEL_REQUEST)) m.add_int(self.remote_chanid) m.add_string('window-change') - m.add_boolean(1) + m.add_boolean(True) m.add_int(width) m.add_int(height) m.add_int(0).add_int(0) self.event.clear() self.transport._send_user_message(m) - while True: - self.event.wait(0.1) - if self.closed: - return False - if self.event.isSet(): - return True + self._wait_for_event() + def exit_status_ready(self): + """ + Return true if the remote process has exited and returned an exit + status. You may use this to poll the process status if you don't + want to block in L{recv_exit_status}. Note that the server may not + return an exit status in some cases (like bad servers). + + @return: True if L{recv_exit_status} will return immediately + @rtype: bool + @since: 1.7.3 + """ + return self.closed or self.status_event.isSet() + def recv_exit_status(self): """ Return the exit status from the process on the server. This is @@ -296,8 +293,9 @@ class Channel (object): """ while True: if self.closed or self.status_event.isSet(): - return self.exit_status + break self.status_event.wait(0.1) + return self.exit_status def send_exit_status(self, status): """ @@ -317,10 +315,73 @@ class Channel (object): m.add_byte(chr(MSG_CHANNEL_REQUEST)) m.add_int(self.remote_chanid) m.add_string('exit-status') - m.add_boolean(0) + m.add_boolean(False) m.add_int(status) self.transport._send_user_message(m) + + def request_x11(self, screen_number=0, auth_protocol=None, auth_cookie=None, + single_connection=False, handler=None): + """ + Request an x11 session on this channel. If the server allows it, + further x11 requests can be made from the server to the client, + when an x11 application is run in a shell session. + + From RFC4254:: + + It is RECOMMENDED that the 'x11 authentication cookie' that is + sent be a fake, random cookie, and that the cookie be checked and + replaced by the real cookie when a connection request is received. + If you omit the auth_cookie, a new secure random 128-bit value will be + generated, used, and returned. You will need to use this value to + verify incoming x11 requests and replace them with the actual local + x11 cookie (which requires some knoweldge of the x11 protocol). + + If a handler is passed in, the handler is called from another thread + whenever a new x11 connection arrives. The default handler queues up + incoming x11 connections, which may be retrieved using + L{Transport.accept}. The handler's calling signature is:: + + handler(channel: Channel, (address: str, port: int)) + + @param screen_number: the x11 screen number (0, 10, etc) + @type screen_number: int + @param auth_protocol: the name of the X11 authentication method used; + if none is given, C{"MIT-MAGIC-COOKIE-1"} is used + @type auth_protocol: str + @param auth_cookie: hexadecimal string containing the x11 auth cookie; + if none is given, a secure random 128-bit value is generated + @type auth_cookie: str + @param single_connection: if True, only a single x11 connection will be + forwarded (by default, any number of x11 connections can arrive + over this session) + @type single_connection: bool + @param handler: an optional handler to use for incoming X11 connections + @type handler: function + @return: the auth_cookie used + """ + if self.closed or self.eof_received or self.eof_sent or not self.active: + raise SSHException('Channel is not open') + if auth_protocol is None: + auth_protocol = 'MIT-MAGIC-COOKIE-1' + if auth_cookie is None: + auth_cookie = binascii.hexlify(self.transport.randpool.get_bytes(16)) + + m = Message() + m.add_byte(chr(MSG_CHANNEL_REQUEST)) + m.add_int(self.remote_chanid) + m.add_string('x11-req') + m.add_boolean(True) + m.add_boolean(single_connection) + m.add_string(auth_protocol) + m.add_string(auth_cookie) + m.add_int(screen_number) + self.event.clear() + self.transport._send_user_message(m) + self._wait_for_event() + self.transport._set_x11_handler(handler) + return auth_cookie + def get_transport(self): """ Return the L{Transport} associated with this channel. @@ -333,14 +394,13 @@ class Channel (object): def set_name(self, name): """ Set a name for this channel. Currently it's only used to set the name - of the log level used for debugging. The name can be fetched with the + of the channel in logfile entries. The name can be fetched with the L{get_name} method. - @param name: new channel name. + @param name: new channel name @type name: str """ - self.name = name - self.logger = util.get_logger(self.transport.get_log_channel() + '.' + self.name) + self._name = name def get_name(self): """ @@ -349,7 +409,7 @@ class Channel (object): @return: the name of this channel. @rtype: str """ - return self.name + return self._name def get_id(self): """ @@ -360,8 +420,6 @@ class Channel (object): @return: the ID of this channel. @rtype: int - - @since: ivysaur """ return self.chanid @@ -394,8 +452,7 @@ class Channel (object): self.combine_stderr = combine if combine and not old: # copy old stderr buffer into primary buffer - data = self.in_stderr_buffer - self.in_stderr_buffer = '' + data = self.in_stderr_buffer.empty() finally: self.lock.release() if len(data) > 0: @@ -419,7 +476,7 @@ class Channel (object): C{chan.settimeout(None)} is equivalent to C{chan.setblocking(1)}. @param timeout: seconds to wait for a pending read/write operation - before raising C{socket.timeout}, or C{None} for no timeout. + before raising C{socket.timeout}, or C{None} for no timeout. @type timeout: float """ self.timeout = timeout @@ -439,17 +496,19 @@ class Channel (object): """ Set blocking or non-blocking mode of the channel: if C{blocking} is 0, the channel is set to non-blocking mode; otherwise it's set to blocking - mode. Initially all channels are in blocking mode. + mode. Initially all channels are in blocking mode. In non-blocking mode, if a L{recv} call doesn't find any data, or if a L{send} call can't immediately dispose of the data, an error exception - is raised. In blocking mode, the calls block until they can proceed. + is raised. In blocking mode, the calls block until they can proceed. An + EOF condition is considered "immediate data" for L{recv}, so if the + channel is closed in the read direction, it will never block. C{chan.setblocking(0)} is equivalent to C{chan.settimeout(0)}; C{chan.setblocking(1)} is equivalent to C{chan.settimeout(None)}. @param blocking: 0 to set non-blocking mode; non-0 to set blocking - mode. + mode. @type blocking: int """ if blocking: @@ -457,6 +516,18 @@ class Channel (object): else: self.settimeout(0.0) + def getpeername(self): + """ + Return the address of the remote side of this Channel, if possible. + This is just a wrapper around C{'getpeername'} on the Transport, used + to provide enough of a socket-like interface to allow asyncore to work. + (asyncore likes to call C{'getpeername'}.) + + @return: the address if the remote host, if known + @rtype: tuple(str, int) + """ + return self.transport.getpeername() + def close(self): """ Close the channel. All future read/write operations on the channel @@ -466,15 +537,17 @@ class Channel (object): """ self.lock.acquire() try: + # only close the pipe when the user explicitly closes the channel. + # otherwise they will get unpleasant surprises. (and do it before + # checking self.closed, since the remote host may have already + # closed the connection.) + if self._pipe is not None: + self._pipe.close() + self._pipe = None + if not self.active or self.closed: return msgs = self._close_internal() - - # only close the pipe when the user explicitly closes the channel. - # otherwise they will get unpleasant surprises. - if self.pipe is not None: - self.pipe.close() - self.pipe = None finally: self.lock.release() for m in msgs: @@ -491,13 +564,7 @@ class Channel (object): return at least one byte; C{False} otherwise. @rtype: boolean """ - self.lock.acquire() - try: - if len(self.in_buffer) == 0: - return False - return True - finally: - self.lock.release() + return self.in_buffer.read_ready() def recv(self, nbytes): """ @@ -514,38 +581,12 @@ class Channel (object): @raise socket.timeout: if no data is ready before the timeout set by L{settimeout}. """ - out = '' - self.lock.acquire() try: - if len(self.in_buffer) == 0: - if self.closed or self.eof_received: - return out - # should we block? - if self.timeout == 0.0: - raise socket.timeout() - # loop here in case we get woken up but a different thread has grabbed everything in the buffer - timeout = self.timeout - while (len(self.in_buffer) == 0) and not self.closed and not self.eof_received: - then = time.time() - self.in_buffer_cv.wait(timeout) - if timeout != None: - timeout -= time.time() - then - if timeout <= 0.0: - raise socket.timeout() - # something in the buffer and we have the lock - if len(self.in_buffer) <= nbytes: - out = self.in_buffer - self.in_buffer = '' - if self.pipe is not None: - # clear the pipe, since no more data is buffered - self.pipe.clear() - else: - out = self.in_buffer[:nbytes] - self.in_buffer = self.in_buffer[nbytes:] - ack = self._check_add_window(len(out)) - finally: - self.lock.release() + out = self.in_buffer.read(nbytes, self.timeout) + except PipeTimeout, e: + raise socket.timeout() + ack = self._check_add_window(len(out)) # no need to hold the channel lock when sending this if ack > 0: m = Message() @@ -569,13 +610,7 @@ class Channel (object): @since: 1.1 """ - self.lock.acquire() - try: - if len(self.in_stderr_buffer) == 0: - return False - return True - finally: - self.lock.release() + return self.in_stderr_buffer.read_ready() def recv_stderr(self, nbytes): """ @@ -596,36 +631,43 @@ class Channel (object): @since: 1.1 """ - out = '' + try: + out = self.in_stderr_buffer.read(nbytes, self.timeout) + except PipeTimeout, e: + raise socket.timeout() + + ack = self._check_add_window(len(out)) + # no need to hold the channel lock when sending this + if ack > 0: + m = Message() + m.add_byte(chr(MSG_CHANNEL_WINDOW_ADJUST)) + m.add_int(self.remote_chanid) + m.add_int(ack) + self.transport._send_user_message(m) + + return out + + def send_ready(self): + """ + Returns true if data can be written to this channel without blocking. + This means the channel is either closed (so any write attempt would + return immediately) or there is at least one byte of space in the + outbound buffer. If there is at least one byte of space in the + outbound buffer, a L{send} call will succeed immediately and return + the number of bytes actually written. + + @return: C{True} if a L{send} call on this channel would immediately + succeed or fail + @rtype: boolean + """ self.lock.acquire() try: - if len(self.in_stderr_buffer) == 0: - if self.closed or self.eof_received: - return out - # should we block? - if self.timeout == 0.0: - raise socket.timeout() - # loop here in case we get woken up but a different thread has grabbed everything in the buffer - timeout = self.timeout - while (len(self.in_stderr_buffer) == 0) and not self.closed and not self.eof_received: - then = time.time() - self.in_stderr_buffer_cv.wait(timeout) - if timeout != None: - timeout -= time.time() - then - if timeout <= 0.0: - raise socket.timeout() - # something in the buffer and we have the lock - if len(self.in_stderr_buffer) <= nbytes: - out = self.in_stderr_buffer - self.in_stderr_buffer = '' - else: - out = self.in_stderr_buffer[:nbytes] - self.in_stderr_buffer = self.in_stderr_buffer[nbytes:] - self._check_add_window(len(out)) + if self.closed or self.eof_sent: + return True + return self.out_window_size > 0 finally: self.lock.release() - return out - + def send(self, s): """ Send data to the channel. Returns the number of bytes sent, or 0 if @@ -634,9 +676,9 @@ class Channel (object): transmitted, the application needs to attempt delivery of the remaining data. - @param s: data to send. + @param s: data to send @type s: str - @return: number of bytes actually sent. + @return: number of bytes actually sent @rtype: int @raise socket.timeout: if no data could be sent before the timeout set @@ -653,9 +695,11 @@ class Channel (object): m.add_byte(chr(MSG_CHANNEL_DATA)) m.add_int(self.remote_chanid) m.add_string(s[:size]) - self.transport._send_user_message(m) finally: self.lock.release() + # Note: We release self.lock before calling _send_user_message. + # Otherwise, we can deadlock during re-keying. + self.transport._send_user_message(m) return size def send_stderr(self, s): @@ -689,9 +733,11 @@ class Channel (object): m.add_int(self.remote_chanid) m.add_int(1) m.add_string(s[:size]) - self.transport._send_user_message(m) finally: self.lock.release() + # Note: We release self.lock before calling _send_user_message. + # Otherwise, we can deadlock during re-keying. + self.transport._send_user_message(m) return size def sendall(self, s): @@ -776,14 +822,14 @@ class Channel (object): def fileno(self): """ Returns an OS-level file descriptor which can be used for polling, but - but I{not} for reading or writing). This is primaily to allow python's + but I{not} for reading or writing. This is primaily to allow python's C{select} module to work. The first time C{fileno} is called on a channel, a pipe is created to simulate real OS-level file descriptor (FD) behavior. Because of this, two OS-level FDs are created, which will use up FDs faster than normal. - You won't notice this effect unless you open hundreds or thousands of - channels simultaneously, but it's still notable. + (You won't notice this effect unless you have hundreds of channels + open at the same time.) @return: an OS-level file descriptor @rtype: int @@ -793,13 +839,14 @@ class Channel (object): """ self.lock.acquire() try: - if self.pipe is not None: - return self.pipe.fileno() + if self._pipe is not None: + return self._pipe.fileno() # create the pipe and feed in any existing data - self.pipe = pipe.make_pipe() - if len(self.in_buffer) > 0: - self.pipe.set() - return self.pipe.fileno() + self._pipe = pipe.make_pipe() + p1, p2 = pipe.make_or_pipe(self._pipe) + self.in_buffer.set_event(p1) + self.in_stderr_buffer.set_event(p2) + return self._pipe.fileno() finally: self.lock.release() @@ -856,7 +903,7 @@ class Channel (object): def _set_transport(self, transport): self.transport = transport - self.logger = util.get_logger(self.transport.get_log_channel() + '.' + self.name) + self.logger = util.get_logger(self.transport.get_log_channel()) def _set_window(self, window_size, max_packet_size): self.in_window_size = window_size @@ -869,7 +916,7 @@ class Channel (object): def _set_remote_channel(self, chanid, window_size, max_packet_size): self.remote_chanid = chanid self.out_window_size = window_size - self.out_max_packet_size = max(max_packet_size, self.MIN_PACKET_SIZE) + self.out_max_packet_size = max(max_packet_size, MIN_PACKET_SIZE) self.active = 1 self._log(DEBUG, 'Max packet out: %d bytes' % max_packet_size) @@ -894,16 +941,7 @@ class Channel (object): s = m else: s = m.get_string() - self.lock.acquire() - try: - if self.ultra_debug: - self._log(DEBUG, 'fed %d bytes' % len(s)) - if self.pipe is not None: - self.pipe.set() - self.in_buffer += s - self.in_buffer_cv.notifyAll() - finally: - self.lock.release() + self.in_buffer.feed(s) def _feed_extended(self, m): code = m.get_int() @@ -912,15 +950,9 @@ class Channel (object): self._log(ERROR, 'unknown extended_data type %d; discarding' % code) return if self.combine_stderr: - return self._feed(s) - self.lock.acquire() - try: - if self.ultra_debug: - self._log(DEBUG, 'fed %d stderr bytes' % len(s)) - self.in_stderr_buffer += s - self.in_stderr_buffer_cv.notifyAll() - finally: - self.lock.release() + self._feed(s) + else: + self.in_stderr_buffer.feed(s) def _window_adjust(self, m): nbytes = m.get_int() @@ -984,6 +1016,16 @@ class Channel (object): else: ok = server.check_channel_window_change_request(self, width, height, pixelwidth, pixelheight) + elif key == 'x11-req': + single_connection = m.get_boolean() + auth_proto = m.get_string() + auth_cookie = m.get_string() + screen_number = m.get_int() + if server is None: + ok = False + else: + ok = server.check_channel_x11_request(self, single_connection, + auth_proto, auth_cookie, screen_number) else: self._log(DEBUG, 'Unhandled channel request "%s"' % key) ok = False @@ -1001,13 +1043,13 @@ class Channel (object): try: if not self.eof_received: self.eof_received = True - self.in_buffer_cv.notifyAll() - self.in_stderr_buffer_cv.notifyAll() - if self.pipe is not None: - self.pipe.set_forever() + self.in_buffer.close() + self.in_stderr_buffer.close() + if self._pipe is not None: + self._pipe.set_forever() finally: self.lock.release() - self._log(DEBUG, 'EOF received') + self._log(DEBUG, 'EOF received (%s)', self._name) def _handle_close(self, m): self.lock.acquire() @@ -1024,17 +1066,29 @@ class Channel (object): ### internals... - def _log(self, level, msg): - self.logger.log(level, msg) + def _log(self, level, msg, *args): + self.logger.log(level, "[chan " + self._name + "] " + msg, *args) + + def _wait_for_event(self): + while True: + self.event.wait(0.1) + if self.event.isSet(): + return + if self.closed: + e = self.transport.get_exception() + if e is None: + e = SSHException('Channel closed.') + raise e + return def _set_closed(self): # you are holding the lock. self.closed = True - self.in_buffer_cv.notifyAll() - self.in_stderr_buffer_cv.notifyAll() + self.in_buffer.close() + self.in_stderr_buffer.close() self.out_buffer_cv.notifyAll() - if self.pipe is not None: - self.pipe.set_forever() + if self._pipe is not None: + self._pipe.set_forever() def _send_eof(self): # you are holding the lock. @@ -1044,7 +1098,7 @@ class Channel (object): m.add_byte(chr(MSG_CHANNEL_EOF)) m.add_int(self.remote_chanid) self.eof_sent = True - self._log(DEBUG, 'EOF sent') + self._log(DEBUG, 'EOF sent (%s)', self._name) return m def _close_internal(self): @@ -1072,19 +1126,22 @@ class Channel (object): self.lock.release() def _check_add_window(self, n): - # already holding the lock! - if self.closed or self.eof_received or not self.active: - return 0 - if self.ultra_debug: - self._log(DEBUG, 'addwindow %d' % n) - self.in_window_sofar += n - if self.in_window_sofar <= self.in_window_threshold: - return 0 - if self.ultra_debug: - self._log(DEBUG, 'addwindow send %d' % self.in_window_sofar) - out = self.in_window_sofar - self.in_window_sofar = 0 - return out + self.lock.acquire() + try: + if self.closed or self.eof_received or not self.active: + return 0 + if self.ultra_debug: + self._log(DEBUG, 'addwindow %d' % n) + self.in_window_sofar += n + if self.in_window_sofar <= self.in_window_threshold: + return 0 + if self.ultra_debug: + self._log(DEBUG, 'addwindow send %d' % self.in_window_sofar) + out = self.in_window_sofar + self.in_window_sofar = 0 + return out + finally: + self.lock.release() def _wait_for_send_window(self, size): """ @@ -1155,8 +1212,6 @@ class ChannelFile (BufferedFile): def _write(self, data): self.channel.sendall(data) return len(data) - - seek = BufferedFile.seek class ChannelStderrFile (ChannelFile): |