diff options
Diffstat (limited to 'paramiko/transport.py')
-rw-r--r-- | paramiko/transport.py | 238 |
1 files changed, 148 insertions, 90 deletions
diff --git a/paramiko/transport.py b/paramiko/transport.py index 30de295..fd6dab7 100644 --- a/paramiko/transport.py +++ b/paramiko/transport.py @@ -29,6 +29,7 @@ import threading import time import weakref +import paramiko from paramiko import util from paramiko.auth_handler import AuthHandler from paramiko.channel import Channel @@ -43,7 +44,9 @@ from paramiko.primes import ModulusPack from paramiko.rsakey import RSAKey from paramiko.server import ServerInterface from paramiko.sftp_client import SFTPClient -from paramiko.ssh_exception import SSHException, BadAuthenticationType, ChannelException +from paramiko.ssh_exception import (SSHException, BadAuthenticationType, + ChannelException, ProxyCommandFailure) +from paramiko.util import retry_on_signal from Crypto import Random from Crypto.Cipher import Blowfish, AES, DES3, ARC4 @@ -194,7 +197,7 @@ class Transport (threading.Thread): """ _PROTO_ID = '2.0' - _CLIENT_ID = 'paramiko_1.7.7.1' + _CLIENT_ID = 'paramiko_%s' % (paramiko.__version__) _preferred_ciphers = ( 'aes128-ctr', 'aes256-ctr', 'aes128-cbc', 'blowfish-cbc', 'aes256-cbc', '3des-cbc', 'arcfour128', 'arcfour256' ) @@ -288,7 +291,7 @@ class Transport (threading.Thread): addr = sockaddr sock = socket.socket(af, socket.SOCK_STREAM) try: - sock.connect((hostname, port)) + retry_on_signal(lambda: sock.connect((hostname, port))) except socket.error, e: reason = str(e) else: @@ -341,6 +344,7 @@ class Transport (threading.Thread): self._channel_counter = 1 self.window_size = 65536 self.max_packet_size = 34816 + self._forward_agent_handler = None self._x11_handler = None self._tcp_handler = None @@ -673,6 +677,20 @@ class Transport (threading.Thread): """ return self.open_channel('x11', src_addr=src_addr) + def open_forward_agent_channel(self): + """ + Request a new channel to the client, of type + C{"auth-agent@openssh.com"}. + + This is just an alias for C{open_channel('auth-agent@openssh.com')}. + @return: a new L{Channel} + @rtype: L{Channel} + + @raise SSHException: if the request is rejected or the session ends + prematurely + """ + return self.open_channel('auth-agent@openssh.com') + def open_forwarded_tcpip_channel(self, (src_addr, src_port), (dest_addr, dest_port)): """ Request a new channel back to the client, of type C{"forwarded-tcpip"}. @@ -1481,6 +1499,14 @@ class Transport (threading.Thread): else: return self._cipher_info[name]['class'].new(key, self._cipher_info[name]['mode'], iv) + def _set_forward_agent_handler(self, handler): + if handler is None: + def default_handler(channel): + self._queue_incoming_channel(channel) + self._forward_agent_handler = default_handler + else: + self._forward_agent_handler = handler + def _set_x11_handler(self, handler): # only called if a channel has turned on x11 forwarding if handler is None: @@ -1505,6 +1531,18 @@ class Transport (threading.Thread): # indefinitely, creating a GC cycle and not letting Transport ever be # GC'd. it's a bug in Thread.) + # Hold reference to 'sys' so we can test sys.modules to detect + # interpreter shutdown. + self.sys = sys + + # Required to prevent RNG errors when running inside many subprocess + # containers. + Random.atfork() + + # Hold reference to 'sys' so we can test sys.modules to detect + # interpreter shutdown. + self.sys = sys + # active=True occurs before the thread is launched, to avoid a race _active_threads.append(self) if self.server_mode: @@ -1512,94 +1550,102 @@ class Transport (threading.Thread): else: self._log(DEBUG, 'starting thread (client mode): %s' % hex(long(id(self)) & 0xffffffffL)) try: - self.packetizer.write_all(self.local_version + '\r\n') - self._check_banner() - self._send_kex_init() - self._expect_packet(MSG_KEXINIT) - - while self.active: - if self.packetizer.need_rekey() and not self.in_kex: - self._send_kex_init() - try: - ptype, m = self.packetizer.read_message() - except NeedRekeyException: - continue - if ptype == MSG_IGNORE: - continue - elif ptype == MSG_DISCONNECT: - self._parse_disconnect(m) - self.active = False - self.packetizer.close() - break - elif ptype == MSG_DEBUG: - self._parse_debug(m) - continue - if len(self._expected_packet) > 0: - if ptype not in self._expected_packet: - raise SSHException('Expecting packet from %r, got %d' % (self._expected_packet, ptype)) - self._expected_packet = tuple() - if (ptype >= 30) and (ptype <= 39): - self.kex_engine.parse_next(ptype, m) + try: + self.packetizer.write_all(self.local_version + '\r\n') + self._check_banner() + self._send_kex_init() + self._expect_packet(MSG_KEXINIT) + + while self.active: + if self.packetizer.need_rekey() and not self.in_kex: + self._send_kex_init() + try: + ptype, m = self.packetizer.read_message() + except NeedRekeyException: continue - - if ptype in self._handler_table: - self._handler_table[ptype](self, m) - elif ptype in self._channel_handler_table: - chanid = m.get_int() - chan = self._channels.get(chanid) - if chan is not None: - self._channel_handler_table[ptype](chan, m) - elif chanid in self.channels_seen: - self._log(DEBUG, 'Ignoring message for dead channel %d' % chanid) - else: - self._log(ERROR, 'Channel request for unknown channel %d' % chanid) + if ptype == MSG_IGNORE: + continue + elif ptype == MSG_DISCONNECT: + self._parse_disconnect(m) self.active = False self.packetizer.close() - elif (self.auth_handler is not None) and (ptype in self.auth_handler._handler_table): - self.auth_handler._handler_table[ptype](self.auth_handler, m) + break + elif ptype == MSG_DEBUG: + self._parse_debug(m) + continue + if len(self._expected_packet) > 0: + if ptype not in self._expected_packet: + raise SSHException('Expecting packet from %r, got %d' % (self._expected_packet, ptype)) + self._expected_packet = tuple() + if (ptype >= 30) and (ptype <= 39): + self.kex_engine.parse_next(ptype, m) + continue + + if ptype in self._handler_table: + self._handler_table[ptype](self, m) + elif ptype in self._channel_handler_table: + chanid = m.get_int() + chan = self._channels.get(chanid) + if chan is not None: + self._channel_handler_table[ptype](chan, m) + elif chanid in self.channels_seen: + self._log(DEBUG, 'Ignoring message for dead channel %d' % chanid) + else: + self._log(ERROR, 'Channel request for unknown channel %d' % chanid) + self.active = False + self.packetizer.close() + elif (self.auth_handler is not None) and (ptype in self.auth_handler._handler_table): + self.auth_handler._handler_table[ptype](self.auth_handler, m) + else: + self._log(WARNING, 'Oops, unhandled type %d' % ptype) + msg = Message() + msg.add_byte(chr(MSG_UNIMPLEMENTED)) + msg.add_int(m.seqno) + self._send_message(msg) + except SSHException, e: + self._log(ERROR, 'Exception: ' + str(e)) + self._log(ERROR, util.tb_strings()) + self.saved_exception = e + except EOFError, e: + self._log(DEBUG, 'EOF in transport thread') + #self._log(DEBUG, util.tb_strings()) + self.saved_exception = e + except socket.error, e: + if type(e.args) is tuple: + emsg = '%s (%d)' % (e.args[1], e.args[0]) else: - self._log(WARNING, 'Oops, unhandled type %d' % ptype) - msg = Message() - msg.add_byte(chr(MSG_UNIMPLEMENTED)) - msg.add_int(m.seqno) - self._send_message(msg) - except SSHException, e: - self._log(ERROR, 'Exception: ' + str(e)) - self._log(ERROR, util.tb_strings()) - self.saved_exception = e - except EOFError, e: - self._log(DEBUG, 'EOF in transport thread') - #self._log(DEBUG, util.tb_strings()) - self.saved_exception = e - except socket.error, e: - if type(e.args) is tuple: - emsg = '%s (%d)' % (e.args[1], e.args[0]) - else: - emsg = e.args - self._log(ERROR, 'Socket exception: ' + emsg) - self.saved_exception = e - except Exception, e: - self._log(ERROR, 'Unknown exception: ' + str(e)) - self._log(ERROR, util.tb_strings()) - self.saved_exception = e - _active_threads.remove(self) - for chan in self._channels.values(): - chan._unlink() - if self.active: - self.active = False - self.packetizer.close() - if self.completion_event != None: - self.completion_event.set() - if self.auth_handler is not None: - self.auth_handler.abort() - for event in self.channel_events.values(): - event.set() - try: - self.lock.acquire() - self.server_accept_cv.notify() - finally: - self.lock.release() - self.sock.close() + emsg = e.args + self._log(ERROR, 'Socket exception: ' + emsg) + self.saved_exception = e + except Exception, e: + self._log(ERROR, 'Unknown exception: ' + str(e)) + self._log(ERROR, util.tb_strings()) + self.saved_exception = e + _active_threads.remove(self) + for chan in self._channels.values(): + chan._unlink() + if self.active: + self.active = False + self.packetizer.close() + if self.completion_event != None: + self.completion_event.set() + if self.auth_handler is not None: + self.auth_handler.abort() + for event in self.channel_events.values(): + event.set() + try: + self.lock.acquire() + self.server_accept_cv.notify() + finally: + self.lock.release() + self.sock.close() + except: + # Don't raise spurious 'NoneType has no attribute X' errors when we + # wake up during interpreter shutdown. Or rather -- raise + # everything *if* sys.modules (used as a convenient sentinel) + # appears to still exist. + if self.sys.modules is not None: + raise ### protocol stages @@ -1629,6 +1675,8 @@ class Transport (threading.Thread): timeout = 2 try: buf = self.packetizer.readline(timeout) + except ProxyCommandFailure: + raise except Exception, x: raise SSHException('Error reading SSH protocol banner' + str(x)) if buf[:4] == 'SSH-': @@ -1837,7 +1885,8 @@ class Transport (threading.Thread): mac_key = self._compute_key('F', mac_engine.digest_size) else: mac_key = self._compute_key('E', mac_engine.digest_size) - self.packetizer.set_outbound_cipher(engine, block_size, mac_engine, mac_size, mac_key) + sdctr = self.local_cipher.endswith('-ctr') + self.packetizer.set_outbound_cipher(engine, block_size, mac_engine, mac_size, mac_key, sdctr) compress_out = self._compression_info[self.local_compression][0] if (compress_out is not None) and ((self.local_compression != 'zlib@openssh.com') or self.authenticated): self._log(DEBUG, 'Switching on outbound compression ...') @@ -1980,7 +2029,14 @@ class Transport (threading.Thread): initial_window_size = m.get_int() max_packet_size = m.get_int() reject = False - if (kind == 'x11') and (self._x11_handler is not None): + if (kind == 'auth-agent@openssh.com') and (self._forward_agent_handler is not None): + self._log(DEBUG, 'Incoming forward agent connection') + self.lock.acquire() + try: + my_chanid = self._next_channel() + finally: + self.lock.release() + elif (kind == 'x11') and (self._x11_handler is not None): origin_addr = m.get_string() origin_port = m.get_int() self._log(DEBUG, 'Incoming x11 connection from %s:%d' % (origin_addr, origin_port)) @@ -2052,7 +2108,9 @@ class Transport (threading.Thread): m.add_int(self.max_packet_size) self._send_message(m) self._log(INFO, 'Secsh channel %d (%s) opened.', my_chanid, kind) - if kind == 'x11': + if kind == 'auth-agent@openssh.com': + self._forward_agent_handler(chan) + elif kind == 'x11': self._x11_handler(chan, (origin_addr, origin_port)) elif kind == 'forwarded-tcpip': chan.origin_addr = (origin_addr, origin_port) |