aboutsummaryrefslogtreecommitdiff
path: root/paramiko/transport.py
diff options
context:
space:
mode:
Diffstat (limited to 'paramiko/transport.py')
-rw-r--r--paramiko/transport.py238
1 files changed, 148 insertions, 90 deletions
diff --git a/paramiko/transport.py b/paramiko/transport.py
index 30de295..fd6dab7 100644
--- a/paramiko/transport.py
+++ b/paramiko/transport.py
@@ -29,6 +29,7 @@ import threading
import time
import weakref
+import paramiko
from paramiko import util
from paramiko.auth_handler import AuthHandler
from paramiko.channel import Channel
@@ -43,7 +44,9 @@ from paramiko.primes import ModulusPack
from paramiko.rsakey import RSAKey
from paramiko.server import ServerInterface
from paramiko.sftp_client import SFTPClient
-from paramiko.ssh_exception import SSHException, BadAuthenticationType, ChannelException
+from paramiko.ssh_exception import (SSHException, BadAuthenticationType,
+ ChannelException, ProxyCommandFailure)
+from paramiko.util import retry_on_signal
from Crypto import Random
from Crypto.Cipher import Blowfish, AES, DES3, ARC4
@@ -194,7 +197,7 @@ class Transport (threading.Thread):
"""
_PROTO_ID = '2.0'
- _CLIENT_ID = 'paramiko_1.7.7.1'
+ _CLIENT_ID = 'paramiko_%s' % (paramiko.__version__)
_preferred_ciphers = ( 'aes128-ctr', 'aes256-ctr', 'aes128-cbc', 'blowfish-cbc', 'aes256-cbc', '3des-cbc',
'arcfour128', 'arcfour256' )
@@ -288,7 +291,7 @@ class Transport (threading.Thread):
addr = sockaddr
sock = socket.socket(af, socket.SOCK_STREAM)
try:
- sock.connect((hostname, port))
+ retry_on_signal(lambda: sock.connect((hostname, port)))
except socket.error, e:
reason = str(e)
else:
@@ -341,6 +344,7 @@ class Transport (threading.Thread):
self._channel_counter = 1
self.window_size = 65536
self.max_packet_size = 34816
+ self._forward_agent_handler = None
self._x11_handler = None
self._tcp_handler = None
@@ -673,6 +677,20 @@ class Transport (threading.Thread):
"""
return self.open_channel('x11', src_addr=src_addr)
+ def open_forward_agent_channel(self):
+ """
+ Request a new channel to the client, of type
+ C{"auth-agent@openssh.com"}.
+
+ This is just an alias for C{open_channel('auth-agent@openssh.com')}.
+ @return: a new L{Channel}
+ @rtype: L{Channel}
+
+ @raise SSHException: if the request is rejected or the session ends
+ prematurely
+ """
+ return self.open_channel('auth-agent@openssh.com')
+
def open_forwarded_tcpip_channel(self, (src_addr, src_port), (dest_addr, dest_port)):
"""
Request a new channel back to the client, of type C{"forwarded-tcpip"}.
@@ -1481,6 +1499,14 @@ class Transport (threading.Thread):
else:
return self._cipher_info[name]['class'].new(key, self._cipher_info[name]['mode'], iv)
+ def _set_forward_agent_handler(self, handler):
+ if handler is None:
+ def default_handler(channel):
+ self._queue_incoming_channel(channel)
+ self._forward_agent_handler = default_handler
+ else:
+ self._forward_agent_handler = handler
+
def _set_x11_handler(self, handler):
# only called if a channel has turned on x11 forwarding
if handler is None:
@@ -1505,6 +1531,18 @@ class Transport (threading.Thread):
# indefinitely, creating a GC cycle and not letting Transport ever be
# GC'd. it's a bug in Thread.)
+ # Hold reference to 'sys' so we can test sys.modules to detect
+ # interpreter shutdown.
+ self.sys = sys
+
+ # Required to prevent RNG errors when running inside many subprocess
+ # containers.
+ Random.atfork()
+
+ # Hold reference to 'sys' so we can test sys.modules to detect
+ # interpreter shutdown.
+ self.sys = sys
+
# active=True occurs before the thread is launched, to avoid a race
_active_threads.append(self)
if self.server_mode:
@@ -1512,94 +1550,102 @@ class Transport (threading.Thread):
else:
self._log(DEBUG, 'starting thread (client mode): %s' % hex(long(id(self)) & 0xffffffffL))
try:
- self.packetizer.write_all(self.local_version + '\r\n')
- self._check_banner()
- self._send_kex_init()
- self._expect_packet(MSG_KEXINIT)
-
- while self.active:
- if self.packetizer.need_rekey() and not self.in_kex:
- self._send_kex_init()
- try:
- ptype, m = self.packetizer.read_message()
- except NeedRekeyException:
- continue
- if ptype == MSG_IGNORE:
- continue
- elif ptype == MSG_DISCONNECT:
- self._parse_disconnect(m)
- self.active = False
- self.packetizer.close()
- break
- elif ptype == MSG_DEBUG:
- self._parse_debug(m)
- continue
- if len(self._expected_packet) > 0:
- if ptype not in self._expected_packet:
- raise SSHException('Expecting packet from %r, got %d' % (self._expected_packet, ptype))
- self._expected_packet = tuple()
- if (ptype >= 30) and (ptype <= 39):
- self.kex_engine.parse_next(ptype, m)
+ try:
+ self.packetizer.write_all(self.local_version + '\r\n')
+ self._check_banner()
+ self._send_kex_init()
+ self._expect_packet(MSG_KEXINIT)
+
+ while self.active:
+ if self.packetizer.need_rekey() and not self.in_kex:
+ self._send_kex_init()
+ try:
+ ptype, m = self.packetizer.read_message()
+ except NeedRekeyException:
continue
-
- if ptype in self._handler_table:
- self._handler_table[ptype](self, m)
- elif ptype in self._channel_handler_table:
- chanid = m.get_int()
- chan = self._channels.get(chanid)
- if chan is not None:
- self._channel_handler_table[ptype](chan, m)
- elif chanid in self.channels_seen:
- self._log(DEBUG, 'Ignoring message for dead channel %d' % chanid)
- else:
- self._log(ERROR, 'Channel request for unknown channel %d' % chanid)
+ if ptype == MSG_IGNORE:
+ continue
+ elif ptype == MSG_DISCONNECT:
+ self._parse_disconnect(m)
self.active = False
self.packetizer.close()
- elif (self.auth_handler is not None) and (ptype in self.auth_handler._handler_table):
- self.auth_handler._handler_table[ptype](self.auth_handler, m)
+ break
+ elif ptype == MSG_DEBUG:
+ self._parse_debug(m)
+ continue
+ if len(self._expected_packet) > 0:
+ if ptype not in self._expected_packet:
+ raise SSHException('Expecting packet from %r, got %d' % (self._expected_packet, ptype))
+ self._expected_packet = tuple()
+ if (ptype >= 30) and (ptype <= 39):
+ self.kex_engine.parse_next(ptype, m)
+ continue
+
+ if ptype in self._handler_table:
+ self._handler_table[ptype](self, m)
+ elif ptype in self._channel_handler_table:
+ chanid = m.get_int()
+ chan = self._channels.get(chanid)
+ if chan is not None:
+ self._channel_handler_table[ptype](chan, m)
+ elif chanid in self.channels_seen:
+ self._log(DEBUG, 'Ignoring message for dead channel %d' % chanid)
+ else:
+ self._log(ERROR, 'Channel request for unknown channel %d' % chanid)
+ self.active = False
+ self.packetizer.close()
+ elif (self.auth_handler is not None) and (ptype in self.auth_handler._handler_table):
+ self.auth_handler._handler_table[ptype](self.auth_handler, m)
+ else:
+ self._log(WARNING, 'Oops, unhandled type %d' % ptype)
+ msg = Message()
+ msg.add_byte(chr(MSG_UNIMPLEMENTED))
+ msg.add_int(m.seqno)
+ self._send_message(msg)
+ except SSHException, e:
+ self._log(ERROR, 'Exception: ' + str(e))
+ self._log(ERROR, util.tb_strings())
+ self.saved_exception = e
+ except EOFError, e:
+ self._log(DEBUG, 'EOF in transport thread')
+ #self._log(DEBUG, util.tb_strings())
+ self.saved_exception = e
+ except socket.error, e:
+ if type(e.args) is tuple:
+ emsg = '%s (%d)' % (e.args[1], e.args[0])
else:
- self._log(WARNING, 'Oops, unhandled type %d' % ptype)
- msg = Message()
- msg.add_byte(chr(MSG_UNIMPLEMENTED))
- msg.add_int(m.seqno)
- self._send_message(msg)
- except SSHException, e:
- self._log(ERROR, 'Exception: ' + str(e))
- self._log(ERROR, util.tb_strings())
- self.saved_exception = e
- except EOFError, e:
- self._log(DEBUG, 'EOF in transport thread')
- #self._log(DEBUG, util.tb_strings())
- self.saved_exception = e
- except socket.error, e:
- if type(e.args) is tuple:
- emsg = '%s (%d)' % (e.args[1], e.args[0])
- else:
- emsg = e.args
- self._log(ERROR, 'Socket exception: ' + emsg)
- self.saved_exception = e
- except Exception, e:
- self._log(ERROR, 'Unknown exception: ' + str(e))
- self._log(ERROR, util.tb_strings())
- self.saved_exception = e
- _active_threads.remove(self)
- for chan in self._channels.values():
- chan._unlink()
- if self.active:
- self.active = False
- self.packetizer.close()
- if self.completion_event != None:
- self.completion_event.set()
- if self.auth_handler is not None:
- self.auth_handler.abort()
- for event in self.channel_events.values():
- event.set()
- try:
- self.lock.acquire()
- self.server_accept_cv.notify()
- finally:
- self.lock.release()
- self.sock.close()
+ emsg = e.args
+ self._log(ERROR, 'Socket exception: ' + emsg)
+ self.saved_exception = e
+ except Exception, e:
+ self._log(ERROR, 'Unknown exception: ' + str(e))
+ self._log(ERROR, util.tb_strings())
+ self.saved_exception = e
+ _active_threads.remove(self)
+ for chan in self._channels.values():
+ chan._unlink()
+ if self.active:
+ self.active = False
+ self.packetizer.close()
+ if self.completion_event != None:
+ self.completion_event.set()
+ if self.auth_handler is not None:
+ self.auth_handler.abort()
+ for event in self.channel_events.values():
+ event.set()
+ try:
+ self.lock.acquire()
+ self.server_accept_cv.notify()
+ finally:
+ self.lock.release()
+ self.sock.close()
+ except:
+ # Don't raise spurious 'NoneType has no attribute X' errors when we
+ # wake up during interpreter shutdown. Or rather -- raise
+ # everything *if* sys.modules (used as a convenient sentinel)
+ # appears to still exist.
+ if self.sys.modules is not None:
+ raise
### protocol stages
@@ -1629,6 +1675,8 @@ class Transport (threading.Thread):
timeout = 2
try:
buf = self.packetizer.readline(timeout)
+ except ProxyCommandFailure:
+ raise
except Exception, x:
raise SSHException('Error reading SSH protocol banner' + str(x))
if buf[:4] == 'SSH-':
@@ -1837,7 +1885,8 @@ class Transport (threading.Thread):
mac_key = self._compute_key('F', mac_engine.digest_size)
else:
mac_key = self._compute_key('E', mac_engine.digest_size)
- self.packetizer.set_outbound_cipher(engine, block_size, mac_engine, mac_size, mac_key)
+ sdctr = self.local_cipher.endswith('-ctr')
+ self.packetizer.set_outbound_cipher(engine, block_size, mac_engine, mac_size, mac_key, sdctr)
compress_out = self._compression_info[self.local_compression][0]
if (compress_out is not None) and ((self.local_compression != 'zlib@openssh.com') or self.authenticated):
self._log(DEBUG, 'Switching on outbound compression ...')
@@ -1980,7 +2029,14 @@ class Transport (threading.Thread):
initial_window_size = m.get_int()
max_packet_size = m.get_int()
reject = False
- if (kind == 'x11') and (self._x11_handler is not None):
+ if (kind == 'auth-agent@openssh.com') and (self._forward_agent_handler is not None):
+ self._log(DEBUG, 'Incoming forward agent connection')
+ self.lock.acquire()
+ try:
+ my_chanid = self._next_channel()
+ finally:
+ self.lock.release()
+ elif (kind == 'x11') and (self._x11_handler is not None):
origin_addr = m.get_string()
origin_port = m.get_int()
self._log(DEBUG, 'Incoming x11 connection from %s:%d' % (origin_addr, origin_port))
@@ -2052,7 +2108,9 @@ class Transport (threading.Thread):
m.add_int(self.max_packet_size)
self._send_message(m)
self._log(INFO, 'Secsh channel %d (%s) opened.', my_chanid, kind)
- if kind == 'x11':
+ if kind == 'auth-agent@openssh.com':
+ self._forward_agent_handler(chan)
+ elif kind == 'x11':
self._x11_handler(chan, (origin_addr, origin_port))
elif kind == 'forwarded-tcpip':
chan.origin_addr = (origin_addr, origin_port)