aboutsummaryrefslogtreecommitdiff
path: root/paramiko/hostkeys.py
diff options
context:
space:
mode:
Diffstat (limited to 'paramiko/hostkeys.py')
-rw-r--r--paramiko/hostkeys.py333
1 files changed, 172 insertions, 161 deletions
diff --git a/paramiko/hostkeys.py b/paramiko/hostkeys.py
index 9bcf0d5..c0caeda 100644
--- a/paramiko/hostkeys.py
+++ b/paramiko/hostkeys.py
@@ -16,118 +16,45 @@
# along with Paramiko; if not, write to the Free Software Foundation, Inc.,
# 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA.
-"""
-L{HostKeys}
-"""
-import base64
import binascii
-from Crypto.Hash import SHA, HMAC
-import UserDict
-
-from paramiko.common import *
-from paramiko.dsskey import DSSKey
-from paramiko.rsakey import RSAKey
-from paramiko.util import get_logger
-from paramiko.ecdsakey import ECDSAKey
-
-
-class InvalidHostKey(Exception):
-
- def __init__(self, line, exc):
- self.line = line
- self.exc = exc
- self.args = (line, exc)
+import os
+from hashlib import sha1
+from hmac import HMAC
-class HostKeyEntry:
- """
- Representation of a line in an OpenSSH-style "known hosts" file.
- """
+from paramiko.py3compat import b, u, encodebytes, decodebytes
- def __init__(self, hostnames=None, key=None):
- self.valid = (hostnames is not None) and (key is not None)
- self.hostnames = hostnames
- self.key = key
-
- def from_line(cls, line, lineno=None):
- """
- Parses the given line of text to find the names for the host,
- the type of key, and the key data. The line is expected to be in the
- format used by the openssh known_hosts file.
-
- Lines are expected to not have leading or trailing whitespace.
- We don't bother to check for comments or empty lines. All of
- that should be taken care of before sending the line to us.
-
- @param line: a line from an OpenSSH known_hosts file
- @type line: str
- """
- log = get_logger('paramiko.hostkeys')
- fields = line.split(' ')
- if len(fields) < 3:
- # Bad number of fields
- log.info("Not enough fields found in known_hosts in line %s (%r)" %
- (lineno, line))
- return None
- fields = fields[:3]
+try:
+ from collections import MutableMapping
+except ImportError:
+ # noinspection PyUnresolvedReferences
+ from UserDict import DictMixin as MutableMapping
- names, keytype, key = fields
- names = names.split(',')
-
- # Decide what kind of key we're looking at and create an object
- # to hold it accordingly.
- try:
- if keytype == 'ssh-rsa':
- key = RSAKey(data=base64.decodestring(key))
- elif keytype == 'ssh-dss':
- key = DSSKey(data=base64.decodestring(key))
- elif keytype == 'ecdsa-sha2-nistp256':
- key = ECDSAKey(data=base64.decodestring(key))
- else:
- log.info("Unable to handle key of type %s" % (keytype,))
- return None
-
- except binascii.Error, e:
- raise InvalidHostKey(line, e)
-
- return cls(names, key)
- from_line = classmethod(from_line)
-
- def to_line(self):
- """
- Returns a string in OpenSSH known_hosts file format, or None if
- the object is not in a valid state. A trailing newline is
- included.
- """
- if self.valid:
- return '%s %s %s\n' % (','.join(self.hostnames), self.key.get_name(),
- self.key.get_base64())
- return None
-
- def __repr__(self):
- return '<HostKeyEntry %r: %r>' % (self.hostnames, self.key)
+from paramiko.dsskey import DSSKey
+from paramiko.rsakey import RSAKey
+from paramiko.util import get_logger, constant_time_bytes_eq
+from paramiko.ecdsakey import ECDSAKey
-class HostKeys (UserDict.DictMixin):
+class HostKeys (MutableMapping):
"""
- Representation of an openssh-style "known hosts" file. Host keys can be
+ Representation of an OpenSSH-style "known hosts" file. Host keys can be
read from one or more files, and then individual hosts can be looked up to
verify server keys during SSH negotiation.
- A HostKeys object can be treated like a dict; any dict lookup is equivalent
- to calling L{lookup}.
+ A `.HostKeys` object can be treated like a dict; any dict lookup is
+ equivalent to calling `lookup`.
- @since: 1.5.3
+ .. versionadded:: 1.5.3
"""
def __init__(self, filename=None):
"""
- Create a new HostKeys object, optionally loading keys from an openssh
+ Create a new HostKeys object, optionally loading keys from an OpenSSH
style host-key file.
- @param filename: filename to load host keys from, or C{None}
- @type filename: str
+ :param str filename: filename to load host keys from, or ``None``
"""
# emulate a dict of { hostname: { keytype: PKey } }
self._entries = []
@@ -137,14 +64,11 @@ class HostKeys (UserDict.DictMixin):
def add(self, hostname, keytype, key):
"""
Add a host key entry to the table. Any existing entry for a
- C{(hostname, keytype)} pair will be replaced.
-
- @param hostname: the hostname (or IP) to add
- @type hostname: str
- @param keytype: key type (C{"ssh-rsa"} or C{"ssh-dss"})
- @type keytype: str
- @param key: the key to add
- @type key: L{PKey}
+ ``(hostname, keytype)`` pair will be replaced.
+
+ :param str hostname: the hostname (or IP) to add
+ :param str keytype: key type (``"ssh-rsa"`` or ``"ssh-dss"``)
+ :param .PKey key: the key to add
"""
for e in self._entries:
if (hostname in e.hostnames) and (e.key.get_name() == keytype):
@@ -154,73 +78,81 @@ class HostKeys (UserDict.DictMixin):
def load(self, filename):
"""
- Read a file of known SSH host keys, in the format used by openssh.
+ Read a file of known SSH host keys, in the format used by OpenSSH.
This type of file unfortunately doesn't exist on Windows, but on
posix, it will usually be stored in
- C{os.path.expanduser("~/.ssh/known_hosts")}.
+ ``os.path.expanduser("~/.ssh/known_hosts")``.
If this method is called multiple times, the host keys are merged,
- not cleared. So multiple calls to C{load} will just call L{add},
+ not cleared. So multiple calls to `load` will just call `add`,
replacing any existing entries and adding new ones.
- @param filename: name of the file to read host keys from
- @type filename: str
+ :param str filename: name of the file to read host keys from
- @raise IOError: if there was an error reading the file
+ :raises IOError: if there was an error reading the file
"""
- f = open(filename, 'r')
- for lineno, line in enumerate(f):
- line = line.strip()
- if (len(line) == 0) or (line[0] == '#'):
- continue
- e = HostKeyEntry.from_line(line, lineno)
- if e is not None:
- _hostnames = e.hostnames
- for h in _hostnames:
- if self.check(h, e.key):
- e.hostnames.remove(h)
- if len(e.hostnames):
- self._entries.append(e)
- f.close()
+ with open(filename, 'r') as f:
+ for lineno, line in enumerate(f):
+ line = line.strip()
+ if (len(line) == 0) or (line[0] == '#'):
+ continue
+ e = HostKeyEntry.from_line(line, lineno)
+ if e is not None:
+ _hostnames = e.hostnames
+ for h in _hostnames:
+ if self.check(h, e.key):
+ e.hostnames.remove(h)
+ if len(e.hostnames):
+ self._entries.append(e)
def save(self, filename):
"""
- Save host keys into a file, in the format used by openssh. The order of
+ Save host keys into a file, in the format used by OpenSSH. The order of
keys in the file will be preserved when possible (if these keys were
loaded from a file originally). The single exception is that combined
lines will be split into individual key lines, which is arguably a bug.
- @param filename: name of the file to write
- @type filename: str
+ :param str filename: name of the file to write
- @raise IOError: if there was an error writing the file
+ :raises IOError: if there was an error writing the file
- @since: 1.6.1
+ .. versionadded:: 1.6.1
"""
- f = open(filename, 'w')
- for e in self._entries:
- line = e.to_line()
- if line:
- f.write(line)
- f.close()
+ with open(filename, 'w') as f:
+ for e in self._entries:
+ line = e.to_line()
+ if line:
+ f.write(line)
def lookup(self, hostname):
"""
Find a hostkey entry for a given hostname or IP. If no entry is found,
- C{None} is returned. Otherwise a dictionary of keytype to key is
- returned. The keytype will be either C{"ssh-rsa"} or C{"ssh-dss"}.
+ ``None`` is returned. Otherwise a dictionary of keytype to key is
+ returned. The keytype will be either ``"ssh-rsa"`` or ``"ssh-dss"``.
- @param hostname: the hostname (or IP) to lookup
- @type hostname: str
- @return: keys associated with this host (or C{None})
- @rtype: dict(str, L{PKey})
+ :param str hostname: the hostname (or IP) to lookup
+ :return: dict of `str` -> `.PKey` keys associated with this host (or ``None``)
"""
- class SubDict (UserDict.DictMixin):
+ class SubDict (MutableMapping):
def __init__(self, hostname, entries, hostkeys):
self._hostname = hostname
self._entries = entries
self._hostkeys = hostkeys
+ def __iter__(self):
+ for k in self.keys():
+ yield k
+
+ def __len__(self):
+ return len(self.keys())
+
+ def __delitem__(self, key):
+ for e in list(self._entries):
+ if e.key.get_name() == key:
+ self._entries.remove(e)
+ else:
+ raise KeyError(key)
+
def __getitem__(self, key):
for e in self._entries:
if e.key.get_name() == key:
@@ -247,7 +179,7 @@ class HostKeys (UserDict.DictMixin):
entries = []
for e in self._entries:
for h in e.hostnames:
- if (h.startswith('|1|') and (self.hash_host(hostname, h) == h)) or (h == hostname):
+ if h.startswith('|1|') and constant_time_bytes_eq(self.hash_host(hostname, h), h) or h == hostname:
entries.append(e)
if len(entries) == 0:
return None
@@ -258,13 +190,10 @@ class HostKeys (UserDict.DictMixin):
Return True if the given key is associated with the given hostname
in this dictionary.
- @param hostname: hostname (or IP) of the SSH server
- @type hostname: str
- @param key: the key to check
- @type key: L{PKey}
- @return: C{True} if the key is associated with the hostname; C{False}
- if not
- @rtype: bool
+ :param str hostname: hostname (or IP) of the SSH server
+ :param .PKey key: the key to check
+ :return:
+ ``True`` if the key is associated with the hostname; else ``False``
"""
k = self.lookup(hostname)
if k is None:
@@ -272,7 +201,7 @@ class HostKeys (UserDict.DictMixin):
host_key = k.get(key.get_name(), None)
if host_key is None:
return False
- return str(host_key) == str(key)
+ return host_key.asbytes() == key.asbytes()
def clear(self):
"""
@@ -280,6 +209,16 @@ class HostKeys (UserDict.DictMixin):
"""
self._entries = []
+ def __iter__(self):
+ for k in self.keys():
+ yield k
+
+ def __len__(self):
+ return len(self.keys())
+
+ def __delitem__(self, key):
+ k = self[key]
+
def __getitem__(self, key):
ret = self.lookup(key)
if ret is None:
@@ -302,7 +241,7 @@ class HostKeys (UserDict.DictMixin):
self._entries.append(HostKeyEntry([hostname], entry[key_type]))
def keys(self):
- # python 2.4 sets would be nice here.
+ # Python 2.4 sets would be nice here.
ret = []
for e in self._entries:
for h in e.hostnames:
@@ -318,25 +257,97 @@ class HostKeys (UserDict.DictMixin):
def hash_host(hostname, salt=None):
"""
- Return a "hashed" form of the hostname, as used by openssh when storing
+ Return a "hashed" form of the hostname, as used by OpenSSH when storing
hashed hostnames in the known_hosts file.
- @param hostname: the hostname to hash
- @type hostname: str
- @param salt: optional salt to use when hashing (must be 20 bytes long)
- @type salt: str
- @return: the hashed hostname
- @rtype: str
+ :param str hostname: the hostname to hash
+ :param str salt: optional salt to use when hashing (must be 20 bytes long)
+ :return: the hashed hostname as a `str`
"""
if salt is None:
- salt = rng.read(SHA.digest_size)
+ salt = os.urandom(sha1().digest_size)
else:
if salt.startswith('|1|'):
salt = salt.split('|')[2]
- salt = base64.decodestring(salt)
- assert len(salt) == SHA.digest_size
- hmac = HMAC.HMAC(salt, hostname, SHA).digest()
- hostkey = '|1|%s|%s' % (base64.encodestring(salt), base64.encodestring(hmac))
+ salt = decodebytes(b(salt))
+ assert len(salt) == sha1().digest_size
+ hmac = HMAC(salt, b(hostname), sha1).digest()
+ hostkey = '|1|%s|%s' % (u(encodebytes(salt)), u(encodebytes(hmac)))
return hostkey.replace('\n', '')
hash_host = staticmethod(hash_host)
+
+class InvalidHostKey(Exception):
+ def __init__(self, line, exc):
+ self.line = line
+ self.exc = exc
+ self.args = (line, exc)
+
+
+class HostKeyEntry:
+ """
+ Representation of a line in an OpenSSH-style "known hosts" file.
+ """
+
+ def __init__(self, hostnames=None, key=None):
+ self.valid = (hostnames is not None) and (key is not None)
+ self.hostnames = hostnames
+ self.key = key
+
+ def from_line(cls, line, lineno=None):
+ """
+ Parses the given line of text to find the names for the host,
+ the type of key, and the key data. The line is expected to be in the
+ format used by the OpenSSH known_hosts file.
+
+ Lines are expected to not have leading or trailing whitespace.
+ We don't bother to check for comments or empty lines. All of
+ that should be taken care of before sending the line to us.
+
+ :param str line: a line from an OpenSSH known_hosts file
+ """
+ log = get_logger('paramiko.hostkeys')
+ fields = line.split(' ')
+ if len(fields) < 3:
+ # Bad number of fields
+ log.info("Not enough fields found in known_hosts in line %s (%r)" %
+ (lineno, line))
+ return None
+ fields = fields[:3]
+
+ names, keytype, key = fields
+ names = names.split(',')
+
+ # Decide what kind of key we're looking at and create an object
+ # to hold it accordingly.
+ try:
+ key = b(key)
+ if keytype == 'ssh-rsa':
+ key = RSAKey(data=decodebytes(key))
+ elif keytype == 'ssh-dss':
+ key = DSSKey(data=decodebytes(key))
+ elif keytype == 'ecdsa-sha2-nistp256':
+ key = ECDSAKey(data=decodebytes(key))
+ else:
+ log.info("Unable to handle key of type %s" % (keytype,))
+ return None
+
+ except binascii.Error as e:
+ raise InvalidHostKey(line, e)
+
+ return cls(names, key)
+ from_line = classmethod(from_line)
+
+ def to_line(self):
+ """
+ Returns a string in OpenSSH known_hosts file format, or None if
+ the object is not in a valid state. A trailing newline is
+ included.
+ """
+ if self.valid:
+ return '%s %s %s\n' % (','.join(self.hostnames), self.key.get_name(),
+ self.key.get_base64())
+ return None
+
+ def __repr__(self):
+ return '<HostKeyEntry %r: %r>' % (self.hostnames, self.key)