aboutsummaryrefslogtreecommitdiff
path: root/tests
diff options
context:
space:
mode:
authorJeremy T. Bouse <jbouse@debian.org>2009-11-27 16:20:09 -0500
committerJeremy T. Bouse <jbouse@debian.org>2009-11-27 16:20:09 -0500
commit176c6caf4ea7918e1698438634b237fab8456471 (patch)
tree6e2a8e5be1af2a6ec324fdbf99589aa099f1ec2a /tests
downloadpython-paramiko-176c6caf4ea7918e1698438634b237fab8456471.tar
python-paramiko-176c6caf4ea7918e1698438634b237fab8456471.tar.gz
Imported Upstream version 1.5.2upstream/1.5.2
Diffstat (limited to 'tests')
-rw-r--r--tests/loop.py104
-rw-r--r--tests/stub_sftp.py192
-rw-r--r--tests/test_dss.key12
-rw-r--r--tests/test_dss_password.key15
-rw-r--r--tests/test_file.py153
-rw-r--r--tests/test_kex.py183
-rw-r--r--tests/test_message.py102
-rw-r--r--tests/test_packetizer.py70
-rw-r--r--tests/test_pkey.py140
-rw-r--r--tests/test_rsa.key15
-rw-r--r--tests/test_rsa_password.key18
-rw-r--r--tests/test_sftp.py740
-rw-r--r--tests/test_transport.py573
-rw-r--r--tests/test_util.py80
14 files changed, 2397 insertions, 0 deletions
diff --git a/tests/loop.py b/tests/loop.py
new file mode 100644
index 0000000..ad5f7ca
--- /dev/null
+++ b/tests/loop.py
@@ -0,0 +1,104 @@
+#!/usr/bin/python
+
+# Copyright (C) 2003-2005 Robey Pointer <robey@lag.net>
+#
+# This file is part of paramiko.
+#
+# Paramiko is free software; you can redistribute it and/or modify it under the
+# terms of the GNU Lesser General Public License as published by the Free
+# Software Foundation; either version 2.1 of the License, or (at your option)
+# any later version.
+#
+# Paramiko is distrubuted in the hope that it will be useful, but WITHOUT ANY
+# WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR
+# A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more
+# details.
+#
+# You should have received a copy of the GNU Lesser General Public License
+# along with Paramiko; if not, write to the Free Software Foundation, Inc.,
+# 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA.
+
+"""
+...
+"""
+
+import threading, socket
+
+
+class LoopSocket (object):
+ """
+ A LoopSocket looks like a normal socket, but all data written to it is
+ delivered on the read-end of another LoopSocket, and vice versa. It's
+ like a software "socketpair".
+ """
+
+ def __init__(self):
+ self.__in_buffer = ''
+ self.__lock = threading.Lock()
+ self.__cv = threading.Condition(self.__lock)
+ self.__timeout = None
+ self.__mate = None
+
+ def close(self):
+ self.__unlink()
+ try:
+ self.__lock.acquire()
+ self.__in_buffer = ''
+ finally:
+ self.__lock.release()
+
+ def send(self, data):
+ if self.__mate is None:
+ # EOF
+ raise EOFError()
+ self.__mate.__feed(data)
+ return len(data)
+
+ def recv(self, n):
+ self.__lock.acquire()
+ try:
+ if self.__mate is None:
+ # EOF
+ return ''
+ if len(self.__in_buffer) == 0:
+ self.__cv.wait(self.__timeout)
+ if len(self.__in_buffer) == 0:
+ raise socket.timeout
+ if n < self.__in_buffer:
+ out = self.__in_buffer[:n]
+ self.__in_buffer = self.__in_buffer[n:]
+ else:
+ out = self.__in_buffer
+ self.__in_buffer = ''
+ return out
+ finally:
+ self.__lock.release()
+
+ def settimeout(self, n):
+ self.__timeout = n
+
+ def link(self, other):
+ self.__mate = other
+ self.__mate.__mate = self
+
+ def __feed(self, data):
+ self.__lock.acquire()
+ try:
+ self.__in_buffer += data
+ self.__cv.notifyAll()
+ finally:
+ self.__lock.release()
+
+ def __unlink(self):
+ m = None
+ self.__lock.acquire()
+ try:
+ if self.__mate is not None:
+ m = self.__mate
+ self.__mate = None
+ finally:
+ self.__lock.release()
+ if m is not None:
+ m.__unlink()
+
+
diff --git a/tests/stub_sftp.py b/tests/stub_sftp.py
new file mode 100644
index 0000000..4b8b9c3
--- /dev/null
+++ b/tests/stub_sftp.py
@@ -0,0 +1,192 @@
+#!/usr/bin/python
+
+# Copyright (C) 2003-2005 Robey Pointer <robey@lag.net>
+#
+# This file is part of paramiko.
+#
+# Paramiko is free software; you can redistribute it and/or modify it under the
+# terms of the GNU Lesser General Public License as published by the Free
+# Software Foundation; either version 2.1 of the License, or (at your option)
+# any later version.
+#
+# Paramiko is distrubuted in the hope that it will be useful, but WITHOUT ANY
+# WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR
+# A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more
+# details.
+#
+# You should have received a copy of the GNU Lesser General Public License
+# along with Paramiko; if not, write to the Free Software Foundation, Inc.,
+# 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA.
+
+"""
+A stub SFTP server for loopback SFTP testing.
+"""
+
+import os
+from paramiko import ServerInterface, SFTPServerInterface, SFTPServer, SFTPAttributes, \
+ SFTPHandle, SFTP_OK, AUTH_SUCCESSFUL, OPEN_SUCCEEDED
+
+
+class StubServer (ServerInterface):
+ def check_auth_password(self, username, password):
+ # all are allowed
+ return AUTH_SUCCESSFUL
+
+ def check_channel_request(self, kind, chanid):
+ return OPEN_SUCCEEDED
+
+
+class StubSFTPHandle (SFTPHandle):
+ def stat(self):
+ try:
+ return SFTPAttributes.from_stat(os.fstat(self.readfile.fileno()))
+ except OSError, e:
+ return SFTPServer.convert_errno(e.errno)
+
+ def chattr(self, attr):
+ # python doesn't have equivalents to fchown or fchmod, so we have to
+ # use the stored filename
+ try:
+ SFTPServer.set_file_attr(self.filename, attr)
+ except OSError, e:
+ return SFTPServer.convert_errno(e.errno)
+
+
+class StubSFTPServer (SFTPServerInterface):
+ # assume current folder is a fine root
+ # (the tests always create and eventualy delete a subfolder, so there shouldn't be any mess)
+ ROOT = os.getcwd()
+
+ def _realpath(self, path):
+ return self.ROOT + self.canonicalize(path)
+
+ def list_folder(self, path):
+ path = self._realpath(path)
+ try:
+ out = [ ]
+ flist = os.listdir(path)
+ for fname in flist:
+ attr = SFTPAttributes.from_stat(os.stat(os.path.join(path, fname)))
+ attr.filename = fname
+ out.append(attr)
+ return out
+ except OSError, e:
+ return SFTPServer.convert_errno(e.errno)
+
+ def stat(self, path):
+ path = self._realpath(path)
+ try:
+ return SFTPAttributes.from_stat(os.stat(path))
+ except OSError, e:
+ return SFTPServer.convert_errno(e.errno)
+
+ def lstat(self, path):
+ path = self._realpath(path)
+ try:
+ return SFTPAttributes.from_stat(os.lstat(path))
+ except OSError, e:
+ return SFTPServer.convert_errno(e.errno)
+
+ def open(self, path, flags, attr):
+ path = self._realpath(path)
+ try:
+ fd = os.open(path, flags)
+ except OSError, e:
+ return SFTPServer.convert_errno(e.errno)
+ if (flags & os.O_CREAT) and (attr is not None):
+ SFTPServer.set_file_attr(path, attr)
+ if flags & os.O_WRONLY:
+ fstr = 'w'
+ elif flags & os.O_RDWR:
+ fstr = 'r+'
+ else:
+ # O_RDONLY (== 0)
+ fstr = 'r'
+ try:
+ f = os.fdopen(fd, fstr)
+ except OSError, e:
+ return SFTPServer.convert_errno(e.errno)
+ fobj = StubSFTPHandle()
+ fobj.filename = path
+ fobj.readfile = f
+ fobj.writefile = f
+ return fobj
+
+ def remove(self, path):
+ path = self._realpath(path)
+ try:
+ os.remove(path)
+ except OSError, e:
+ return SFTPServer.convert_errno(e.errno)
+ return SFTP_OK
+
+ def rename(self, oldpath, newpath):
+ oldpath = self._realpath(oldpath)
+ newpath = self._realpath(newpath)
+ try:
+ os.rename(oldpath, newpath)
+ except OSError, e:
+ return SFTPServer.convert_errno(e.errno)
+ return SFTP_OK
+
+ def mkdir(self, path, attr):
+ path = self._realpath(path)
+ try:
+ os.mkdir(path)
+ if attr is not None:
+ SFTPServer.set_file_attr(path, attr)
+ except OSError, e:
+ return SFTPServer.convert_errno(e.errno)
+ return SFTP_OK
+
+ def rmdir(self, path):
+ path = self._realpath(path)
+ try:
+ os.rmdir(path)
+ except OSError, e:
+ return SFTPServer.convert_errno(e.errno)
+ return SFTP_OK
+
+ def chattr(self, path, attr):
+ path = self._realpath(path)
+ try:
+ SFTPServer.set_file_attr(path, attr)
+ except OSError, e:
+ return SFTPServer.convert_errno(e.errno)
+ return SFTP_OK
+
+ def symlink(self, target_path, path):
+ path = self._realpath(path)
+ if (len(target_path) > 0) and (target_path[0] == '/'):
+ # absolute symlink
+ target_path = os.path.join(self.ROOT, target_path[1:])
+ if target_path[:2] == '//':
+ # bug in os.path.join
+ target_path = target_path[1:]
+ else:
+ # compute relative to path
+ abspath = os.path.join(os.path.dirname(path), target_path)
+ if abspath[:len(self.ROOT)] != self.ROOT:
+ # this symlink isn't going to work anyway -- just break it immediately
+ target_path = '<error>'
+ try:
+ os.symlink(target_path, path)
+ except:
+ return SFTPServer.convert_errno(e.errno)
+ return SFTP_OK
+
+ def readlink(self, path):
+ path = self._realpath(path)
+ try:
+ symlink = os.readlink(path)
+ except OSError, e:
+ return SFTPServer.convert_errno(e.errno)
+ # if it's absolute, remove the root
+ if os.path.isabs(symlink):
+ if symlink[:len(self.ROOT)] == self.ROOT:
+ symlink = symlink[len(self.ROOT):]
+ if (len(symlink) == 0) or (symlink[0] != '/'):
+ symlink = '/' + symlink
+ else:
+ symlink = '<error>'
+ return symlink
diff --git a/tests/test_dss.key b/tests/test_dss.key
new file mode 100644
index 0000000..e10807f
--- /dev/null
+++ b/tests/test_dss.key
@@ -0,0 +1,12 @@
+-----BEGIN DSA PRIVATE KEY-----
+MIIBuwIBAAKBgQDngaYDZ30c6/7cJgEEbtl8FgKdwhba1Z7oOrOn4MI/6C42G1bY
+wMuqZf4dBCglsdq39SHrcjbE8Vq54gPSOh3g4+uV9Rcg5IOoPLbwp2jQfF6f1FIb
+sx7hrDCIqUcQccPSxetPBKmXI9RN8rZLaFuQeTnI65BKM98Ruwvq6SI2LwIVAPDP
+hSeawaJI27mKqOfe5PPBSmyHAoGBAJMXxXmPD9sGaQ419DIpmZecJKBUAy9uXD8x
+gbgeDpwfDaFJP8owByCKREocPFfi86LjCuQkyUKOfjYMN6iHIf1oEZjB8uJAatUr
+FzI0ArXtUqOhwTLwTyFuUojE5own2WYsOAGByvgfyWjsGhvckYNhI4ODpNdPlxQ8
+ZamaPGPsAoGARmR7CCPjodxASvRbIyzaVpZoJ/Z6x7dAumV+ysrV1BVYd0lYukmn
+jO1kKBWApqpH1ve9XDQYN8zgxM4b16L21kpoWQnZtXrY3GZ4/it9kUgyB7+NwacI
+BlXa8cMDL7Q/69o0d54U0X/NeX5QxuYR6OMJlrkQB7oiW/P/1mwjQgECFGI9QPSc
+h9pT9XHqn+1rZ4bK+QGA
+-----END DSA PRIVATE KEY-----
diff --git a/tests/test_dss_password.key b/tests/test_dss_password.key
new file mode 100644
index 0000000..e2a9bc5
--- /dev/null
+++ b/tests/test_dss_password.key
@@ -0,0 +1,15 @@
+-----BEGIN DSA PRIVATE KEY-----
+Proc-Type: 4,ENCRYPTED
+DEK-Info: DES-EDE3-CBC,78DAEB836ED0A646
+
+ldWkq9OMlXqWmjIqppNnmNPIUj5uVT12LkBosTApTbibTme3kIJb1uDeG2BShVfY
++vDOTUE9koGPDLsxW1t5At+EVyIDK8aIO0uHteXM5AbBX20LLUWRbRVqZhsMxqQh
+3H3XlHiN+QhaWcb4fFuu18a8SkimTFpDnZuffoCDl/zh/B7XieARTLA805K/ZgVB
+BBwflkR2BE053XHrJAIx9BEUlLP76Fo18rvjLZOSeu3s+VnnhqUb5FCt5h50a46u
+YXQBbo2r9Zo1ilGMNEXJO0gk5hwGVmTySz53NkPA5HmWt8NIzv5jQHMDy7N+ZykF
+uwpP1R5M/ZIFY4Y5h/lvn6IJjQ7VySRPIbpN8o2YJv2OD1Ja80n3tU8Mg77o3o4d
+NwKm7cCjlq+FuIBdOsSgsB8FPQRUhW+jpFDxmWN64DM2cEg6RUdptby7WmMp0HwK
+1qyEfxHjLMuDVlD7lASIDBrRlUjPtXEH1DzIYQuYaRZaixFoZ7EY+X73TwmrKFEU
+US9ZnQZtRtroRqGwR4fz4wQQsjTl/AmOijlBmi29taJccJsT/THrLQ5plOEd8OMv
+9FsaPJXBU85gaRKo3JZtrw==
+-----END DSA PRIVATE KEY-----
diff --git a/tests/test_file.py b/tests/test_file.py
new file mode 100644
index 0000000..250821c
--- /dev/null
+++ b/tests/test_file.py
@@ -0,0 +1,153 @@
+#!/usr/bin/python
+
+# Copyright (C) 2003-2005 Robey Pointer <robey@lag.net>
+#
+# This file is part of paramiko.
+#
+# Paramiko is free software; you can redistribute it and/or modify it under the
+# terms of the GNU Lesser General Public License as published by the Free
+# Software Foundation; either version 2.1 of the License, or (at your option)
+# any later version.
+#
+# Paramiko is distrubuted in the hope that it will be useful, but WITHOUT ANY
+# WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR
+# A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more
+# details.
+#
+# You should have received a copy of the GNU Lesser General Public License
+# along with Paramiko; if not, write to the Free Software Foundation, Inc.,
+# 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA.
+
+"""
+Some unit tests for the BufferedFile abstraction.
+"""
+
+import unittest
+from paramiko.file import BufferedFile
+
+
+class LoopbackFile (BufferedFile):
+ """
+ BufferedFile object that you can write data into, and then read it back.
+ """
+ def __init__(self, mode='r', bufsize=-1):
+ BufferedFile.__init__(self)
+ self._set_mode(mode, bufsize)
+ self.buffer = ''
+
+ def _read(self, size):
+ if len(self.buffer) == 0:
+ return None
+ if size > len(self.buffer):
+ size = len(self.buffer)
+ data = self.buffer[:size]
+ self.buffer = self.buffer[size:]
+ return data
+
+ def _write(self, data):
+ self.buffer += data
+ return len(data)
+
+
+class BufferedFileTest (unittest.TestCase):
+
+ def test_1_simple(self):
+ f = LoopbackFile('r')
+ try:
+ f.write('hi')
+ self.assert_(False, 'no exception on write to read-only file')
+ except:
+ pass
+ f.close()
+
+ f = LoopbackFile('w')
+ try:
+ f.read(1)
+ self.assert_(False, 'no exception to read from write-only file')
+ except:
+ pass
+ f.close()
+
+ def test_2_readline(self):
+ f = LoopbackFile('r+U')
+ f.write('First line.\nSecond line.\r\nThird line.\nFinal line non-terminated.')
+ self.assertEqual(f.readline(), 'First line.\n')
+ # universal newline mode should convert this linefeed:
+ self.assertEqual(f.readline(), 'Second line.\n')
+ # truncated line:
+ self.assertEqual(f.readline(7), 'Third l')
+ self.assertEqual(f.readline(), 'ine.\n')
+ self.assertEqual(f.readline(), 'Final line non-terminated.')
+ self.assertEqual(f.readline(), '')
+ f.close()
+ try:
+ f.readline()
+ self.assert_(False, 'no exception on readline of closed file')
+ except IOError:
+ pass
+ self.assert_('\n' in f.newlines)
+ self.assert_('\r\n' in f.newlines)
+ self.assert_('\r' not in f.newlines)
+
+ def test_3_lf(self):
+ """
+ try to trick the linefeed detector.
+ """
+ f = LoopbackFile('r+U')
+ f.write('First line.\r')
+ self.assertEqual(f.readline(), 'First line.\n')
+ f.write('\nSecond.\r\n')
+ self.assertEqual(f.readline(), 'Second.\n')
+ f.close()
+ self.assertEqual(f.newlines, '\r\n')
+
+ def test_4_write(self):
+ """
+ verify that write buffering is on.
+ """
+ f = LoopbackFile('r+', 1)
+ f.write('Complete line.\nIncomplete line.')
+ self.assertEqual(f.readline(), 'Complete line.\n')
+ self.assertEqual(f.readline(), '')
+ f.write('..\n')
+ self.assertEqual(f.readline(), 'Incomplete line...\n')
+ f.close()
+
+ def test_5_flush(self):
+ """
+ verify that flush will force a write.
+ """
+ f = LoopbackFile('r+', 512)
+ f.write('Not\nquite\n512 bytes.\n')
+ self.assertEqual(f.read(1), '')
+ f.flush()
+ self.assertEqual(f.read(5), 'Not\nq')
+ self.assertEqual(f.read(10), 'uite\n512 b')
+ self.assertEqual(f.read(9), 'ytes.\n')
+ self.assertEqual(f.read(3), '')
+ f.close()
+
+ def test_6_buffering(self):
+ """
+ verify that flushing happens automatically on buffer crossing.
+ """
+ f = LoopbackFile('r+', 16)
+ f.write('Too small.')
+ self.assertEqual(f.read(4), '')
+ f.write(' ')
+ self.assertEqual(f.read(4), '')
+ f.write('Enough.')
+ self.assertEqual(f.read(20), 'Too small. Enough.')
+ f.close()
+
+ def test_7_read_all(self):
+ """
+ verify that read(-1) returns everything left in the file.
+ """
+ f = LoopbackFile('r+', 16)
+ f.write('The first thing you need to do is open your eyes. ')
+ f.write('Then, you need to close them again.\n')
+ s = f.read(-1)
+ self.assertEqual(s, 'The first thing you need to do is open your eyes. Then, you ' +
+ 'need to close them again.\n')
+ f.close()
diff --git a/tests/test_kex.py b/tests/test_kex.py
new file mode 100644
index 0000000..2680853
--- /dev/null
+++ b/tests/test_kex.py
@@ -0,0 +1,183 @@
+#!/usr/bin/python
+
+# Copyright (C) 2003-2005 Robey Pointer <robey@lag.net>
+#
+# This file is part of paramiko.
+#
+# Paramiko is free software; you can redistribute it and/or modify it under the
+# terms of the GNU Lesser General Public License as published by the Free
+# Software Foundation; either version 2.1 of the License, or (at your option)
+# any later version.
+#
+# Paramiko is distrubuted in the hope that it will be useful, but WITHOUT ANY
+# WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR
+# A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more
+# details.
+#
+# You should have received a copy of the GNU Lesser General Public License
+# along with Paramiko; if not, write to the Free Software Foundation, Inc.,
+# 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA.
+
+"""
+Some unit tests for the key exchange protocols.
+"""
+
+import unittest
+import paramiko.util
+from paramiko.kex_group1 import KexGroup1
+from paramiko.kex_gex import KexGex
+from paramiko import Message
+
+
+class FakeRandpool (object):
+ def stir(self):
+ pass
+ def get_bytes(self, n):
+ return chr(0xcc) * n
+
+class FakeKey (object):
+ def __str__(self):
+ return 'fake-key'
+ def sign_ssh_data(self, randpool, H):
+ return 'fake-sig'
+
+class FakeModulusPack (object):
+ P = 0xFFFFFFFFFFFFFFFFC90FDAA22168C234C4C6628B80DC1CD129024E088A67CC74020BBEA63B139B22514A08798E3404DDEF9519B3CD3A431B302B0A6DF25F14374FE1356D6D51C245E485B576625E7EC6F44C42E9A637ED6B0BFF5CB6F406B7EDEE386BFB5A899FA5AE9F24117C4B1FE649286651ECE65381FFFFFFFFFFFFFFFFL
+ G = 2
+ def get_modulus(self, min, ask, max):
+ return self.G, self.P
+
+class FakeTransport (object):
+ randpool = FakeRandpool()
+ local_version = 'SSH-2.0-paramiko_1.0'
+ remote_version = 'SSH-2.0-lame'
+ local_kex_init = 'local-kex-init'
+ remote_kex_init = 'remote-kex-init'
+
+ def _send_message(self, m):
+ self._message = m
+ def _expect_packet(self, t):
+ self._expect = t
+ def _set_K_H(self, K, H):
+ self._K = K
+ self._H = H
+ def _verify_key(self, host_key, sig):
+ self._verify = (host_key, sig)
+ def _activate_outbound(self):
+ self._activated = True
+ def _log(self, level, s):
+ pass
+ def get_server_key(self):
+ return FakeKey()
+ def _get_modulus_pack(self):
+ return FakeModulusPack()
+
+
+class KexTest (unittest.TestCase):
+
+ K = 14730343317708716439807310032871972459448364195094179797249681733965528989482751523943515690110179031004049109375612685505881911274101441415545039654102474376472240501616988799699744135291070488314748284283496055223852115360852283821334858541043710301057312858051901453919067023103730011648890038847384890504L
+
+ def setUp(self):
+ pass
+
+ def tearDown(self):
+ pass
+
+ def test_1_group1_client(self):
+ transport = FakeTransport()
+ transport.server_mode = False
+ kex = KexGroup1(transport)
+ kex.start_kex()
+ x = '1E000000807E2DDB1743F3487D6545F04F1C8476092FB912B013626AB5BCEB764257D88BBA64243B9F348DF7B41B8C814A995E00299913503456983FFB9178D3CD79EB6D55522418A8ABF65375872E55938AB99A84A0B5FC8A1ECC66A7C3766E7E0F80B7CE2C9225FC2DD683F4764244B72963BBB383F529DCF0C5D17740B8A2ADBE9208D4'
+ self.assertEquals(x, paramiko.util.hexify(str(transport._message)))
+ self.assertEquals(paramiko.kex_group1._MSG_KEXDH_REPLY, transport._expect)
+
+ # fake "reply"
+ msg = Message()
+ msg.add_string('fake-host-key')
+ msg.add_mpint(69)
+ msg.add_string('fake-sig')
+ msg.rewind()
+ kex.parse_next(paramiko.kex_group1._MSG_KEXDH_REPLY, msg)
+ H = '03079780F3D3AD0B3C6DB30C8D21685F367A86D2'
+ self.assertEquals(self.K, transport._K)
+ self.assertEquals(H, paramiko.util.hexify(transport._H))
+ self.assertEquals(('fake-host-key', 'fake-sig'), transport._verify)
+ self.assert_(transport._activated)
+
+ def test_2_group1_server(self):
+ transport = FakeTransport()
+ transport.server_mode = True
+ kex = KexGroup1(transport)
+ kex.start_kex()
+ self.assertEquals(paramiko.kex_group1._MSG_KEXDH_INIT, transport._expect)
+
+ msg = Message()
+ msg.add_mpint(69)
+ msg.rewind()
+ kex.parse_next(paramiko.kex_group1._MSG_KEXDH_INIT, msg)
+ H = 'B16BF34DD10945EDE84E9C1EF24A14BFDC843389'
+ x = '1F0000000866616B652D6B6579000000807E2DDB1743F3487D6545F04F1C8476092FB912B013626AB5BCEB764257D88BBA64243B9F348DF7B41B8C814A995E00299913503456983FFB9178D3CD79EB6D55522418A8ABF65375872E55938AB99A84A0B5FC8A1ECC66A7C3766E7E0F80B7CE2C9225FC2DD683F4764244B72963BBB383F529DCF0C5D17740B8A2ADBE9208D40000000866616B652D736967'
+ self.assertEquals(self.K, transport._K)
+ self.assertEquals(H, paramiko.util.hexify(transport._H))
+ self.assertEquals(x, paramiko.util.hexify(str(transport._message)))
+ self.assert_(transport._activated)
+
+ def test_3_gex_client(self):
+ transport = FakeTransport()
+ transport.server_mode = False
+ kex = KexGex(transport)
+ kex.start_kex()
+ x = '22000004000000080000002000'
+ self.assertEquals(x, paramiko.util.hexify(str(transport._message)))
+ self.assertEquals(paramiko.kex_gex._MSG_KEXDH_GEX_GROUP, transport._expect)
+
+ msg = Message()
+ msg.add_mpint(FakeModulusPack.P)
+ msg.add_mpint(FakeModulusPack.G)
+ msg.rewind()
+ kex.parse_next(paramiko.kex_gex._MSG_KEXDH_GEX_GROUP, msg)
+ x = '20000000807E2DDB1743F3487D6545F04F1C8476092FB912B013626AB5BCEB764257D88BBA64243B9F348DF7B41B8C814A995E00299913503456983FFB9178D3CD79EB6D55522418A8ABF65375872E55938AB99A84A0B5FC8A1ECC66A7C3766E7E0F80B7CE2C9225FC2DD683F4764244B72963BBB383F529DCF0C5D17740B8A2ADBE9208D4'
+ self.assertEquals(x, paramiko.util.hexify(str(transport._message)))
+ self.assertEquals(paramiko.kex_gex._MSG_KEXDH_GEX_REPLY, transport._expect)
+
+ msg = Message()
+ msg.add_string('fake-host-key')
+ msg.add_mpint(69)
+ msg.add_string('fake-sig')
+ msg.rewind()
+ kex.parse_next(paramiko.kex_gex._MSG_KEXDH_GEX_REPLY, msg)
+ H = 'A265563F2FA87F1A89BF007EE90D58BE2E4A4BD0'
+ self.assertEquals(self.K, transport._K)
+ self.assertEquals(H, paramiko.util.hexify(transport._H))
+ self.assertEquals(('fake-host-key', 'fake-sig'), transport._verify)
+ self.assert_(transport._activated)
+
+ def test_4_gex_server(self):
+ transport = FakeTransport()
+ transport.server_mode = True
+ kex = KexGex(transport)
+ kex.start_kex()
+ self.assertEquals(paramiko.kex_gex._MSG_KEXDH_GEX_REQUEST, transport._expect)
+
+ msg = Message()
+ msg.add_int(1024)
+ msg.add_int(2048)
+ msg.add_int(4096)
+ msg.rewind()
+ kex.parse_next(paramiko.kex_gex._MSG_KEXDH_GEX_REQUEST, msg)
+ x = '1F0000008100FFFFFFFFFFFFFFFFC90FDAA22168C234C4C6628B80DC1CD129024E088A67CC74020BBEA63B139B22514A08798E3404DDEF9519B3CD3A431B302B0A6DF25F14374FE1356D6D51C245E485B576625E7EC6F44C42E9A637ED6B0BFF5CB6F406B7EDEE386BFB5A899FA5AE9F24117C4B1FE649286651ECE65381FFFFFFFFFFFFFFFF0000000102'
+ self.assertEquals(x, paramiko.util.hexify(str(transport._message)))
+ self.assertEquals(paramiko.kex_gex._MSG_KEXDH_GEX_INIT, transport._expect)
+
+ msg = Message()
+ msg.add_mpint(12345)
+ msg.rewind()
+ kex.parse_next(paramiko.kex_gex._MSG_KEXDH_GEX_INIT, msg)
+ K = 67592995013596137876033460028393339951879041140378510871612128162185209509220726296697886624612526735888348020498716482757677848959420073720160491114319163078862905400020959196386947926388406687288901564192071077389283980347784184487280885335302632305026248574716290537036069329724382811853044654824945750581L
+ H = 'CE754197C21BF3452863B4F44D0B3951F12516EF'
+ x = '210000000866616B652D6B6579000000807E2DDB1743F3487D6545F04F1C8476092FB912B013626AB5BCEB764257D88BBA64243B9F348DF7B41B8C814A995E00299913503456983FFB9178D3CD79EB6D55522418A8ABF65375872E55938AB99A84A0B5FC8A1ECC66A7C3766E7E0F80B7CE2C9225FC2DD683F4764244B72963BBB383F529DCF0C5D17740B8A2ADBE9208D40000000866616B652D736967'
+ self.assertEquals(K, transport._K)
+ self.assertEquals(H, paramiko.util.hexify(transport._H))
+ self.assertEquals(x, paramiko.util.hexify(str(transport._message)))
+ self.assert_(transport._activated)
diff --git a/tests/test_message.py b/tests/test_message.py
new file mode 100644
index 0000000..441e3ce
--- /dev/null
+++ b/tests/test_message.py
@@ -0,0 +1,102 @@
+#!/usr/bin/python
+
+# Copyright (C) 2003-2005 Robey Pointer <robey@lag.net>
+#
+# This file is part of paramiko.
+#
+# Paramiko is free software; you can redistribute it and/or modify it under the
+# terms of the GNU Lesser General Public License as published by the Free
+# Software Foundation; either version 2.1 of the License, or (at your option)
+# any later version.
+#
+# Paramiko is distrubuted in the hope that it will be useful, but WITHOUT ANY
+# WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR
+# A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more
+# details.
+#
+# You should have received a copy of the GNU Lesser General Public License
+# along with Paramiko; if not, write to the Free Software Foundation, Inc.,
+# 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA.
+
+"""
+Some unit tests for ssh protocol message blocks.
+"""
+
+import unittest
+from paramiko.message import Message
+
+
+class MessageTest (unittest.TestCase):
+
+ __a = '\x00\x00\x00\x17\x07\x60\xe0\x90\x00\x00\x00\x01q\x00\x00\x00\x05hello\x00\x00\x03\xe8' + ('x' * 1000)
+ __b = '\x01\x00\xf3\x00\x3f\x00\x00\x00\x10huey,dewey,louie'
+ __c = '\x00\x00\x00\x00\x00\x00\x00\x05\x00\x00\xf5\xe4\xd3\xc2\xb1\x09\x00\x00\x00\x01\x11\x00\x00\x00\x07\x00\xf5\xe4\xd3\xc2\xb1\x09\x00\x00\x00\x06\x9a\x1b\x2c\x3d\x4e\xf7'
+ __d = '\x00\x00\x00\x05\x00\x00\x00\x05\x11\x22\x33\x44\x55\x01\x00\x00\x00\x03cat\x00\x00\x00\x03a,b'
+
+ def test_1_encode(self):
+ msg = Message()
+ msg.add_int(23)
+ msg.add_int(123789456)
+ msg.add_string('q')
+ msg.add_string('hello')
+ msg.add_string('x' * 1000)
+ self.assertEquals(str(msg), self.__a)
+
+ msg = Message()
+ msg.add_boolean(True)
+ msg.add_boolean(False)
+ msg.add_byte('\xf3')
+ msg.add_bytes('\x00\x3f')
+ msg.add_list(['huey', 'dewey', 'louie'])
+ self.assertEquals(str(msg), self.__b)
+
+ msg = Message()
+ msg.add_int64(5)
+ msg.add_int64(0xf5e4d3c2b109L)
+ msg.add_mpint(17)
+ msg.add_mpint(0xf5e4d3c2b109L)
+ msg.add_mpint(-0x65e4d3c2b109L)
+ self.assertEquals(str(msg), self.__c)
+
+ def test_2_decode(self):
+ msg = Message(self.__a)
+ self.assertEquals(msg.get_int(), 23)
+ self.assertEquals(msg.get_int(), 123789456)
+ self.assertEquals(msg.get_string(), 'q')
+ self.assertEquals(msg.get_string(), 'hello')
+ self.assertEquals(msg.get_string(), 'x' * 1000)
+
+ msg = Message(self.__b)
+ self.assertEquals(msg.get_boolean(), True)
+ self.assertEquals(msg.get_boolean(), False)
+ self.assertEquals(msg.get_byte(), '\xf3')
+ self.assertEquals(msg.get_bytes(2), '\x00\x3f')
+ self.assertEquals(msg.get_list(), ['huey', 'dewey', 'louie'])
+
+ msg = Message(self.__c)
+ self.assertEquals(msg.get_int64(), 5)
+ self.assertEquals(msg.get_int64(), 0xf5e4d3c2b109L)
+ self.assertEquals(msg.get_mpint(), 17)
+ self.assertEquals(msg.get_mpint(), 0xf5e4d3c2b109L)
+ self.assertEquals(msg.get_mpint(), -0x65e4d3c2b109L)
+
+ def test_3_add(self):
+ msg = Message()
+ msg.add(5)
+ msg.add(0x1122334455L)
+ msg.add(True)
+ msg.add('cat')
+ msg.add(['a', 'b'])
+ self.assertEquals(str(msg), self.__d)
+
+ def test_4_misc(self):
+ msg = Message(self.__d)
+ self.assertEquals(msg.get_int(), 5)
+ self.assertEquals(msg.get_mpint(), 0x1122334455L)
+ self.assertEquals(msg.get_so_far(), self.__d[:13])
+ self.assertEquals(msg.get_remainder(), self.__d[13:])
+ msg.rewind()
+ self.assertEquals(msg.get_int(), 5)
+ self.assertEquals(msg.get_so_far(), self.__d[:4])
+ self.assertEquals(msg.get_remainder(), self.__d[4:])
+
diff --git a/tests/test_packetizer.py b/tests/test_packetizer.py
new file mode 100644
index 0000000..8c992bd
--- /dev/null
+++ b/tests/test_packetizer.py
@@ -0,0 +1,70 @@
+# Copyright (C) 2003-2005 Robey Pointer <robey@lag.net>
+#
+# This file is part of paramiko.
+#
+# Paramiko is free software; you can redistribute it and/or modify it under the
+# terms of the GNU Lesser General Public License as published by the Free
+# Software Foundation; either version 2.1 of the License, or (at your option)
+# any later version.
+#
+# Paramiko is distrubuted in the hope that it will be useful, but WITHOUT ANY
+# WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR
+# A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more
+# details.
+#
+# You should have received a copy of the GNU Lesser General Public License
+# along with Paramiko; if not, write to the Free Software Foundation, Inc.,
+# 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA.
+
+"""
+Some unit tests for the ssh2 protocol in Transport.
+"""
+
+import unittest
+from loop import LoopSocket
+from Crypto.Cipher import AES
+from Crypto.Hash import SHA, HMAC
+from paramiko import Message, Packetizer, util
+
+class PacketizerTest (unittest.TestCase):
+
+ def test_1_write (self):
+ rsock = LoopSocket()
+ wsock = LoopSocket()
+ rsock.link(wsock)
+ p = Packetizer(wsock)
+ p.set_log(util.get_logger('paramiko.transport'))
+ p.set_hexdump(True)
+ cipher = AES.new('\x00' * 16, AES.MODE_CBC, '\x55' * 16)
+ p.set_outbound_cipher(cipher, 16, SHA, 12, '\x1f' * 20)
+
+ # message has to be at least 16 bytes long, so we'll have at least one
+ # block of data encrypted that contains zero random padding bytes
+ m = Message()
+ m.add_byte(chr(100))
+ m.add_int(100)
+ m.add_int(1)
+ m.add_int(900)
+ p.send_message(m)
+ data = rsock.recv(100)
+ # 32 + 12 bytes of MAC = 44
+ self.assertEquals(44, len(data))
+ self.assertEquals('\x43\x91\x97\xbd\x5b\x50\xac\x25\x87\xc2\xc4\x6b\xc7\xe9\x38\xc0', data[:16])
+
+ def test_2_read (self):
+ rsock = LoopSocket()
+ wsock = LoopSocket()
+ rsock.link(wsock)
+ p = Packetizer(rsock)
+ p.set_log(util.get_logger('paramiko.transport'))
+ p.set_hexdump(True)
+ cipher = AES.new('\x00' * 16, AES.MODE_CBC, '\x55' * 16)
+ p.set_inbound_cipher(cipher, 16, SHA, 12, '\x1f' * 20)
+
+ wsock.send('C\x91\x97\xbd[P\xac%\x87\xc2\xc4k\xc7\xe98\xc0' + \
+ '\x90\xd2\x16V\rqsa8|L=\xfb\x97}\xe2n\x03\xb1\xa0\xc2\x1c\xd6AAL\xb4Y')
+ cmd, m = p.read_message()
+ self.assertEquals(100, cmd)
+ self.assertEquals(100, m.get_int())
+ self.assertEquals(1, m.get_int())
+ self.assertEquals(900, m.get_int())
diff --git a/tests/test_pkey.py b/tests/test_pkey.py
new file mode 100644
index 0000000..e56edb1
--- /dev/null
+++ b/tests/test_pkey.py
@@ -0,0 +1,140 @@
+# Copyright (C) 2003-2005 Robey Pointer <robey@lag.net>
+#
+# This file is part of paramiko.
+#
+# Paramiko is free software; you can redistribute it and/or modify it under the
+# terms of the GNU Lesser General Public License as published by the Free
+# Software Foundation; either version 2.1 of the License, or (at your option)
+# any later version.
+#
+# Paramiko is distrubuted in the hope that it will be useful, but WITHOUT ANY
+# WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR
+# A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more
+# details.
+#
+# You should have received a copy of the GNU Lesser General Public License
+# along with Paramiko; if not, write to the Free Software Foundation, Inc.,
+# 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA.
+
+"""
+Some unit tests for public/private key objects.
+"""
+
+import unittest
+from paramiko import RSAKey, DSSKey, Message, util, randpool
+
+# from openssh's ssh-keygen
+PUB_RSA = 'ssh-rsa AAAAB3NzaC1yc2EAAAABIwAAAIEA049W6geFpmsljTwfvI1UmKWWJPNFI74+vNKTk4dmzkQY2yAMs6FhlvhlI8ysU4oj71ZsRYMecHbBbxdN79+JRFVYTKaLqjwGENeTd+yv4q+V2PvZv3fLnzApI3l7EJCqhWwJUHJ1jAkZzqDx0tyOL4uoZpww3nmE0kb3y21tH4c='
+PUB_DSS = 'ssh-dss AAAAB3NzaC1kc3MAAACBAOeBpgNnfRzr/twmAQRu2XwWAp3CFtrVnug6s6fgwj/oLjYbVtjAy6pl/h0EKCWx2rf1IetyNsTxWrniA9I6HeDj65X1FyDkg6g8tvCnaNB8Xp/UUhuzHuGsMIipRxBxw9LF608EqZcj1E3ytktoW5B5OcjrkEoz3xG7C+rpIjYvAAAAFQDwz4UnmsGiSNu5iqjn3uTzwUpshwAAAIEAkxfFeY8P2wZpDjX0MimZl5wkoFQDL25cPzGBuB4OnB8NoUk/yjAHIIpEShw8V+LzouMK5CTJQo5+Ngw3qIch/WgRmMHy4kBq1SsXMjQCte1So6HBMvBPIW5SiMTmjCfZZiw4AYHK+B/JaOwaG9yRg2Ejg4Ok10+XFDxlqZo8Y+wAAACARmR7CCPjodxASvRbIyzaVpZoJ/Z6x7dAumV+ysrV1BVYd0lYukmnjO1kKBWApqpH1ve9XDQYN8zgxM4b16L21kpoWQnZtXrY3GZ4/it9kUgyB7+NwacIBlXa8cMDL7Q/69o0d54U0X/NeX5QxuYR6OMJlrkQB7oiW/P/1mwjQgE='
+FINGER_RSA = '1024 60:73:38:44:cb:51:86:65:7f:de:da:a2:2b:5a:57:d5'
+FINGER_DSS = '1024 44:78:f0:b9:a2:3c:c5:18:20:09:ff:75:5b:c1:d2:6c'
+SIGNED_RSA = '20:d7:8a:31:21:cb:f7:92:12:f2:a4:89:37:f5:78:af:e6:16:b6:25:b9:97:3d:a2:cd:5f:ca:20:21:73:4c:ad:34:73:8f:20:77:28:e2:94:15:08:d8:91:40:7a:85:83:bf:18:37:95:dc:54:1a:9b:88:29:6c:73:ca:38:b4:04:f1:56:b9:f2:42:9d:52:1b:29:29:b4:4f:fd:c9:2d:af:47:d2:40:76:30:f3:63:45:0c:d9:1d:43:86:0f:1c:70:e2:93:12:34:f3:ac:c5:0a:2f:14:50:66:59:f1:88:ee:c1:4a:e9:d1:9c:4e:46:f0:0e:47:6f:38:74:f1:44:a8'
+
+
+class KeyTest (unittest.TestCase):
+
+ def setUp(self):
+ pass
+
+ def tearDown(self):
+ pass
+
+ def test_1_generate_key_bytes(self):
+ from Crypto.Hash import MD5
+ key = util.generate_key_bytes(MD5, '\x01\x02\x03\x04', 'happy birthday', 30)
+ exp = util.unhexify('61E1F272F4C1C4561586BD322498C0E924672780F47BB37DDA7D54019E64')
+ self.assertEquals(exp, key)
+
+ def test_2_load_rsa(self):
+ key = RSAKey.from_private_key_file('tests/test_rsa.key')
+ self.assertEquals('ssh-rsa', key.get_name())
+ exp_rsa = FINGER_RSA.split()[1].replace(':', '')
+ my_rsa = util.hexify(key.get_fingerprint()).lower()
+ self.assertEquals(exp_rsa, my_rsa)
+ self.assertEquals(PUB_RSA.split()[1], key.get_base64())
+ self.assertEquals(1024, key.get_bits())
+
+ def test_3_load_rsa_password(self):
+ key = RSAKey.from_private_key_file('tests/test_rsa_password.key', 'television')
+ self.assertEquals('ssh-rsa', key.get_name())
+ exp_rsa = FINGER_RSA.split()[1].replace(':', '')
+ my_rsa = util.hexify(key.get_fingerprint()).lower()
+ self.assertEquals(exp_rsa, my_rsa)
+ self.assertEquals(PUB_RSA.split()[1], key.get_base64())
+ self.assertEquals(1024, key.get_bits())
+
+ def test_4_load_dss(self):
+ key = DSSKey.from_private_key_file('tests/test_dss.key')
+ self.assertEquals('ssh-dss', key.get_name())
+ exp_dss = FINGER_DSS.split()[1].replace(':', '')
+ my_dss = util.hexify(key.get_fingerprint()).lower()
+ self.assertEquals(exp_dss, my_dss)
+ self.assertEquals(PUB_DSS.split()[1], key.get_base64())
+ self.assertEquals(1024, key.get_bits())
+
+ def test_5_load_dss_password(self):
+ key = DSSKey.from_private_key_file('tests/test_dss_password.key', 'television')
+ self.assertEquals('ssh-dss', key.get_name())
+ exp_dss = FINGER_DSS.split()[1].replace(':', '')
+ my_dss = util.hexify(key.get_fingerprint()).lower()
+ self.assertEquals(exp_dss, my_dss)
+ self.assertEquals(PUB_DSS.split()[1], key.get_base64())
+ self.assertEquals(1024, key.get_bits())
+
+ def test_6_compare_rsa(self):
+ # verify that the private & public keys compare equal
+ key = RSAKey.from_private_key_file('tests/test_rsa.key')
+ self.assertEquals(key, key)
+ pub = RSAKey(data=str(key))
+ self.assert_(key.can_sign())
+ self.assert_(not pub.can_sign())
+ self.assertEquals(key, pub)
+
+ def test_7_compare_dss(self):
+ # verify that the private & public keys compare equal
+ key = DSSKey.from_private_key_file('tests/test_dss.key')
+ self.assertEquals(key, key)
+ pub = DSSKey(data=str(key))
+ self.assert_(key.can_sign())
+ self.assert_(not pub.can_sign())
+ self.assertEquals(key, pub)
+
+ def test_8_sign_rsa(self):
+ # verify that the rsa private key can sign and verify
+ key = RSAKey.from_private_key_file('tests/test_rsa.key')
+ msg = key.sign_ssh_data(randpool, 'ice weasels')
+ self.assert_(type(msg) is Message)
+ msg.rewind()
+ self.assertEquals('ssh-rsa', msg.get_string())
+ sig = ''.join([chr(int(x, 16)) for x in SIGNED_RSA.split(':')])
+ self.assertEquals(sig, msg.get_string())
+ msg.rewind()
+ pub = RSAKey(data=str(key))
+ self.assert_(pub.verify_ssh_sig('ice weasels', msg))
+
+ def test_9_sign_dss(self):
+ # verify that the dss private key can sign and verify
+ key = DSSKey.from_private_key_file('tests/test_dss.key')
+ msg = key.sign_ssh_data(randpool, 'ice weasels')
+ self.assert_(type(msg) is Message)
+ msg.rewind()
+ self.assertEquals('ssh-dss', msg.get_string())
+ # can't do the same test as we do for RSA, because DSS signatures
+ # are usually different each time. but we can test verification
+ # anyway so it's ok.
+ self.assertEquals(40, len(msg.get_string()))
+ msg.rewind()
+ pub = DSSKey(data=str(key))
+ self.assert_(pub.verify_ssh_sig('ice weasels', msg))
+
+ def test_A_generate_rsa(self):
+ key = RSAKey.generate(1024)
+ msg = key.sign_ssh_data(randpool, 'jerri blank')
+ msg.rewind()
+ self.assert_(key.verify_ssh_sig('jerri blank', msg))
+
+ def test_B_generate_dss(self):
+ key = DSSKey.generate(1024)
+ msg = key.sign_ssh_data(randpool, 'jerri blank')
+ msg.rewind()
+ self.assert_(key.verify_ssh_sig('jerri blank', msg))
diff --git a/tests/test_rsa.key b/tests/test_rsa.key
new file mode 100644
index 0000000..f50e9c5
--- /dev/null
+++ b/tests/test_rsa.key
@@ -0,0 +1,15 @@
+-----BEGIN RSA PRIVATE KEY-----
+MIICWgIBAAKBgQDTj1bqB4WmayWNPB+8jVSYpZYk80Ujvj680pOTh2bORBjbIAyz
+oWGW+GUjzKxTiiPvVmxFgx5wdsFvF03v34lEVVhMpouqPAYQ15N37K/ir5XY+9m/
+d8ufMCkjeXsQkKqFbAlQcnWMCRnOoPHS3I4vi6hmnDDeeYTSRvfLbW0fhwIBIwKB
+gBIiOqZYaoqbeD9OS9z2K9KR2atlTxGxOJPXiP4ESqP3NVScWNwyZ3NXHpyrJLa0
+EbVtzsQhLn6rF+TzXnOlcipFvjsem3iYzCpuChfGQ6SovTcOjHV9z+hnpXvQ/fon
+soVRZY65wKnF7IAoUwTmJS9opqgrN6kRgCd3DASAMd1bAkEA96SBVWFt/fJBNJ9H
+tYnBKZGw0VeHOYmVYbvMSstssn8un+pQpUm9vlG/bp7Oxd/m+b9KWEh2xPfv6zqU
+avNwHwJBANqzGZa/EpzF4J8pGti7oIAPUIDGMtfIcmqNXVMckrmzQ2vTfqtkEZsA
+4rE1IERRyiJQx6EJsz21wJmGV9WJQ5kCQQDwkS0uXqVdFzgHO6S++tjmjYcxwr3g
+H0CoFYSgbddOT6miqRskOQF3DZVkJT3kyuBgU2zKygz52ukQZMqxCb1fAkASvuTv
+qfpH87Qq5kQhNKdbbwbmd2NxlNabazPijWuphGTdW0VfJdWfklyS2Kr+iqrs/5wV
+HhathJt636Eg7oIjAkA8ht3MQ+XSl9yIJIS8gVpbPxSw5OMfw0PjVE7tBdQruiSc
+nvuQES5C9BMHjF39LZiGH1iLQy7FgdHyoP+eodI7
+-----END RSA PRIVATE KEY-----
diff --git a/tests/test_rsa_password.key b/tests/test_rsa_password.key
new file mode 100644
index 0000000..7713049
--- /dev/null
+++ b/tests/test_rsa_password.key
@@ -0,0 +1,18 @@
+-----BEGIN RSA PRIVATE KEY-----
+Proc-Type: 4,ENCRYPTED
+DEK-Info: DES-EDE3-CBC,DAA422E8A5A8EFB7
+
++nssHGmWl91IcmGiE6DdCIqGvAP04tuLh60wLjWBvdjtF9CjztPnF57xe+6pBk7o
+YgF/Ry3ik9ZV9rHNcRXifDKM9crxtYlpUlkM2C0SP89sXaO0P1Q1yCnrtZUwDIKO
+BNV8et5X7+AGMFsy/nmv0NFMrbpoG03Dppsloecd29NTRlIXwxHRFyHxy6BdEib/
+Dn0mEVbwg3dTvKrd/sODWR9hRwpDGM9nkEbUNJCh7vMwFKkIZZF8yqFvmGckuO5C
+HZkDJ6RkEDYrSZJAavQaiOPF5bu3cHughRfnrIKVrQuTTDiWjwX9Ny8e4p4k7dy7
+rLpbPhtxUOUbpOF7T1QxljDi1Tcq3Ebk3kN/ZLPRFnDrJfyUx+m9BXmAa78Wxs/l
+KaS8DTkYykd3+EGOeJFjZg2bvgqil4V+5JIt/+MQ5pZ/ui7i4GcH2bvZyGAbrXzP
+3LipSAdN5RG+fViLe3HUtfCx4ZAgtU78TWJrLk2FwKQGglFxKLnswp+IKZb09rZV
+uxmG4pPLUnH+mMYdiy5ugzj+5C8iZ0/IstpHVmO6GWROfedpJ82eMztTOtdhfMep
+8Z3HwAwkDtksL7Gq9klb0Wq5+uRlBWetixddAvnmqXNzYhaANWcAF/2a2Hz06Rb0
+e6pe/g0Ek5KV+6YI+D+oEblG0Sr+d4NtxtDTmIJKNVkmzlhI2s53bHp6txCb5JWJ
+S8mKLPBBBzaNXYd3odDvGXguuxUntWSsD11KyR6B9DXMIfWQW5dT7hp5kTMGlXWJ
+lD2hYab13DCCuAkwVTdpzhHYLZyxLYoSu05W6z8SAOs=
+-----END RSA PRIVATE KEY-----
diff --git a/tests/test_sftp.py b/tests/test_sftp.py
new file mode 100644
index 0000000..993899a
--- /dev/null
+++ b/tests/test_sftp.py
@@ -0,0 +1,740 @@
+# Copyright (C) 2003-2005 Robey Pointer <robey@lag.net>
+#
+# This file is part of paramiko.
+#
+# Paramiko is free software; you can redistribute it and/or modify it under the
+# terms of the GNU Lesser General Public License as published by the Free
+# Software Foundation; either version 2.1 of the License, or (at your option)
+# any later version.
+#
+# Paramiko is distrubuted in the hope that it will be useful, but WITHOUT ANY
+# WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR
+# A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more
+# details.
+#
+# You should have received a copy of the GNU Lesser General Public License
+# along with Paramiko; if not, write to the Free Software Foundation, Inc.,
+# 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA.
+
+"""
+some unit tests to make sure sftp works.
+
+a real actual sftp server is contacted, and a new folder is created there to
+do test file operations in (so no existing files will be harmed).
+"""
+
+import logging
+import os
+import random
+import sys
+import threading
+import time
+import unittest
+
+import paramiko
+from stub_sftp import StubServer, StubSFTPServer
+from loop import LoopSocket
+
+ARTICLE = '''
+Insulin sensitivity and liver insulin receptor structure in ducks from two
+genera
+
+T. Constans, B. Chevalier, M. Derouet and J. Simon
+Station de Recherches Avicoles, Institut National de la Recherche Agronomique,
+Nouzilly, France.
+
+Insulin sensitivity and liver insulin receptor structure were studied in
+5-wk-old ducks from two genera (Muscovy and Pekin). In the fasting state, both
+duck types were equally resistant to exogenous insulin compared with chicken.
+Despite the low potency of duck insulin, the number of insulin receptors was
+lower in Muscovy duck and similar in Pekin duck and chicken liver membranes.
+After 125I-insulin cross-linking, the size of the alpha-subunit of the
+receptors from the three species was 135,000. Wheat germ agglutinin-purified
+receptors from the three species were contaminated by an active and unusual
+adenosinetriphosphatase (ATPase) contaminant (highest activity in Muscovy
+duck). Sequential purification of solubilized receptor from both duck types on
+lentil and then wheat germ agglutinin lectins led to a fraction of receptors
+very poor in ATPase activity that exhibited a beta-subunit size (95,000) and
+tyrosine kinase activity similar to those of ATPase-free chicken insulin
+receptors. Therefore the ducks from the two genera exhibit an alpha-beta-
+structure for liver insulin receptors and a clear difference in the number of
+liver insulin receptors. Their sensitivity to insulin is, however, similarly
+decreased compared with chicken.
+'''
+
+FOLDER = os.environ.get('TEST_FOLDER', 'temp-testing000')
+
+sftp = None
+tc = None
+g_big_file_test = True
+
+
+class SFTPTest (unittest.TestCase):
+
+ def init(hostname, username, keyfile, passwd):
+ global sftp, tc
+
+ t = paramiko.Transport(hostname)
+ tc = t
+ try:
+ key = paramiko.RSAKey.from_private_key_file(keyfile, passwd)
+ except paramiko.PasswordRequiredException:
+ sys.stderr.write('\n\nparamiko.RSAKey.from_private_key_file REQUIRES PASSWORD.\n')
+ sys.stderr.write('You have two options:\n')
+ sys.stderr.write('* Use the "-K" option to point to a different (non-password-protected)\n')
+ sys.stderr.write(' private key file.\n')
+ sys.stderr.write('* Use the "-P" option to provide the password needed to unlock this private\n')
+ sys.stderr.write(' key.\n')
+ sys.stderr.write('\n')
+ sys.exit(1)
+ try:
+ t.connect(username=username, pkey=key)
+ except paramiko.SSHException:
+ t.close()
+ sys.stderr.write('\n\nparamiko.Transport.connect FAILED.\n')
+ sys.stderr.write('There are several possible reasons why it might fail so quickly:\n\n')
+ sys.stderr.write('* The host to connect to (%s) is not a valid SSH server.\n' % hostname)
+ sys.stderr.write(' (Use the "-H" option to change the host.)\n')
+ sys.stderr.write('* The username to auth as (%s) is invalid.\n' % username)
+ sys.stderr.write(' (Use the "-U" option to change the username.)\n')
+ sys.stderr.write('* The private key given (%s) is not accepted by the server.\n' % keyfile)
+ sys.stderr.write(' (Use the "-K" option to provide a different key file.)\n')
+ sys.stderr.write('\n')
+ sys.exit(1)
+ sftp = paramiko.SFTP.from_transport(t)
+ init = staticmethod(init)
+
+ def init_loopback():
+ global sftp, tc
+
+ socks = LoopSocket()
+ sockc = LoopSocket()
+ sockc.link(socks)
+ tc = paramiko.Transport(sockc)
+ ts = paramiko.Transport(socks)
+
+ host_key = paramiko.RSAKey.from_private_key_file('tests/test_rsa.key')
+ ts.add_server_key(host_key)
+ event = threading.Event()
+ server = StubServer()
+ ts.set_subsystem_handler('sftp', paramiko.SFTPServer, StubSFTPServer)
+ ts.start_server(event, server)
+ tc.connect(username='slowdive', password='pygmalion')
+ event.wait(1.0)
+
+ sftp = paramiko.SFTP.from_transport(tc)
+ init_loopback = staticmethod(init_loopback)
+
+ def set_big_file_test(onoff):
+ global g_big_file_test
+ g_big_file_test = onoff
+ set_big_file_test = staticmethod(set_big_file_test)
+
+ def setUp(self):
+ global FOLDER
+ for i in xrange(1000):
+ FOLDER = FOLDER[:-3] + '%03d' % i
+ try:
+ sftp.mkdir(FOLDER)
+ break
+ except (IOError, OSError):
+ pass
+
+ def tearDown(self):
+ sftp.rmdir(FOLDER)
+
+ def test_1_file(self):
+ """
+ verify that we can create a file.
+ """
+ f = sftp.open(FOLDER + '/test', 'w')
+ try:
+ self.assertEqual(f.stat().st_size, 0)
+ f.close()
+ finally:
+ sftp.remove(FOLDER + '/test')
+
+ def test_2_close(self):
+ """
+ verify that closing the sftp session doesn't do anything bad, and that
+ a new one can be opened.
+ """
+ global sftp
+ sftp.close()
+ try:
+ sftp.open(FOLDER + '/test2', 'w')
+ self.fail('expected exception')
+ except:
+ pass
+ sftp = paramiko.SFTP.from_transport(tc)
+
+ def test_3_write(self):
+ """
+ verify that a file can be created and written, and the size is correct.
+ """
+ f = sftp.open(FOLDER + '/duck.txt', 'w')
+ try:
+ f.write(ARTICLE)
+ f.close()
+ self.assertEqual(sftp.stat(FOLDER + '/duck.txt').st_size, 1483)
+ finally:
+ sftp.remove(FOLDER + '/duck.txt')
+
+ def test_4_append(self):
+ """
+ verify that a file can be opened for append, and tell() still works.
+ """
+ f = sftp.open(FOLDER + '/append.txt', 'w')
+ try:
+ f.write('first line\nsecond line\n')
+ self.assertEqual(f.tell(), 23)
+ f.close()
+
+ f = sftp.open(FOLDER + '/append.txt', 'a+')
+ f.write('third line!!!\n')
+ self.assertEqual(f.tell(), 37)
+ self.assertEqual(f.stat().st_size, 37)
+ f.seek(-26, f.SEEK_CUR)
+ self.assertEqual(f.readline(), 'second line\n')
+ f.close()
+ finally:
+ sftp.remove(FOLDER + '/append.txt')
+
+ def test_5_rename(self):
+ """
+ verify that renaming a file works.
+ """
+ f = sftp.open(FOLDER + '/first.txt', 'w')
+ try:
+ f.write('content!\n');
+ f.close()
+ sftp.rename(FOLDER + '/first.txt', FOLDER + '/second.txt')
+ try:
+ f = sftp.open(FOLDER + '/first.txt', 'r')
+ self.assert_(False, 'no exception on reading nonexistent file')
+ except IOError:
+ pass
+ f = sftp.open(FOLDER + '/second.txt', 'r')
+ f.seek(-6, f.SEEK_END)
+ self.assertEqual(f.read(4), 'tent')
+ f.close()
+ finally:
+ try:
+ sftp.remove(FOLDER + '/first.txt')
+ except:
+ pass
+ try:
+ sftp.remove(FOLDER + '/second.txt')
+ except:
+ pass
+
+ def test_6_folder(self):
+ """
+ create a temporary folder, verify that we can create a file in it, then
+ remove the folder and verify that we can't create a file in it anymore.
+ """
+ sftp.mkdir(FOLDER + '/subfolder')
+ f = sftp.open(FOLDER + '/subfolder/test', 'w')
+ f.close()
+ sftp.remove(FOLDER + '/subfolder/test')
+ sftp.rmdir(FOLDER + '/subfolder')
+ try:
+ f = sftp.open(FOLDER + '/subfolder/test')
+ # shouldn't be able to create that file
+ self.assert_(False, 'no exception at dummy file creation')
+ except IOError:
+ pass
+
+ def test_7_listdir(self):
+ """
+ verify that a folder can be created, a bunch of files can be placed in it,
+ and those files show up in sftp.listdir.
+ """
+ try:
+ f = sftp.open(FOLDER + '/duck.txt', 'w')
+ f.close()
+
+ f = sftp.open(FOLDER + '/fish.txt', 'w')
+ f.close()
+
+ f = sftp.open(FOLDER + '/tertiary.py', 'w')
+ f.close()
+
+ x = sftp.listdir(FOLDER)
+ self.assertEqual(len(x), 3)
+ self.assert_('duck.txt' in x)
+ self.assert_('fish.txt' in x)
+ self.assert_('tertiary.py' in x)
+ self.assert_('random' not in x)
+ finally:
+ sftp.remove(FOLDER + '/duck.txt')
+ sftp.remove(FOLDER + '/fish.txt')
+ sftp.remove(FOLDER + '/tertiary.py')
+
+ def test_8_setstat(self):
+ """
+ verify that the setstat functions (chown, chmod, utime) work.
+ """
+ f = sftp.open(FOLDER + '/special', 'w')
+ try:
+ f.close()
+
+ stat = sftp.stat(FOLDER + '/special')
+ sftp.chmod(FOLDER + '/special', (stat.st_mode & ~0777) | 0600)
+ self.assertEqual(sftp.stat(FOLDER + '/special').st_mode & 0777, 0600)
+
+ mtime = stat.st_mtime - 3600
+ atime = stat.st_atime - 1800
+ sftp.utime(FOLDER + '/special', (atime, mtime))
+ nstat = sftp.stat(FOLDER + '/special')
+ self.assertEqual(nstat.st_mtime, mtime)
+ self.assertEqual(nstat.st_atime, atime)
+
+ # can't really test chown, since we'd have to know a valid uid.
+ finally:
+ sftp.remove(FOLDER + '/special')
+
+ def test_9_readline_seek(self):
+ """
+ create a text file and write a bunch of text into it. then count the lines
+ in the file, and seek around to retreive particular lines. this should
+ verify that read buffering and 'tell' work well together, and that read
+ buffering is reset on 'seek'.
+ """
+ try:
+ f = sftp.open(FOLDER + '/duck.txt', 'w')
+ f.write(ARTICLE)
+ f.close()
+
+ f = sftp.open(FOLDER + '/duck.txt', 'r+')
+ line_number = 0
+ loc = 0
+ pos_list = []
+ for line in f:
+ line_number += 1
+ pos_list.append(loc)
+ loc = f.tell()
+ f.seek(pos_list[6], f.SEEK_SET)
+ self.assertEqual(f.readline(), 'Nouzilly, France.\n')
+ f.seek(pos_list[17], f.SEEK_SET)
+ self.assertEqual(f.readline()[:4], 'duck')
+ f.seek(pos_list[10], f.SEEK_SET)
+ self.assertEqual(f.readline(), 'duck types were equally resistant to exogenous insulin compared with chicken.\n')
+ f.close()
+ finally:
+ sftp.remove(FOLDER + '/duck.txt')
+
+ def test_A_write_seek(self):
+ """
+ create a text file, seek back and change part of it, and verify that the
+ changes worked.
+ """
+ f = sftp.open(FOLDER + '/testing.txt', 'w')
+ try:
+ f.write('hello kitty.\n')
+ f.seek(-5, f.SEEK_CUR)
+ f.write('dd')
+ f.close()
+
+ self.assertEqual(sftp.stat(FOLDER + '/testing.txt').st_size, 13)
+ f = sftp.open(FOLDER + '/testing.txt', 'r')
+ data = f.read(20)
+ f.close()
+ self.assertEqual(data, 'hello kiddy.\n')
+ finally:
+ sftp.remove(FOLDER + '/testing.txt')
+
+ def test_B_symlink(self):
+ """
+ create a symlink and then check that lstat doesn't follow it.
+ """
+ f = sftp.open(FOLDER + '/original.txt', 'w')
+ try:
+ f.write('original\n')
+ f.close()
+ sftp.symlink('original.txt', FOLDER + '/link.txt')
+ self.assertEqual(sftp.readlink(FOLDER + '/link.txt'), 'original.txt')
+
+ f = sftp.open(FOLDER + '/link.txt', 'r')
+ self.assertEqual(f.readlines(), [ 'original\n' ])
+ f.close()
+
+ cwd = sftp.normalize('.')
+ if cwd[-1] == '/':
+ cwd = cwd[:-1]
+ abs_path = cwd + '/' + FOLDER + '/original.txt'
+ sftp.symlink(abs_path, FOLDER + '/link2.txt')
+ self.assertEqual(abs_path, sftp.readlink(FOLDER + '/link2.txt'))
+
+ self.assertEqual(sftp.lstat(FOLDER + '/link.txt').st_size, 12)
+ self.assertEqual(sftp.stat(FOLDER + '/link.txt').st_size, 9)
+ # the sftp server may be hiding extra path members from us, so the
+ # length may be longer than we expect:
+ self.assert_(sftp.lstat(FOLDER + '/link2.txt').st_size >= len(abs_path))
+ self.assertEqual(sftp.stat(FOLDER + '/link2.txt').st_size, 9)
+ self.assertEqual(sftp.stat(FOLDER + '/original.txt').st_size, 9)
+ finally:
+ try:
+ sftp.remove(FOLDER + '/link.txt')
+ except:
+ pass
+ try:
+ sftp.remove(FOLDER + '/link2.txt')
+ except:
+ pass
+ try:
+ sftp.remove(FOLDER + '/original.txt')
+ except:
+ pass
+
+ def test_C_flush_seek(self):
+ """
+ verify that buffered writes are automatically flushed on seek.
+ """
+ f = sftp.open(FOLDER + '/happy.txt', 'w', 1)
+ try:
+ f.write('full line.\n')
+ f.write('partial')
+ f.seek(9, f.SEEK_SET)
+ f.write('?\n')
+ f.close()
+
+ f = sftp.open(FOLDER + '/happy.txt', 'r')
+ self.assertEqual(f.readline(), 'full line?\n')
+ self.assertEqual(f.read(7), 'partial')
+ f.close()
+ finally:
+ try:
+ sftp.remove(FOLDER + '/happy.txt')
+ except:
+ pass
+
+ def test_D_lots_of_files(self):
+ """
+ create a bunch of files over the same session.
+ """
+ global g_big_file_test
+ if not g_big_file_test:
+ return
+ numfiles = 100
+ try:
+ for i in range(numfiles):
+ f = sftp.open('%s/file%d.txt' % (FOLDER, i), 'w', 1)
+ f.write('this is file #%d.\n' % i)
+ f.close()
+ sftp.chmod('%s/file%d.txt' % (FOLDER, i), 0660)
+
+ # now make sure every file is there, by creating a list of filenmes
+ # and reading them in random order.
+ numlist = range(numfiles)
+ while len(numlist) > 0:
+ r = numlist[random.randint(0, len(numlist) - 1)]
+ f = sftp.open('%s/file%d.txt' % (FOLDER, r))
+ self.assertEqual(f.readline(), 'this is file #%d.\n' % r)
+ f.close()
+ numlist.remove(r)
+ finally:
+ for i in range(numfiles):
+ try:
+ sftp.remove('%s/file%d.txt' % (FOLDER, i))
+ except:
+ pass
+
+ def test_E_big_file(self):
+ """
+ write a 1MB file with no buffering.
+ """
+ global g_big_file_test
+ if not g_big_file_test:
+ return
+ kblob = (1024 * 'x')
+ start = time.time()
+ try:
+ f = sftp.open('%s/hongry.txt' % FOLDER, 'w')
+ for n in range(1024):
+ f.write(kblob)
+ if n % 128 == 0:
+ sys.stderr.write('.')
+ f.close()
+ sys.stderr.write(' ')
+
+ self.assertEqual(sftp.stat('%s/hongry.txt' % FOLDER).st_size, 1024 * 1024)
+ end = time.time()
+ sys.stderr.write('%ds ' % round(end - start))
+
+ start = time.time()
+ f = sftp.open('%s/hongry.txt' % FOLDER, 'r')
+ for n in range(1024):
+ data = f.read(1024)
+ self.assertEqual(data, kblob)
+ f.close()
+
+ end = time.time()
+ sys.stderr.write('%ds ' % round(end - start))
+ finally:
+ sftp.remove('%s/hongry.txt' % FOLDER)
+
+ def test_F_big_file_pipelined(self):
+ """
+ write a 1MB file, with no linefeeds, using pipelining.
+ """
+ global g_big_file_test
+ if not g_big_file_test:
+ return
+ kblob = (1024 * 'x')
+ start = time.time()
+ try:
+ f = sftp.open('%s/hongry.txt' % FOLDER, 'w')
+ f.set_pipelined(True)
+ for n in range(1024):
+ f.write(kblob)
+ if n % 128 == 0:
+ sys.stderr.write('.')
+ f.close()
+ sys.stderr.write(' ')
+
+ self.assertEqual(sftp.stat('%s/hongry.txt' % FOLDER).st_size, 1024 * 1024)
+ end = time.time()
+ sys.stderr.write('%ds ' % round(end - start))
+
+ start = time.time()
+ f = sftp.open('%s/hongry.txt' % FOLDER, 'r')
+ f.prefetch()
+ for n in range(1024):
+ data = f.read(1024)
+ self.assertEqual(data, kblob)
+ f.close()
+
+ end = time.time()
+ sys.stderr.write('%ds ' % round(end - start))
+ finally:
+ sftp.remove('%s/hongry.txt' % FOLDER)
+
+ def test_G_lots_of_prefetching(self):
+ """
+ prefetch a 1MB file a bunch of times, discarding the file object
+ without using it, to verify that paramiko doesn't get confused.
+ """
+ global g_big_file_test
+ if not g_big_file_test:
+ return
+ kblob = (1024 * 'x')
+ try:
+ f = sftp.open('%s/hongry.txt' % FOLDER, 'w')
+ f.set_pipelined(True)
+ for n in range(1024):
+ f.write(kblob)
+ if n % 128 == 0:
+ sys.stderr.write('.')
+ f.close()
+ sys.stderr.write(' ')
+
+ self.assertEqual(sftp.stat('%s/hongry.txt' % FOLDER).st_size, 1024 * 1024)
+
+ for i in range(10):
+ f = sftp.open('%s/hongry.txt' % FOLDER, 'r')
+ f.prefetch()
+ f = sftp.open('%s/hongry.txt' % FOLDER, 'r')
+ f.prefetch()
+ for n in range(1024):
+ data = f.read(1024)
+ self.assertEqual(data, kblob)
+ if n % 128 == 0:
+ sys.stderr.write('.')
+ f.close()
+ sys.stderr.write(' ')
+ finally:
+ sftp.remove('%s/hongry.txt' % FOLDER)
+
+ def test_H_big_file_big_buffer(self):
+ """
+ write a 1MB file, with no linefeeds, and a big buffer.
+ """
+ global g_big_file_test
+ if not g_big_file_test:
+ return
+ mblob = (1024 * 1024 * 'x')
+ try:
+ f = sftp.open('%s/hongry.txt' % FOLDER, 'w', 128 * 1024)
+ f.write(mblob)
+ f.close()
+
+ self.assertEqual(sftp.stat('%s/hongry.txt' % FOLDER).st_size, 1024 * 1024)
+ finally:
+ sftp.remove('%s/hongry.txt' % FOLDER)
+
+ def test_I_big_file_renegotiate(self):
+ """
+ write a 1MB file, forcing key renegotiation in the middle.
+ """
+ global g_big_file_test
+ if not g_big_file_test:
+ return
+ t = sftp.sock.get_transport()
+ t.packetizer.REKEY_BYTES = 512 * 1024
+ k32blob = (32 * 1024 * 'x')
+ try:
+ f = sftp.open('%s/hongry.txt' % FOLDER, 'w', 128 * 1024)
+ for i in xrange(32):
+ f.write(k32blob)
+ f.close()
+
+ self.assertEqual(sftp.stat('%s/hongry.txt' % FOLDER).st_size, 1024 * 1024)
+ self.assertNotEquals(t.H, t.session_id)
+ finally:
+ sftp.remove('%s/hongry.txt' % FOLDER)
+ t.packetizer.REKEY_BYTES = pow(2, 30)
+
+ def test_J_realpath(self):
+ """
+ test that realpath is returning something non-empty and not an
+ error.
+ """
+ pwd = sftp.normalize('.')
+ self.assert_(len(pwd) > 0)
+ f = sftp.normalize('./' + FOLDER)
+ self.assert_(len(f) > 0)
+ self.assertEquals(os.path.join(pwd, FOLDER), f)
+
+ def test_K_mkdir(self):
+ """
+ verify that mkdir/rmdir work.
+ """
+ try:
+ sftp.mkdir(FOLDER + '/subfolder')
+ except:
+ self.assert_(False, 'exception creating subfolder')
+ try:
+ sftp.mkdir(FOLDER + '/subfolder')
+ self.assert_(False, 'no exception overwriting subfolder')
+ except IOError:
+ pass
+ try:
+ sftp.rmdir(FOLDER + '/subfolder')
+ except:
+ self.assert_(False, 'exception removing subfolder')
+ try:
+ sftp.rmdir(FOLDER + '/subfolder')
+ self.assert_(False, 'no exception removing nonexistent subfolder')
+ except IOError:
+ pass
+
+ def test_L_chdir(self):
+ """
+ verify that chdir/getcwd work.
+ """
+ root = sftp.normalize('.')
+ if root[-1] != '/':
+ root += '/'
+ try:
+ sftp.mkdir(FOLDER + '/alpha')
+ sftp.chdir(FOLDER + '/alpha')
+ sftp.mkdir('beta')
+ self.assertEquals(root + FOLDER + '/alpha', sftp.getcwd())
+ self.assertEquals(['beta'], sftp.listdir('.'))
+
+ sftp.chdir('beta')
+ f = sftp.open('fish', 'w')
+ f.write('hello\n')
+ f.close()
+ sftp.chdir('..')
+ self.assertEquals(['fish'], sftp.listdir('beta'))
+ sftp.chdir('..')
+ self.assertEquals(['fish'], sftp.listdir('alpha/beta'))
+ finally:
+ sftp.chdir(root)
+ try:
+ sftp.unlink(FOLDER + '/alpha/beta/fish')
+ except:
+ pass
+ try:
+ sftp.rmdir(FOLDER + '/alpha/beta')
+ except:
+ pass
+ try:
+ sftp.rmdir(FOLDER + '/alpha')
+ except:
+ pass
+
+ def test_M_get_put(self):
+ """
+ verify that get/put work.
+ """
+ import os, warnings
+ warnings.filterwarnings('ignore', 'tempnam.*')
+
+ localname = os.tempnam()
+ text = 'All I wanted was a plastic bunny rabbit.\n'
+ f = open(localname, 'w')
+ f.write(text)
+ f.close()
+ sftp.put(localname, FOLDER + '/bunny.txt')
+
+ f = sftp.open(FOLDER + '/bunny.txt', 'r')
+ self.assertEquals(text, f.read(128))
+ f.close()
+
+ os.unlink(localname)
+ localname = os.tempnam()
+ sftp.get(FOLDER + '/bunny.txt', localname)
+
+ f = open(localname, 'r')
+ self.assertEquals(text, f.read(128))
+ f.close()
+
+ os.unlink(localname)
+ sftp.unlink(FOLDER + '/bunny.txt')
+
+ def test_N_check(self):
+ """
+ verify that file.check() works against our own server.
+ (it's an sftp extension that we support, and may be the only ones who
+ support it.)
+ """
+ f = sftp.open(FOLDER + '/kitty.txt', 'w')
+ f.write('here kitty kitty' * 64)
+ f.close()
+
+ try:
+ f = sftp.open(FOLDER + '/kitty.txt', 'r')
+ sum = f.check('sha1')
+ self.assertEquals('91059CFC6615941378D413CB5ADAF4C5EB293402', paramiko.util.hexify(sum))
+ sum = f.check('md5', 0, 512)
+ self.assertEquals('93DE4788FCA28D471516963A1FE3856A', paramiko.util.hexify(sum))
+ sum = f.check('md5', 0, 0, 510)
+ self.assertEquals('EB3B45B8CD55A0707D99B177544A319F373183D241432BB2157AB9E46358C4AC90370B5CADE5D90336FC1716F90B36D6',
+ paramiko.util.hexify(sum))
+ finally:
+ sftp.unlink(FOLDER + '/kitty.txt')
+
+ def test_O_x_flag(self):
+ """
+ verify that the 'x' flag works when opening a file.
+ """
+ f = sftp.open(FOLDER + '/unusual.txt', 'wx')
+ f.close()
+
+ try:
+ try:
+ f = sftp.open(FOLDER + '/unusual.txt', 'wx')
+ self.fail('expected exception')
+ except IOError, x:
+ pass
+ finally:
+ sftp.unlink(FOLDER + '/unusual.txt')
+
+ def test_P_utf8(self):
+ """
+ verify that unicode strings are encoded into utf8 correctly.
+ """
+ f = sftp.open(FOLDER + '/something', 'w')
+ f.write('okay')
+ f.close()
+
+ try:
+ sftp.rename(FOLDER + '/something', FOLDER + u'/\u00fcnic\u00f8de')
+ sftp.open(FOLDER + '/\xc3\xbcnic\xc3\xb8\x64\x65', 'r')
+ except Exception, e:
+ self.fail('exception ' + e)
+ sftp.unlink(FOLDER + '/\xc3\xbcnic\xc3\xb8\x64\x65')
+
diff --git a/tests/test_transport.py b/tests/test_transport.py
new file mode 100644
index 0000000..5fcc786
--- /dev/null
+++ b/tests/test_transport.py
@@ -0,0 +1,573 @@
+# Copyright (C) 2003-2005 Robey Pointer <robey@lag.net>
+#
+# This file is part of paramiko.
+#
+# Paramiko is free software; you can redistribute it and/or modify it under the
+# terms of the GNU Lesser General Public License as published by the Free
+# Software Foundation; either version 2.1 of the License, or (at your option)
+# any later version.
+#
+# Paramiko is distrubuted in the hope that it will be useful, but WITHOUT ANY
+# WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR
+# A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more
+# details.
+#
+# You should have received a copy of the GNU Lesser General Public License
+# along with Paramiko; if not, write to the Free Software Foundation, Inc.,
+# 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA.
+
+"""
+Some unit tests for the ssh2 protocol in Transport.
+"""
+
+import sys, time, threading, unittest
+import select
+from paramiko import Transport, SecurityOptions, ServerInterface, RSAKey, DSSKey, \
+ SSHException, BadAuthenticationType, InteractiveQuery, util
+from paramiko import AUTH_FAILED, AUTH_PARTIALLY_SUCCESSFUL, AUTH_SUCCESSFUL
+from paramiko import OPEN_SUCCEEDED
+from loop import LoopSocket
+
+
+class NullServer (ServerInterface):
+ paranoid_did_password = False
+ paranoid_did_public_key = False
+ paranoid_key = DSSKey.from_private_key_file('tests/test_dss.key')
+
+ def get_allowed_auths(self, username):
+ if username == 'slowdive':
+ return 'publickey,password'
+ if username == 'paranoid':
+ if not self.paranoid_did_password and not self.paranoid_did_public_key:
+ return 'publickey,password'
+ elif self.paranoid_did_password:
+ return 'publickey'
+ else:
+ return 'password'
+ if username == 'commie':
+ return 'keyboard-interactive'
+ return 'publickey'
+
+ def check_auth_password(self, username, password):
+ if (username == 'slowdive') and (password == 'pygmalion'):
+ return AUTH_SUCCESSFUL
+ if (username == 'paranoid') and (password == 'paranoid'):
+ # 2-part auth (even openssh doesn't support this)
+ self.paranoid_did_password = True
+ if self.paranoid_did_public_key:
+ return AUTH_SUCCESSFUL
+ return AUTH_PARTIALLY_SUCCESSFUL
+ return AUTH_FAILED
+
+ def check_auth_publickey(self, username, key):
+ if (username == 'paranoid') and (key == self.paranoid_key):
+ # 2-part auth
+ self.paranoid_did_public_key = True
+ if self.paranoid_did_password:
+ return AUTH_SUCCESSFUL
+ return AUTH_PARTIALLY_SUCCESSFUL
+ return AUTH_FAILED
+
+ def check_auth_interactive(self, username, submethods):
+ if username == 'commie':
+ self.username = username
+ return InteractiveQuery('password', 'Please enter a password.', ('Password', False))
+ return AUTH_FAILED
+
+ def check_auth_interactive_response(self, responses):
+ if self.username == 'commie':
+ if (len(responses) == 1) and (responses[0] == 'cat'):
+ return AUTH_SUCCESSFUL
+ return AUTH_FAILED
+
+ def check_channel_request(self, kind, chanid):
+ return OPEN_SUCCEEDED
+
+ def check_channel_exec_request(self, channel, command):
+ if command != 'yes':
+ return False
+ return True
+
+ def check_channel_shell_request(self, channel):
+ return True
+
+ def check_global_request(self, kind, msg):
+ self._global_request = kind
+ return False
+
+
+class TransportTest (unittest.TestCase):
+
+ def setUp(self):
+ self.socks = LoopSocket()
+ self.sockc = LoopSocket()
+ self.sockc.link(self.socks)
+ self.tc = Transport(self.sockc)
+ self.ts = Transport(self.socks)
+
+ def tearDown(self):
+ self.tc.close()
+ self.ts.close()
+ self.socks.close()
+ self.sockc.close()
+
+ def test_1_security_options(self):
+ o = self.tc.get_security_options()
+ self.assertEquals(type(o), SecurityOptions)
+ self.assert_(('aes256-cbc', 'blowfish-cbc') != o.ciphers)
+ o.ciphers = ('aes256-cbc', 'blowfish-cbc')
+ self.assertEquals(('aes256-cbc', 'blowfish-cbc'), o.ciphers)
+ try:
+ o.ciphers = ('aes256-cbc', 'made-up-cipher')
+ self.assert_(False)
+ except ValueError:
+ pass
+ try:
+ o.ciphers = 23
+ self.assert_(False)
+ except TypeError:
+ pass
+
+ def test_2_compute_key(self):
+ self.tc.K = 123281095979686581523377256114209720774539068973101330872763622971399429481072519713536292772709507296759612401802191955568143056534122385270077606457721553469730659233569339356140085284052436697480759510519672848743794433460113118986816826624865291116513647975790797391795651716378444844877749505443714557929L
+ self.tc.H = util.unhexify('0C8307CDE6856FF30BA93684EB0F04C2520E9ED3')
+ self.tc.session_id = self.tc.H
+ key = self.tc._compute_key('C', 32)
+ self.assertEquals('207E66594CA87C44ECCBA3B3CD39FDDB378E6FDB0F97C54B2AA0CFBF900CD995',
+ util.hexify(key))
+
+ def test_3_simple(self):
+ """
+ verify that we can establish an ssh link with ourselves across the
+ loopback sockets. this is hardly "simple" but it's simpler than the
+ later tests. :)
+ """
+ host_key = RSAKey.from_private_key_file('tests/test_rsa.key')
+ public_host_key = RSAKey(data=str(host_key))
+ self.ts.add_server_key(host_key)
+ event = threading.Event()
+ server = NullServer()
+ self.assert_(not event.isSet())
+ self.assertEquals(None, self.tc.get_username())
+ self.assertEquals(None, self.ts.get_username())
+ self.assertEquals(False, self.tc.is_authenticated())
+ self.assertEquals(False, self.ts.is_authenticated())
+ self.ts.start_server(event, server)
+ self.tc.connect(hostkey=public_host_key,
+ username='slowdive', password='pygmalion')
+ event.wait(1.0)
+ self.assert_(event.isSet())
+ self.assert_(self.ts.is_active())
+ self.assertEquals('slowdive', self.tc.get_username())
+ self.assertEquals('slowdive', self.ts.get_username())
+ self.assertEquals(True, self.tc.is_authenticated())
+ self.assertEquals(True, self.ts.is_authenticated())
+
+ def test_4_special(self):
+ """
+ verify that the client can demand odd handshake settings, and can
+ renegotiate keys in mid-stream.
+ """
+ host_key = RSAKey.from_private_key_file('tests/test_rsa.key')
+ public_host_key = RSAKey(data=str(host_key))
+ self.ts.add_server_key(host_key)
+ event = threading.Event()
+ server = NullServer()
+ self.assert_(not event.isSet())
+ self.ts.start_server(event, server)
+ options = self.tc.get_security_options()
+ options.ciphers = ('aes256-cbc',)
+ options.digests = ('hmac-md5-96',)
+ self.tc.connect(hostkey=public_host_key,
+ username='slowdive', password='pygmalion')
+ event.wait(1.0)
+ self.assert_(event.isSet())
+ self.assert_(self.ts.is_active())
+ self.assertEquals('aes256-cbc', self.tc.local_cipher)
+ self.assertEquals('aes256-cbc', self.tc.remote_cipher)
+ self.assertEquals(12, self.tc.packetizer.get_mac_size_out())
+ self.assertEquals(12, self.tc.packetizer.get_mac_size_in())
+
+ self.tc.send_ignore(1024)
+ self.assert_(self.tc.renegotiate_keys())
+ self.ts.send_ignore(1024)
+
+ def test_5_keepalive(self):
+ """
+ verify that the keepalive will be sent.
+ """
+ self.tc.set_hexdump(True)
+
+ host_key = RSAKey.from_private_key_file('tests/test_rsa.key')
+ public_host_key = RSAKey(data=str(host_key))
+ self.ts.add_server_key(host_key)
+ event = threading.Event()
+ server = NullServer()
+ self.assert_(not event.isSet())
+ self.ts.start_server(event, server)
+ self.tc.connect(hostkey=public_host_key,
+ username='slowdive', password='pygmalion')
+ event.wait(1.0)
+ self.assert_(event.isSet())
+ self.assert_(self.ts.is_active())
+
+ self.assertEquals(None, getattr(server, '_global_request', None))
+ self.tc.set_keepalive(1)
+ time.sleep(2)
+ self.assertEquals('keepalive@lag.net', server._global_request)
+
+ def test_6_bad_auth_type(self):
+ """
+ verify that we get the right exception when an unsupported auth
+ type is requested.
+ """
+ host_key = RSAKey.from_private_key_file('tests/test_rsa.key')
+ public_host_key = RSAKey(data=str(host_key))
+ self.ts.add_server_key(host_key)
+ event = threading.Event()
+ server = NullServer()
+ self.assert_(not event.isSet())
+ self.ts.start_server(event, server)
+ try:
+ self.tc.connect(hostkey=public_host_key,
+ username='unknown', password='error')
+ self.assert_(False)
+ except:
+ etype, evalue, etb = sys.exc_info()
+ self.assertEquals(BadAuthenticationType, etype)
+ self.assertEquals(['publickey'], evalue.allowed_types)
+
+ def test_7_bad_password(self):
+ """
+ verify that a bad password gets the right exception, and that a retry
+ with the right password works.
+ """
+ host_key = RSAKey.from_private_key_file('tests/test_rsa.key')
+ public_host_key = RSAKey(data=str(host_key))
+ self.ts.add_server_key(host_key)
+ event = threading.Event()
+ server = NullServer()
+ self.assert_(not event.isSet())
+ self.ts.start_server(event, server)
+ self.tc.ultra_debug = True
+ self.tc.connect(hostkey=public_host_key)
+ try:
+ self.tc.auth_password(username='slowdive', password='error')
+ self.assert_(False)
+ except:
+ etype, evalue, etb = sys.exc_info()
+ self.assertEquals(SSHException, etype)
+ self.tc.auth_password(username='slowdive', password='pygmalion')
+ event.wait(1.0)
+ self.assert_(event.isSet())
+ self.assert_(self.ts.is_active())
+
+ def test_8_multipart_auth(self):
+ """
+ verify that multipart auth works.
+ """
+ host_key = RSAKey.from_private_key_file('tests/test_rsa.key')
+ public_host_key = RSAKey(data=str(host_key))
+ self.ts.add_server_key(host_key)
+ event = threading.Event()
+ server = NullServer()
+ self.assert_(not event.isSet())
+ self.ts.start_server(event, server)
+ self.tc.ultra_debug = True
+ self.tc.connect(hostkey=public_host_key)
+ remain = self.tc.auth_password(username='paranoid', password='paranoid')
+ self.assertEquals(['publickey'], remain)
+ key = DSSKey.from_private_key_file('tests/test_dss.key')
+ remain = self.tc.auth_publickey(username='paranoid', key=key)
+ self.assertEquals([], remain)
+ event.wait(1.0)
+ self.assert_(event.isSet())
+ self.assert_(self.ts.is_active())
+
+ def test_9_interactive_auth(self):
+ """
+ verify keyboard-interactive auth works.
+ """
+ host_key = RSAKey.from_private_key_file('tests/test_rsa.key')
+ public_host_key = RSAKey(data=str(host_key))
+ self.ts.add_server_key(host_key)
+ event = threading.Event()
+ server = NullServer()
+ self.assert_(not event.isSet())
+ self.ts.start_server(event, server)
+ self.tc.ultra_debug = True
+ self.tc.connect(hostkey=public_host_key)
+
+ def handler(title, instructions, prompts):
+ self.got_title = title
+ self.got_instructions = instructions
+ self.got_prompts = prompts
+ return ['cat']
+ remain = self.tc.auth_interactive('commie', handler)
+ self.assertEquals(self.got_title, 'password')
+ self.assertEquals(self.got_prompts, [('Password', False)])
+ self.assertEquals([], remain)
+ event.wait(1.0)
+ self.assert_(event.isSet())
+ self.assert_(self.ts.is_active())
+
+ def test_A_interactive_auth_fallback(self):
+ """
+ verify that a password auth attempt will fallback to "interactive"
+ if password auth isn't supported but interactive is.
+ """
+ host_key = RSAKey.from_private_key_file('tests/test_rsa.key')
+ public_host_key = RSAKey(data=str(host_key))
+ self.ts.add_server_key(host_key)
+ event = threading.Event()
+ server = NullServer()
+ self.assert_(not event.isSet())
+ self.ts.start_server(event, server)
+ self.tc.ultra_debug = True
+ self.tc.connect(hostkey=public_host_key)
+ remain = self.tc.auth_password('commie', 'cat')
+ self.assertEquals([], remain)
+ event.wait(1.0)
+ self.assert_(event.isSet())
+ self.assert_(self.ts.is_active())
+
+ def test_B_exec_command(self):
+ """
+ verify that exec_command() does something reasonable.
+ """
+ host_key = RSAKey.from_private_key_file('tests/test_rsa.key')
+ public_host_key = RSAKey(data=str(host_key))
+ self.ts.add_server_key(host_key)
+ event = threading.Event()
+ server = NullServer()
+ self.assert_(not event.isSet())
+ self.ts.start_server(event, server)
+ self.tc.ultra_debug = True
+ self.tc.connect(hostkey=public_host_key)
+ self.tc.auth_password(username='slowdive', password='pygmalion')
+ event.wait(1.0)
+ self.assert_(event.isSet())
+ self.assert_(self.ts.is_active())
+
+ chan = self.tc.open_session()
+ schan = self.ts.accept(1.0)
+ self.assert_(not chan.exec_command('no'))
+
+ chan = self.tc.open_session()
+ self.assert_(chan.exec_command('yes'))
+ schan = self.ts.accept(1.0)
+ schan.send('Hello there.\n')
+ schan.send_stderr('This is on stderr.\n')
+ schan.close()
+
+ f = chan.makefile()
+ self.assertEquals('Hello there.\n', f.readline())
+ self.assertEquals('', f.readline())
+ f = chan.makefile_stderr()
+ self.assertEquals('This is on stderr.\n', f.readline())
+ self.assertEquals('', f.readline())
+
+ # now try it with combined stdout/stderr
+ chan = self.tc.open_session()
+ self.assert_(chan.exec_command('yes'))
+ schan = self.ts.accept(1.0)
+ schan.send('Hello there.\n')
+ schan.send_stderr('This is on stderr.\n')
+ schan.close()
+
+ chan.set_combine_stderr(True)
+ f = chan.makefile()
+ self.assertEquals('Hello there.\n', f.readline())
+ self.assertEquals('This is on stderr.\n', f.readline())
+ self.assertEquals('', f.readline())
+
+ def test_C_invoke_shell(self):
+ """
+ verify that invoke_shell() does something reasonable.
+ """
+ host_key = RSAKey.from_private_key_file('tests/test_rsa.key')
+ public_host_key = RSAKey(data=str(host_key))
+ self.ts.add_server_key(host_key)
+ event = threading.Event()
+ server = NullServer()
+ self.assert_(not event.isSet())
+ self.ts.start_server(event, server)
+ self.tc.ultra_debug = True
+ self.tc.connect(hostkey=public_host_key)
+ self.tc.auth_password(username='slowdive', password='pygmalion')
+ event.wait(1.0)
+ self.assert_(event.isSet())
+ self.assert_(self.ts.is_active())
+
+ chan = self.tc.open_session()
+ self.assert_(chan.invoke_shell())
+ schan = self.ts.accept(1.0)
+ chan.send('communist j. cat\n')
+ f = schan.makefile()
+ self.assertEquals('communist j. cat\n', f.readline())
+ chan.close()
+ self.assertEquals('', f.readline())
+
+ def test_D_exit_status(self):
+ """
+ verify that get_exit_status() works.
+ """
+ host_key = RSAKey.from_private_key_file('tests/test_rsa.key')
+ public_host_key = RSAKey(data=str(host_key))
+ self.ts.add_server_key(host_key)
+ event = threading.Event()
+ server = NullServer()
+ self.assert_(not event.isSet())
+ self.ts.start_server(event, server)
+ self.tc.ultra_debug = True
+ self.tc.connect(hostkey=public_host_key)
+ self.tc.auth_password(username='slowdive', password='pygmalion')
+ event.wait(1.0)
+ self.assert_(event.isSet())
+ self.assert_(self.ts.is_active())
+
+ chan = self.tc.open_session()
+ schan = self.ts.accept(1.0)
+ self.assert_(chan.exec_command('yes'))
+ schan.send('Hello there.\n')
+ # trigger an EOF
+ schan.shutdown_read()
+ schan.shutdown_write()
+ schan.send_exit_status(23)
+ schan.close()
+
+ f = chan.makefile()
+ self.assertEquals('Hello there.\n', f.readline())
+ self.assertEquals('', f.readline())
+ self.assertEquals(23, chan.recv_exit_status())
+ chan.close()
+
+ def test_E_select(self):
+ """
+ verify that select() on a channel works.
+ """
+ host_key = RSAKey.from_private_key_file('tests/test_rsa.key')
+ public_host_key = RSAKey(data=str(host_key))
+ self.ts.add_server_key(host_key)
+ event = threading.Event()
+ server = NullServer()
+ self.assert_(not event.isSet())
+ self.ts.start_server(event, server)
+ self.tc.ultra_debug = True
+ self.tc.connect(hostkey=public_host_key)
+ self.tc.auth_password(username='slowdive', password='pygmalion')
+ event.wait(1.0)
+ self.assert_(event.isSet())
+ self.assert_(self.ts.is_active())
+
+ chan = self.tc.open_session()
+ self.assert_(chan.invoke_shell())
+ schan = self.ts.accept(1.0)
+
+ # nothing should be ready
+ r, w, e = select.select([chan], [], [], 0.1)
+ self.assertEquals([], r)
+ self.assertEquals([], w)
+ self.assertEquals([], e)
+
+ schan.send('hello\n')
+
+ # something should be ready now (give it 1 second to appear)
+ for i in range(10):
+ r, w, e = select.select([chan], [], [], 0.1)
+ if chan in r:
+ break
+ time.sleep(0.1)
+ self.assertEquals([chan], r)
+ self.assertEquals([], w)
+ self.assertEquals([], e)
+
+ self.assertEquals('hello\n', chan.recv(6))
+
+ # and, should be dead again now
+ r, w, e = select.select([chan], [], [], 0.1)
+ self.assertEquals([], r)
+ self.assertEquals([], w)
+ self.assertEquals([], e)
+
+ schan.close()
+
+ # detect eof?
+ for i in range(10):
+ r, w, e = select.select([chan], [], [], 0.1)
+ if chan in r:
+ break
+ time.sleep(0.1)
+ self.assertEquals([chan], r)
+ self.assertEquals([], w)
+ self.assertEquals([], e)
+ self.assertEquals('', chan.recv(16))
+
+ chan.close()
+
+ def test_F_renegotiate(self):
+ """
+ verify that a transport can correctly renegotiate mid-stream.
+ """
+ host_key = RSAKey.from_private_key_file('tests/test_rsa.key')
+ public_host_key = RSAKey(data=str(host_key))
+ self.ts.add_server_key(host_key)
+ event = threading.Event()
+ server = NullServer()
+ self.ts.start_server(event, server)
+ self.tc.connect(hostkey=public_host_key,
+ username='slowdive', password='pygmalion')
+ event.wait(1.0)
+ self.assert_(event.isSet())
+ self.assert_(self.ts.is_active())
+
+ self.tc.packetizer.REKEY_BYTES = 16384
+
+ chan = self.tc.open_session()
+ self.assert_(chan.exec_command('yes'))
+ schan = self.ts.accept(1.0)
+
+ self.assertEquals(self.tc.H, self.tc.session_id)
+ for i in range(20):
+ chan.send('x' * 1024)
+ chan.close()
+
+ # allow a few seconds for the rekeying to complete
+ for i in xrange(50):
+ if self.tc.H != self.tc.session_id:
+ break
+ time.sleep(0.1)
+ self.assertNotEquals(self.tc.H, self.tc.session_id)
+
+ schan.close()
+
+ def test_G_compression(self):
+ """
+ verify that zlib compression is basically working.
+ """
+ host_key = RSAKey.from_private_key_file('tests/test_rsa.key')
+ public_host_key = RSAKey(data=str(host_key))
+ self.ts.add_server_key(host_key)
+ self.ts.get_security_options().compression = ('zlib',)
+ self.tc.get_security_options().compression = ('zlib',)
+ event = threading.Event()
+ server = NullServer()
+ self.ts.start_server(event, server)
+ self.tc.connect(hostkey=public_host_key,
+ username='slowdive', password='pygmalion')
+ event.wait(1.0)
+ self.assert_(event.isSet())
+ self.assert_(self.ts.is_active())
+
+ chan = self.tc.open_session()
+ self.assert_(chan.exec_command('yes'))
+ schan = self.ts.accept(1.0)
+
+ bytes = self.tc.packetizer._Packetizer__sent_bytes
+ chan.send('x' * 1024)
+ bytes2 = self.tc.packetizer._Packetizer__sent_bytes
+ # tests show this is actually compressed to *52 bytes*! including packet overhead! nice!! :)
+ self.assert_(bytes2 - bytes < 1024)
+
+ chan.close()
+ schan.close()
diff --git a/tests/test_util.py b/tests/test_util.py
new file mode 100644
index 0000000..fa8c029
--- /dev/null
+++ b/tests/test_util.py
@@ -0,0 +1,80 @@
+#!/usr/bin/python
+
+# Copyright (C) 2003-2005 Robey Pointer <robey@lag.net>
+#
+# This file is part of paramiko.
+#
+# Paramiko is free software; you can redistribute it and/or modify it under the
+# terms of the GNU Lesser General Public License as published by the Free
+# Software Foundation; either version 2.1 of the License, or (at your option)
+# any later version.
+#
+# Paramiko is distrubuted in the hope that it will be useful, but WITHOUT ANY
+# WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR
+# A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more
+# details.
+#
+# You should have received a copy of the GNU Lesser General Public License
+# along with Paramiko; if not, write to the Free Software Foundation, Inc.,
+# 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA.
+
+"""
+Some unit tests for utility functions.
+"""
+
+import cStringIO
+import unittest
+from Crypto.Hash import SHA
+import paramiko.util
+
+
+test_config_file = """\
+Host *
+ User robey
+ IdentityFile =~/.ssh/id_rsa
+
+# comment
+Host *.example.com
+ \tUser bjork
+Port=3333
+Host *
+ \t \t Crazy something dumb
+Host spoo.example.com
+Crazy something else
+"""
+
+
+class UtilTest (unittest.TestCase):
+
+ K = 14730343317708716439807310032871972459448364195094179797249681733965528989482751523943515690110179031004049109375612685505881911274101441415545039654102474376472240501616988799699744135291070488314748284283496055223852115360852283821334858541043710301057312858051901453919067023103730011648890038847384890504L
+
+ def setUp(self):
+ pass
+
+ def tearDown(self):
+ pass
+
+ def test_1_parse_config(self):
+ global test_config_file
+ f = cStringIO.StringIO(test_config_file)
+ config = paramiko.util.parse_ssh_config(f)
+ self.assertEquals(config, [ {'identityfile': '~/.ssh/id_rsa', 'host': '*', 'user': 'robey',
+ 'crazy': 'something dumb '},
+ {'host': '*.example.com', 'user': 'bjork', 'port': '3333'},
+ {'host': 'spoo.example.com', 'crazy': 'something else'}])
+
+ def test_2_host_config(self):
+ global test_config_file
+ f = cStringIO.StringIO(test_config_file)
+ config = paramiko.util.parse_ssh_config(f)
+ c = paramiko.util.lookup_ssh_host_config('irc.danger.com', config)
+ self.assertEquals(c, {'identityfile': '~/.ssh/id_rsa', 'user': 'robey', 'crazy': 'something dumb '})
+ c = paramiko.util.lookup_ssh_host_config('irc.example.com', config)
+ self.assertEquals(c, {'identityfile': '~/.ssh/id_rsa', 'user': 'bjork', 'crazy': 'something dumb ', 'port': '3333'})
+ c = paramiko.util.lookup_ssh_host_config('spoo.example.com', config)
+ self.assertEquals(c, {'identityfile': '~/.ssh/id_rsa', 'user': 'bjork', 'crazy': 'something else', 'port': '3333'})
+
+ def test_3_generate_key_bytes(self):
+ x = paramiko.util.generate_key_bytes(SHA, 'ABCDEFGH', 'This is my secret passphrase.', 64)
+ hex = ''.join(['%02x' % ord(c) for c in x])
+ self.assertEquals(hex, '9110e2f6793b69363e58173e9436b13a5a4b339005741d5c680e505f57d871347b4239f14fb5c46e857d5e100424873ba849ac699cea98d729e57b3e84378e8b')