diff options
Diffstat (limited to 'tests')
-rw-r--r-- | tests/loop.py | 104 | ||||
-rw-r--r-- | tests/stub_sftp.py | 192 | ||||
-rw-r--r-- | tests/test_dss.key | 12 | ||||
-rw-r--r-- | tests/test_dss_password.key | 15 | ||||
-rw-r--r-- | tests/test_file.py | 153 | ||||
-rw-r--r-- | tests/test_kex.py | 183 | ||||
-rw-r--r-- | tests/test_message.py | 102 | ||||
-rw-r--r-- | tests/test_packetizer.py | 70 | ||||
-rw-r--r-- | tests/test_pkey.py | 140 | ||||
-rw-r--r-- | tests/test_rsa.key | 15 | ||||
-rw-r--r-- | tests/test_rsa_password.key | 18 | ||||
-rw-r--r-- | tests/test_sftp.py | 740 | ||||
-rw-r--r-- | tests/test_transport.py | 573 | ||||
-rw-r--r-- | tests/test_util.py | 80 |
14 files changed, 2397 insertions, 0 deletions
diff --git a/tests/loop.py b/tests/loop.py new file mode 100644 index 0000000..ad5f7ca --- /dev/null +++ b/tests/loop.py @@ -0,0 +1,104 @@ +#!/usr/bin/python + +# Copyright (C) 2003-2005 Robey Pointer <robey@lag.net> +# +# This file is part of paramiko. +# +# Paramiko is free software; you can redistribute it and/or modify it under the +# terms of the GNU Lesser General Public License as published by the Free +# Software Foundation; either version 2.1 of the License, or (at your option) +# any later version. +# +# Paramiko is distrubuted in the hope that it will be useful, but WITHOUT ANY +# WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR +# A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more +# details. +# +# You should have received a copy of the GNU Lesser General Public License +# along with Paramiko; if not, write to the Free Software Foundation, Inc., +# 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA. + +""" +... +""" + +import threading, socket + + +class LoopSocket (object): + """ + A LoopSocket looks like a normal socket, but all data written to it is + delivered on the read-end of another LoopSocket, and vice versa. It's + like a software "socketpair". + """ + + def __init__(self): + self.__in_buffer = '' + self.__lock = threading.Lock() + self.__cv = threading.Condition(self.__lock) + self.__timeout = None + self.__mate = None + + def close(self): + self.__unlink() + try: + self.__lock.acquire() + self.__in_buffer = '' + finally: + self.__lock.release() + + def send(self, data): + if self.__mate is None: + # EOF + raise EOFError() + self.__mate.__feed(data) + return len(data) + + def recv(self, n): + self.__lock.acquire() + try: + if self.__mate is None: + # EOF + return '' + if len(self.__in_buffer) == 0: + self.__cv.wait(self.__timeout) + if len(self.__in_buffer) == 0: + raise socket.timeout + if n < self.__in_buffer: + out = self.__in_buffer[:n] + self.__in_buffer = self.__in_buffer[n:] + else: + out = self.__in_buffer + self.__in_buffer = '' + return out + finally: + self.__lock.release() + + def settimeout(self, n): + self.__timeout = n + + def link(self, other): + self.__mate = other + self.__mate.__mate = self + + def __feed(self, data): + self.__lock.acquire() + try: + self.__in_buffer += data + self.__cv.notifyAll() + finally: + self.__lock.release() + + def __unlink(self): + m = None + self.__lock.acquire() + try: + if self.__mate is not None: + m = self.__mate + self.__mate = None + finally: + self.__lock.release() + if m is not None: + m.__unlink() + + diff --git a/tests/stub_sftp.py b/tests/stub_sftp.py new file mode 100644 index 0000000..4b8b9c3 --- /dev/null +++ b/tests/stub_sftp.py @@ -0,0 +1,192 @@ +#!/usr/bin/python + +# Copyright (C) 2003-2005 Robey Pointer <robey@lag.net> +# +# This file is part of paramiko. +# +# Paramiko is free software; you can redistribute it and/or modify it under the +# terms of the GNU Lesser General Public License as published by the Free +# Software Foundation; either version 2.1 of the License, or (at your option) +# any later version. +# +# Paramiko is distrubuted in the hope that it will be useful, but WITHOUT ANY +# WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR +# A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more +# details. +# +# You should have received a copy of the GNU Lesser General Public License +# along with Paramiko; if not, write to the Free Software Foundation, Inc., +# 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA. + +""" +A stub SFTP server for loopback SFTP testing. +""" + +import os +from paramiko import ServerInterface, SFTPServerInterface, SFTPServer, SFTPAttributes, \ + SFTPHandle, SFTP_OK, AUTH_SUCCESSFUL, OPEN_SUCCEEDED + + +class StubServer (ServerInterface): + def check_auth_password(self, username, password): + # all are allowed + return AUTH_SUCCESSFUL + + def check_channel_request(self, kind, chanid): + return OPEN_SUCCEEDED + + +class StubSFTPHandle (SFTPHandle): + def stat(self): + try: + return SFTPAttributes.from_stat(os.fstat(self.readfile.fileno())) + except OSError, e: + return SFTPServer.convert_errno(e.errno) + + def chattr(self, attr): + # python doesn't have equivalents to fchown or fchmod, so we have to + # use the stored filename + try: + SFTPServer.set_file_attr(self.filename, attr) + except OSError, e: + return SFTPServer.convert_errno(e.errno) + + +class StubSFTPServer (SFTPServerInterface): + # assume current folder is a fine root + # (the tests always create and eventualy delete a subfolder, so there shouldn't be any mess) + ROOT = os.getcwd() + + def _realpath(self, path): + return self.ROOT + self.canonicalize(path) + + def list_folder(self, path): + path = self._realpath(path) + try: + out = [ ] + flist = os.listdir(path) + for fname in flist: + attr = SFTPAttributes.from_stat(os.stat(os.path.join(path, fname))) + attr.filename = fname + out.append(attr) + return out + except OSError, e: + return SFTPServer.convert_errno(e.errno) + + def stat(self, path): + path = self._realpath(path) + try: + return SFTPAttributes.from_stat(os.stat(path)) + except OSError, e: + return SFTPServer.convert_errno(e.errno) + + def lstat(self, path): + path = self._realpath(path) + try: + return SFTPAttributes.from_stat(os.lstat(path)) + except OSError, e: + return SFTPServer.convert_errno(e.errno) + + def open(self, path, flags, attr): + path = self._realpath(path) + try: + fd = os.open(path, flags) + except OSError, e: + return SFTPServer.convert_errno(e.errno) + if (flags & os.O_CREAT) and (attr is not None): + SFTPServer.set_file_attr(path, attr) + if flags & os.O_WRONLY: + fstr = 'w' + elif flags & os.O_RDWR: + fstr = 'r+' + else: + # O_RDONLY (== 0) + fstr = 'r' + try: + f = os.fdopen(fd, fstr) + except OSError, e: + return SFTPServer.convert_errno(e.errno) + fobj = StubSFTPHandle() + fobj.filename = path + fobj.readfile = f + fobj.writefile = f + return fobj + + def remove(self, path): + path = self._realpath(path) + try: + os.remove(path) + except OSError, e: + return SFTPServer.convert_errno(e.errno) + return SFTP_OK + + def rename(self, oldpath, newpath): + oldpath = self._realpath(oldpath) + newpath = self._realpath(newpath) + try: + os.rename(oldpath, newpath) + except OSError, e: + return SFTPServer.convert_errno(e.errno) + return SFTP_OK + + def mkdir(self, path, attr): + path = self._realpath(path) + try: + os.mkdir(path) + if attr is not None: + SFTPServer.set_file_attr(path, attr) + except OSError, e: + return SFTPServer.convert_errno(e.errno) + return SFTP_OK + + def rmdir(self, path): + path = self._realpath(path) + try: + os.rmdir(path) + except OSError, e: + return SFTPServer.convert_errno(e.errno) + return SFTP_OK + + def chattr(self, path, attr): + path = self._realpath(path) + try: + SFTPServer.set_file_attr(path, attr) + except OSError, e: + return SFTPServer.convert_errno(e.errno) + return SFTP_OK + + def symlink(self, target_path, path): + path = self._realpath(path) + if (len(target_path) > 0) and (target_path[0] == '/'): + # absolute symlink + target_path = os.path.join(self.ROOT, target_path[1:]) + if target_path[:2] == '//': + # bug in os.path.join + target_path = target_path[1:] + else: + # compute relative to path + abspath = os.path.join(os.path.dirname(path), target_path) + if abspath[:len(self.ROOT)] != self.ROOT: + # this symlink isn't going to work anyway -- just break it immediately + target_path = '<error>' + try: + os.symlink(target_path, path) + except: + return SFTPServer.convert_errno(e.errno) + return SFTP_OK + + def readlink(self, path): + path = self._realpath(path) + try: + symlink = os.readlink(path) + except OSError, e: + return SFTPServer.convert_errno(e.errno) + # if it's absolute, remove the root + if os.path.isabs(symlink): + if symlink[:len(self.ROOT)] == self.ROOT: + symlink = symlink[len(self.ROOT):] + if (len(symlink) == 0) or (symlink[0] != '/'): + symlink = '/' + symlink + else: + symlink = '<error>' + return symlink diff --git a/tests/test_dss.key b/tests/test_dss.key new file mode 100644 index 0000000..e10807f --- /dev/null +++ b/tests/test_dss.key @@ -0,0 +1,12 @@ +-----BEGIN DSA PRIVATE KEY----- +MIIBuwIBAAKBgQDngaYDZ30c6/7cJgEEbtl8FgKdwhba1Z7oOrOn4MI/6C42G1bY +wMuqZf4dBCglsdq39SHrcjbE8Vq54gPSOh3g4+uV9Rcg5IOoPLbwp2jQfF6f1FIb +sx7hrDCIqUcQccPSxetPBKmXI9RN8rZLaFuQeTnI65BKM98Ruwvq6SI2LwIVAPDP +hSeawaJI27mKqOfe5PPBSmyHAoGBAJMXxXmPD9sGaQ419DIpmZecJKBUAy9uXD8x +gbgeDpwfDaFJP8owByCKREocPFfi86LjCuQkyUKOfjYMN6iHIf1oEZjB8uJAatUr +FzI0ArXtUqOhwTLwTyFuUojE5own2WYsOAGByvgfyWjsGhvckYNhI4ODpNdPlxQ8 +ZamaPGPsAoGARmR7CCPjodxASvRbIyzaVpZoJ/Z6x7dAumV+ysrV1BVYd0lYukmn +jO1kKBWApqpH1ve9XDQYN8zgxM4b16L21kpoWQnZtXrY3GZ4/it9kUgyB7+NwacI +BlXa8cMDL7Q/69o0d54U0X/NeX5QxuYR6OMJlrkQB7oiW/P/1mwjQgECFGI9QPSc +h9pT9XHqn+1rZ4bK+QGA +-----END DSA PRIVATE KEY----- diff --git a/tests/test_dss_password.key b/tests/test_dss_password.key new file mode 100644 index 0000000..e2a9bc5 --- /dev/null +++ b/tests/test_dss_password.key @@ -0,0 +1,15 @@ +-----BEGIN DSA PRIVATE KEY----- +Proc-Type: 4,ENCRYPTED +DEK-Info: DES-EDE3-CBC,78DAEB836ED0A646 + +ldWkq9OMlXqWmjIqppNnmNPIUj5uVT12LkBosTApTbibTme3kIJb1uDeG2BShVfY ++vDOTUE9koGPDLsxW1t5At+EVyIDK8aIO0uHteXM5AbBX20LLUWRbRVqZhsMxqQh +3H3XlHiN+QhaWcb4fFuu18a8SkimTFpDnZuffoCDl/zh/B7XieARTLA805K/ZgVB +BBwflkR2BE053XHrJAIx9BEUlLP76Fo18rvjLZOSeu3s+VnnhqUb5FCt5h50a46u +YXQBbo2r9Zo1ilGMNEXJO0gk5hwGVmTySz53NkPA5HmWt8NIzv5jQHMDy7N+ZykF +uwpP1R5M/ZIFY4Y5h/lvn6IJjQ7VySRPIbpN8o2YJv2OD1Ja80n3tU8Mg77o3o4d +NwKm7cCjlq+FuIBdOsSgsB8FPQRUhW+jpFDxmWN64DM2cEg6RUdptby7WmMp0HwK +1qyEfxHjLMuDVlD7lASIDBrRlUjPtXEH1DzIYQuYaRZaixFoZ7EY+X73TwmrKFEU +US9ZnQZtRtroRqGwR4fz4wQQsjTl/AmOijlBmi29taJccJsT/THrLQ5plOEd8OMv +9FsaPJXBU85gaRKo3JZtrw== +-----END DSA PRIVATE KEY----- diff --git a/tests/test_file.py b/tests/test_file.py new file mode 100644 index 0000000..250821c --- /dev/null +++ b/tests/test_file.py @@ -0,0 +1,153 @@ +#!/usr/bin/python + +# Copyright (C) 2003-2005 Robey Pointer <robey@lag.net> +# +# This file is part of paramiko. +# +# Paramiko is free software; you can redistribute it and/or modify it under the +# terms of the GNU Lesser General Public License as published by the Free +# Software Foundation; either version 2.1 of the License, or (at your option) +# any later version. +# +# Paramiko is distrubuted in the hope that it will be useful, but WITHOUT ANY +# WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR +# A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more +# details. +# +# You should have received a copy of the GNU Lesser General Public License +# along with Paramiko; if not, write to the Free Software Foundation, Inc., +# 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA. + +""" +Some unit tests for the BufferedFile abstraction. +""" + +import unittest +from paramiko.file import BufferedFile + + +class LoopbackFile (BufferedFile): + """ + BufferedFile object that you can write data into, and then read it back. + """ + def __init__(self, mode='r', bufsize=-1): + BufferedFile.__init__(self) + self._set_mode(mode, bufsize) + self.buffer = '' + + def _read(self, size): + if len(self.buffer) == 0: + return None + if size > len(self.buffer): + size = len(self.buffer) + data = self.buffer[:size] + self.buffer = self.buffer[size:] + return data + + def _write(self, data): + self.buffer += data + return len(data) + + +class BufferedFileTest (unittest.TestCase): + + def test_1_simple(self): + f = LoopbackFile('r') + try: + f.write('hi') + self.assert_(False, 'no exception on write to read-only file') + except: + pass + f.close() + + f = LoopbackFile('w') + try: + f.read(1) + self.assert_(False, 'no exception to read from write-only file') + except: + pass + f.close() + + def test_2_readline(self): + f = LoopbackFile('r+U') + f.write('First line.\nSecond line.\r\nThird line.\nFinal line non-terminated.') + self.assertEqual(f.readline(), 'First line.\n') + # universal newline mode should convert this linefeed: + self.assertEqual(f.readline(), 'Second line.\n') + # truncated line: + self.assertEqual(f.readline(7), 'Third l') + self.assertEqual(f.readline(), 'ine.\n') + self.assertEqual(f.readline(), 'Final line non-terminated.') + self.assertEqual(f.readline(), '') + f.close() + try: + f.readline() + self.assert_(False, 'no exception on readline of closed file') + except IOError: + pass + self.assert_('\n' in f.newlines) + self.assert_('\r\n' in f.newlines) + self.assert_('\r' not in f.newlines) + + def test_3_lf(self): + """ + try to trick the linefeed detector. + """ + f = LoopbackFile('r+U') + f.write('First line.\r') + self.assertEqual(f.readline(), 'First line.\n') + f.write('\nSecond.\r\n') + self.assertEqual(f.readline(), 'Second.\n') + f.close() + self.assertEqual(f.newlines, '\r\n') + + def test_4_write(self): + """ + verify that write buffering is on. + """ + f = LoopbackFile('r+', 1) + f.write('Complete line.\nIncomplete line.') + self.assertEqual(f.readline(), 'Complete line.\n') + self.assertEqual(f.readline(), '') + f.write('..\n') + self.assertEqual(f.readline(), 'Incomplete line...\n') + f.close() + + def test_5_flush(self): + """ + verify that flush will force a write. + """ + f = LoopbackFile('r+', 512) + f.write('Not\nquite\n512 bytes.\n') + self.assertEqual(f.read(1), '') + f.flush() + self.assertEqual(f.read(5), 'Not\nq') + self.assertEqual(f.read(10), 'uite\n512 b') + self.assertEqual(f.read(9), 'ytes.\n') + self.assertEqual(f.read(3), '') + f.close() + + def test_6_buffering(self): + """ + verify that flushing happens automatically on buffer crossing. + """ + f = LoopbackFile('r+', 16) + f.write('Too small.') + self.assertEqual(f.read(4), '') + f.write(' ') + self.assertEqual(f.read(4), '') + f.write('Enough.') + self.assertEqual(f.read(20), 'Too small. Enough.') + f.close() + + def test_7_read_all(self): + """ + verify that read(-1) returns everything left in the file. + """ + f = LoopbackFile('r+', 16) + f.write('The first thing you need to do is open your eyes. ') + f.write('Then, you need to close them again.\n') + s = f.read(-1) + self.assertEqual(s, 'The first thing you need to do is open your eyes. Then, you ' + + 'need to close them again.\n') + f.close() diff --git a/tests/test_kex.py b/tests/test_kex.py new file mode 100644 index 0000000..2680853 --- /dev/null +++ b/tests/test_kex.py @@ -0,0 +1,183 @@ +#!/usr/bin/python + +# Copyright (C) 2003-2005 Robey Pointer <robey@lag.net> +# +# This file is part of paramiko. +# +# Paramiko is free software; you can redistribute it and/or modify it under the +# terms of the GNU Lesser General Public License as published by the Free +# Software Foundation; either version 2.1 of the License, or (at your option) +# any later version. +# +# Paramiko is distrubuted in the hope that it will be useful, but WITHOUT ANY +# WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR +# A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more +# details. +# +# You should have received a copy of the GNU Lesser General Public License +# along with Paramiko; if not, write to the Free Software Foundation, Inc., +# 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA. + +""" +Some unit tests for the key exchange protocols. +""" + +import unittest +import paramiko.util +from paramiko.kex_group1 import KexGroup1 +from paramiko.kex_gex import KexGex +from paramiko import Message + + +class FakeRandpool (object): + def stir(self): + pass + def get_bytes(self, n): + return chr(0xcc) * n + +class FakeKey (object): + def __str__(self): + return 'fake-key' + def sign_ssh_data(self, randpool, H): + return 'fake-sig' + +class FakeModulusPack (object): + P = 0xFFFFFFFFFFFFFFFFC90FDAA22168C234C4C6628B80DC1CD129024E088A67CC74020BBEA63B139B22514A08798E3404DDEF9519B3CD3A431B302B0A6DF25F14374FE1356D6D51C245E485B576625E7EC6F44C42E9A637ED6B0BFF5CB6F406B7EDEE386BFB5A899FA5AE9F24117C4B1FE649286651ECE65381FFFFFFFFFFFFFFFFL + G = 2 + def get_modulus(self, min, ask, max): + return self.G, self.P + +class FakeTransport (object): + randpool = FakeRandpool() + local_version = 'SSH-2.0-paramiko_1.0' + remote_version = 'SSH-2.0-lame' + local_kex_init = 'local-kex-init' + remote_kex_init = 'remote-kex-init' + + def _send_message(self, m): + self._message = m + def _expect_packet(self, t): + self._expect = t + def _set_K_H(self, K, H): + self._K = K + self._H = H + def _verify_key(self, host_key, sig): + self._verify = (host_key, sig) + def _activate_outbound(self): + self._activated = True + def _log(self, level, s): + pass + def get_server_key(self): + return FakeKey() + def _get_modulus_pack(self): + return FakeModulusPack() + + +class KexTest (unittest.TestCase): + + K = 14730343317708716439807310032871972459448364195094179797249681733965528989482751523943515690110179031004049109375612685505881911274101441415545039654102474376472240501616988799699744135291070488314748284283496055223852115360852283821334858541043710301057312858051901453919067023103730011648890038847384890504L + + def setUp(self): + pass + + def tearDown(self): + pass + + def test_1_group1_client(self): + transport = FakeTransport() + transport.server_mode = False + kex = KexGroup1(transport) + kex.start_kex() + x = '1E000000807E2DDB1743F3487D6545F04F1C8476092FB912B013626AB5BCEB764257D88BBA64243B9F348DF7B41B8C814A995E00299913503456983FFB9178D3CD79EB6D55522418A8ABF65375872E55938AB99A84A0B5FC8A1ECC66A7C3766E7E0F80B7CE2C9225FC2DD683F4764244B72963BBB383F529DCF0C5D17740B8A2ADBE9208D4' + self.assertEquals(x, paramiko.util.hexify(str(transport._message))) + self.assertEquals(paramiko.kex_group1._MSG_KEXDH_REPLY, transport._expect) + + # fake "reply" + msg = Message() + msg.add_string('fake-host-key') + msg.add_mpint(69) + msg.add_string('fake-sig') + msg.rewind() + kex.parse_next(paramiko.kex_group1._MSG_KEXDH_REPLY, msg) + H = '03079780F3D3AD0B3C6DB30C8D21685F367A86D2' + self.assertEquals(self.K, transport._K) + self.assertEquals(H, paramiko.util.hexify(transport._H)) + self.assertEquals(('fake-host-key', 'fake-sig'), transport._verify) + self.assert_(transport._activated) + + def test_2_group1_server(self): + transport = FakeTransport() + transport.server_mode = True + kex = KexGroup1(transport) + kex.start_kex() + self.assertEquals(paramiko.kex_group1._MSG_KEXDH_INIT, transport._expect) + + msg = Message() + msg.add_mpint(69) + msg.rewind() + kex.parse_next(paramiko.kex_group1._MSG_KEXDH_INIT, msg) + H = 'B16BF34DD10945EDE84E9C1EF24A14BFDC843389' + x = '1F0000000866616B652D6B6579000000807E2DDB1743F3487D6545F04F1C8476092FB912B013626AB5BCEB764257D88BBA64243B9F348DF7B41B8C814A995E00299913503456983FFB9178D3CD79EB6D55522418A8ABF65375872E55938AB99A84A0B5FC8A1ECC66A7C3766E7E0F80B7CE2C9225FC2DD683F4764244B72963BBB383F529DCF0C5D17740B8A2ADBE9208D40000000866616B652D736967' + self.assertEquals(self.K, transport._K) + self.assertEquals(H, paramiko.util.hexify(transport._H)) + self.assertEquals(x, paramiko.util.hexify(str(transport._message))) + self.assert_(transport._activated) + + def test_3_gex_client(self): + transport = FakeTransport() + transport.server_mode = False + kex = KexGex(transport) + kex.start_kex() + x = '22000004000000080000002000' + self.assertEquals(x, paramiko.util.hexify(str(transport._message))) + self.assertEquals(paramiko.kex_gex._MSG_KEXDH_GEX_GROUP, transport._expect) + + msg = Message() + msg.add_mpint(FakeModulusPack.P) + msg.add_mpint(FakeModulusPack.G) + msg.rewind() + kex.parse_next(paramiko.kex_gex._MSG_KEXDH_GEX_GROUP, msg) + x = '20000000807E2DDB1743F3487D6545F04F1C8476092FB912B013626AB5BCEB764257D88BBA64243B9F348DF7B41B8C814A995E00299913503456983FFB9178D3CD79EB6D55522418A8ABF65375872E55938AB99A84A0B5FC8A1ECC66A7C3766E7E0F80B7CE2C9225FC2DD683F4764244B72963BBB383F529DCF0C5D17740B8A2ADBE9208D4' + self.assertEquals(x, paramiko.util.hexify(str(transport._message))) + self.assertEquals(paramiko.kex_gex._MSG_KEXDH_GEX_REPLY, transport._expect) + + msg = Message() + msg.add_string('fake-host-key') + msg.add_mpint(69) + msg.add_string('fake-sig') + msg.rewind() + kex.parse_next(paramiko.kex_gex._MSG_KEXDH_GEX_REPLY, msg) + H = 'A265563F2FA87F1A89BF007EE90D58BE2E4A4BD0' + self.assertEquals(self.K, transport._K) + self.assertEquals(H, paramiko.util.hexify(transport._H)) + self.assertEquals(('fake-host-key', 'fake-sig'), transport._verify) + self.assert_(transport._activated) + + def test_4_gex_server(self): + transport = FakeTransport() + transport.server_mode = True + kex = KexGex(transport) + kex.start_kex() + self.assertEquals(paramiko.kex_gex._MSG_KEXDH_GEX_REQUEST, transport._expect) + + msg = Message() + msg.add_int(1024) + msg.add_int(2048) + msg.add_int(4096) + msg.rewind() + kex.parse_next(paramiko.kex_gex._MSG_KEXDH_GEX_REQUEST, msg) + x = '1F0000008100FFFFFFFFFFFFFFFFC90FDAA22168C234C4C6628B80DC1CD129024E088A67CC74020BBEA63B139B22514A08798E3404DDEF9519B3CD3A431B302B0A6DF25F14374FE1356D6D51C245E485B576625E7EC6F44C42E9A637ED6B0BFF5CB6F406B7EDEE386BFB5A899FA5AE9F24117C4B1FE649286651ECE65381FFFFFFFFFFFFFFFF0000000102' + self.assertEquals(x, paramiko.util.hexify(str(transport._message))) + self.assertEquals(paramiko.kex_gex._MSG_KEXDH_GEX_INIT, transport._expect) + + msg = Message() + msg.add_mpint(12345) + msg.rewind() + kex.parse_next(paramiko.kex_gex._MSG_KEXDH_GEX_INIT, msg) + K = 67592995013596137876033460028393339951879041140378510871612128162185209509220726296697886624612526735888348020498716482757677848959420073720160491114319163078862905400020959196386947926388406687288901564192071077389283980347784184487280885335302632305026248574716290537036069329724382811853044654824945750581L + H = 'CE754197C21BF3452863B4F44D0B3951F12516EF' + x = '210000000866616B652D6B6579000000807E2DDB1743F3487D6545F04F1C8476092FB912B013626AB5BCEB764257D88BBA64243B9F348DF7B41B8C814A995E00299913503456983FFB9178D3CD79EB6D55522418A8ABF65375872E55938AB99A84A0B5FC8A1ECC66A7C3766E7E0F80B7CE2C9225FC2DD683F4764244B72963BBB383F529DCF0C5D17740B8A2ADBE9208D40000000866616B652D736967' + self.assertEquals(K, transport._K) + self.assertEquals(H, paramiko.util.hexify(transport._H)) + self.assertEquals(x, paramiko.util.hexify(str(transport._message))) + self.assert_(transport._activated) diff --git a/tests/test_message.py b/tests/test_message.py new file mode 100644 index 0000000..441e3ce --- /dev/null +++ b/tests/test_message.py @@ -0,0 +1,102 @@ +#!/usr/bin/python + +# Copyright (C) 2003-2005 Robey Pointer <robey@lag.net> +# +# This file is part of paramiko. +# +# Paramiko is free software; you can redistribute it and/or modify it under the +# terms of the GNU Lesser General Public License as published by the Free +# Software Foundation; either version 2.1 of the License, or (at your option) +# any later version. +# +# Paramiko is distrubuted in the hope that it will be useful, but WITHOUT ANY +# WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR +# A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more +# details. +# +# You should have received a copy of the GNU Lesser General Public License +# along with Paramiko; if not, write to the Free Software Foundation, Inc., +# 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA. + +""" +Some unit tests for ssh protocol message blocks. +""" + +import unittest +from paramiko.message import Message + + +class MessageTest (unittest.TestCase): + + __a = '\x00\x00\x00\x17\x07\x60\xe0\x90\x00\x00\x00\x01q\x00\x00\x00\x05hello\x00\x00\x03\xe8' + ('x' * 1000) + __b = '\x01\x00\xf3\x00\x3f\x00\x00\x00\x10huey,dewey,louie' + __c = '\x00\x00\x00\x00\x00\x00\x00\x05\x00\x00\xf5\xe4\xd3\xc2\xb1\x09\x00\x00\x00\x01\x11\x00\x00\x00\x07\x00\xf5\xe4\xd3\xc2\xb1\x09\x00\x00\x00\x06\x9a\x1b\x2c\x3d\x4e\xf7' + __d = '\x00\x00\x00\x05\x00\x00\x00\x05\x11\x22\x33\x44\x55\x01\x00\x00\x00\x03cat\x00\x00\x00\x03a,b' + + def test_1_encode(self): + msg = Message() + msg.add_int(23) + msg.add_int(123789456) + msg.add_string('q') + msg.add_string('hello') + msg.add_string('x' * 1000) + self.assertEquals(str(msg), self.__a) + + msg = Message() + msg.add_boolean(True) + msg.add_boolean(False) + msg.add_byte('\xf3') + msg.add_bytes('\x00\x3f') + msg.add_list(['huey', 'dewey', 'louie']) + self.assertEquals(str(msg), self.__b) + + msg = Message() + msg.add_int64(5) + msg.add_int64(0xf5e4d3c2b109L) + msg.add_mpint(17) + msg.add_mpint(0xf5e4d3c2b109L) + msg.add_mpint(-0x65e4d3c2b109L) + self.assertEquals(str(msg), self.__c) + + def test_2_decode(self): + msg = Message(self.__a) + self.assertEquals(msg.get_int(), 23) + self.assertEquals(msg.get_int(), 123789456) + self.assertEquals(msg.get_string(), 'q') + self.assertEquals(msg.get_string(), 'hello') + self.assertEquals(msg.get_string(), 'x' * 1000) + + msg = Message(self.__b) + self.assertEquals(msg.get_boolean(), True) + self.assertEquals(msg.get_boolean(), False) + self.assertEquals(msg.get_byte(), '\xf3') + self.assertEquals(msg.get_bytes(2), '\x00\x3f') + self.assertEquals(msg.get_list(), ['huey', 'dewey', 'louie']) + + msg = Message(self.__c) + self.assertEquals(msg.get_int64(), 5) + self.assertEquals(msg.get_int64(), 0xf5e4d3c2b109L) + self.assertEquals(msg.get_mpint(), 17) + self.assertEquals(msg.get_mpint(), 0xf5e4d3c2b109L) + self.assertEquals(msg.get_mpint(), -0x65e4d3c2b109L) + + def test_3_add(self): + msg = Message() + msg.add(5) + msg.add(0x1122334455L) + msg.add(True) + msg.add('cat') + msg.add(['a', 'b']) + self.assertEquals(str(msg), self.__d) + + def test_4_misc(self): + msg = Message(self.__d) + self.assertEquals(msg.get_int(), 5) + self.assertEquals(msg.get_mpint(), 0x1122334455L) + self.assertEquals(msg.get_so_far(), self.__d[:13]) + self.assertEquals(msg.get_remainder(), self.__d[13:]) + msg.rewind() + self.assertEquals(msg.get_int(), 5) + self.assertEquals(msg.get_so_far(), self.__d[:4]) + self.assertEquals(msg.get_remainder(), self.__d[4:]) + diff --git a/tests/test_packetizer.py b/tests/test_packetizer.py new file mode 100644 index 0000000..8c992bd --- /dev/null +++ b/tests/test_packetizer.py @@ -0,0 +1,70 @@ +# Copyright (C) 2003-2005 Robey Pointer <robey@lag.net> +# +# This file is part of paramiko. +# +# Paramiko is free software; you can redistribute it and/or modify it under the +# terms of the GNU Lesser General Public License as published by the Free +# Software Foundation; either version 2.1 of the License, or (at your option) +# any later version. +# +# Paramiko is distrubuted in the hope that it will be useful, but WITHOUT ANY +# WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR +# A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more +# details. +# +# You should have received a copy of the GNU Lesser General Public License +# along with Paramiko; if not, write to the Free Software Foundation, Inc., +# 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA. + +""" +Some unit tests for the ssh2 protocol in Transport. +""" + +import unittest +from loop import LoopSocket +from Crypto.Cipher import AES +from Crypto.Hash import SHA, HMAC +from paramiko import Message, Packetizer, util + +class PacketizerTest (unittest.TestCase): + + def test_1_write (self): + rsock = LoopSocket() + wsock = LoopSocket() + rsock.link(wsock) + p = Packetizer(wsock) + p.set_log(util.get_logger('paramiko.transport')) + p.set_hexdump(True) + cipher = AES.new('\x00' * 16, AES.MODE_CBC, '\x55' * 16) + p.set_outbound_cipher(cipher, 16, SHA, 12, '\x1f' * 20) + + # message has to be at least 16 bytes long, so we'll have at least one + # block of data encrypted that contains zero random padding bytes + m = Message() + m.add_byte(chr(100)) + m.add_int(100) + m.add_int(1) + m.add_int(900) + p.send_message(m) + data = rsock.recv(100) + # 32 + 12 bytes of MAC = 44 + self.assertEquals(44, len(data)) + self.assertEquals('\x43\x91\x97\xbd\x5b\x50\xac\x25\x87\xc2\xc4\x6b\xc7\xe9\x38\xc0', data[:16]) + + def test_2_read (self): + rsock = LoopSocket() + wsock = LoopSocket() + rsock.link(wsock) + p = Packetizer(rsock) + p.set_log(util.get_logger('paramiko.transport')) + p.set_hexdump(True) + cipher = AES.new('\x00' * 16, AES.MODE_CBC, '\x55' * 16) + p.set_inbound_cipher(cipher, 16, SHA, 12, '\x1f' * 20) + + wsock.send('C\x91\x97\xbd[P\xac%\x87\xc2\xc4k\xc7\xe98\xc0' + \ + '\x90\xd2\x16V\rqsa8|L=\xfb\x97}\xe2n\x03\xb1\xa0\xc2\x1c\xd6AAL\xb4Y') + cmd, m = p.read_message() + self.assertEquals(100, cmd) + self.assertEquals(100, m.get_int()) + self.assertEquals(1, m.get_int()) + self.assertEquals(900, m.get_int()) diff --git a/tests/test_pkey.py b/tests/test_pkey.py new file mode 100644 index 0000000..e56edb1 --- /dev/null +++ b/tests/test_pkey.py @@ -0,0 +1,140 @@ +# Copyright (C) 2003-2005 Robey Pointer <robey@lag.net> +# +# This file is part of paramiko. +# +# Paramiko is free software; you can redistribute it and/or modify it under the +# terms of the GNU Lesser General Public License as published by the Free +# Software Foundation; either version 2.1 of the License, or (at your option) +# any later version. +# +# Paramiko is distrubuted in the hope that it will be useful, but WITHOUT ANY +# WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR +# A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more +# details. +# +# You should have received a copy of the GNU Lesser General Public License +# along with Paramiko; if not, write to the Free Software Foundation, Inc., +# 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA. + +""" +Some unit tests for public/private key objects. +""" + +import unittest +from paramiko import RSAKey, DSSKey, Message, util, randpool + +# from openssh's ssh-keygen +PUB_RSA = 'ssh-rsa AAAAB3NzaC1yc2EAAAABIwAAAIEA049W6geFpmsljTwfvI1UmKWWJPNFI74+vNKTk4dmzkQY2yAMs6FhlvhlI8ysU4oj71ZsRYMecHbBbxdN79+JRFVYTKaLqjwGENeTd+yv4q+V2PvZv3fLnzApI3l7EJCqhWwJUHJ1jAkZzqDx0tyOL4uoZpww3nmE0kb3y21tH4c=' +PUB_DSS = 'ssh-dss AAAAB3NzaC1kc3MAAACBAOeBpgNnfRzr/twmAQRu2XwWAp3CFtrVnug6s6fgwj/oLjYbVtjAy6pl/h0EKCWx2rf1IetyNsTxWrniA9I6HeDj65X1FyDkg6g8tvCnaNB8Xp/UUhuzHuGsMIipRxBxw9LF608EqZcj1E3ytktoW5B5OcjrkEoz3xG7C+rpIjYvAAAAFQDwz4UnmsGiSNu5iqjn3uTzwUpshwAAAIEAkxfFeY8P2wZpDjX0MimZl5wkoFQDL25cPzGBuB4OnB8NoUk/yjAHIIpEShw8V+LzouMK5CTJQo5+Ngw3qIch/WgRmMHy4kBq1SsXMjQCte1So6HBMvBPIW5SiMTmjCfZZiw4AYHK+B/JaOwaG9yRg2Ejg4Ok10+XFDxlqZo8Y+wAAACARmR7CCPjodxASvRbIyzaVpZoJ/Z6x7dAumV+ysrV1BVYd0lYukmnjO1kKBWApqpH1ve9XDQYN8zgxM4b16L21kpoWQnZtXrY3GZ4/it9kUgyB7+NwacIBlXa8cMDL7Q/69o0d54U0X/NeX5QxuYR6OMJlrkQB7oiW/P/1mwjQgE=' +FINGER_RSA = '1024 60:73:38:44:cb:51:86:65:7f:de:da:a2:2b:5a:57:d5' +FINGER_DSS = '1024 44:78:f0:b9:a2:3c:c5:18:20:09:ff:75:5b:c1:d2:6c' +SIGNED_RSA = '20:d7:8a:31:21:cb:f7:92:12:f2:a4:89:37:f5:78:af:e6:16:b6:25:b9:97:3d:a2:cd:5f:ca:20:21:73:4c:ad:34:73:8f:20:77:28:e2:94:15:08:d8:91:40:7a:85:83:bf:18:37:95:dc:54:1a:9b:88:29:6c:73:ca:38:b4:04:f1:56:b9:f2:42:9d:52:1b:29:29:b4:4f:fd:c9:2d:af:47:d2:40:76:30:f3:63:45:0c:d9:1d:43:86:0f:1c:70:e2:93:12:34:f3:ac:c5:0a:2f:14:50:66:59:f1:88:ee:c1:4a:e9:d1:9c:4e:46:f0:0e:47:6f:38:74:f1:44:a8' + + +class KeyTest (unittest.TestCase): + + def setUp(self): + pass + + def tearDown(self): + pass + + def test_1_generate_key_bytes(self): + from Crypto.Hash import MD5 + key = util.generate_key_bytes(MD5, '\x01\x02\x03\x04', 'happy birthday', 30) + exp = util.unhexify('61E1F272F4C1C4561586BD322498C0E924672780F47BB37DDA7D54019E64') + self.assertEquals(exp, key) + + def test_2_load_rsa(self): + key = RSAKey.from_private_key_file('tests/test_rsa.key') + self.assertEquals('ssh-rsa', key.get_name()) + exp_rsa = FINGER_RSA.split()[1].replace(':', '') + my_rsa = util.hexify(key.get_fingerprint()).lower() + self.assertEquals(exp_rsa, my_rsa) + self.assertEquals(PUB_RSA.split()[1], key.get_base64()) + self.assertEquals(1024, key.get_bits()) + + def test_3_load_rsa_password(self): + key = RSAKey.from_private_key_file('tests/test_rsa_password.key', 'television') + self.assertEquals('ssh-rsa', key.get_name()) + exp_rsa = FINGER_RSA.split()[1].replace(':', '') + my_rsa = util.hexify(key.get_fingerprint()).lower() + self.assertEquals(exp_rsa, my_rsa) + self.assertEquals(PUB_RSA.split()[1], key.get_base64()) + self.assertEquals(1024, key.get_bits()) + + def test_4_load_dss(self): + key = DSSKey.from_private_key_file('tests/test_dss.key') + self.assertEquals('ssh-dss', key.get_name()) + exp_dss = FINGER_DSS.split()[1].replace(':', '') + my_dss = util.hexify(key.get_fingerprint()).lower() + self.assertEquals(exp_dss, my_dss) + self.assertEquals(PUB_DSS.split()[1], key.get_base64()) + self.assertEquals(1024, key.get_bits()) + + def test_5_load_dss_password(self): + key = DSSKey.from_private_key_file('tests/test_dss_password.key', 'television') + self.assertEquals('ssh-dss', key.get_name()) + exp_dss = FINGER_DSS.split()[1].replace(':', '') + my_dss = util.hexify(key.get_fingerprint()).lower() + self.assertEquals(exp_dss, my_dss) + self.assertEquals(PUB_DSS.split()[1], key.get_base64()) + self.assertEquals(1024, key.get_bits()) + + def test_6_compare_rsa(self): + # verify that the private & public keys compare equal + key = RSAKey.from_private_key_file('tests/test_rsa.key') + self.assertEquals(key, key) + pub = RSAKey(data=str(key)) + self.assert_(key.can_sign()) + self.assert_(not pub.can_sign()) + self.assertEquals(key, pub) + + def test_7_compare_dss(self): + # verify that the private & public keys compare equal + key = DSSKey.from_private_key_file('tests/test_dss.key') + self.assertEquals(key, key) + pub = DSSKey(data=str(key)) + self.assert_(key.can_sign()) + self.assert_(not pub.can_sign()) + self.assertEquals(key, pub) + + def test_8_sign_rsa(self): + # verify that the rsa private key can sign and verify + key = RSAKey.from_private_key_file('tests/test_rsa.key') + msg = key.sign_ssh_data(randpool, 'ice weasels') + self.assert_(type(msg) is Message) + msg.rewind() + self.assertEquals('ssh-rsa', msg.get_string()) + sig = ''.join([chr(int(x, 16)) for x in SIGNED_RSA.split(':')]) + self.assertEquals(sig, msg.get_string()) + msg.rewind() + pub = RSAKey(data=str(key)) + self.assert_(pub.verify_ssh_sig('ice weasels', msg)) + + def test_9_sign_dss(self): + # verify that the dss private key can sign and verify + key = DSSKey.from_private_key_file('tests/test_dss.key') + msg = key.sign_ssh_data(randpool, 'ice weasels') + self.assert_(type(msg) is Message) + msg.rewind() + self.assertEquals('ssh-dss', msg.get_string()) + # can't do the same test as we do for RSA, because DSS signatures + # are usually different each time. but we can test verification + # anyway so it's ok. + self.assertEquals(40, len(msg.get_string())) + msg.rewind() + pub = DSSKey(data=str(key)) + self.assert_(pub.verify_ssh_sig('ice weasels', msg)) + + def test_A_generate_rsa(self): + key = RSAKey.generate(1024) + msg = key.sign_ssh_data(randpool, 'jerri blank') + msg.rewind() + self.assert_(key.verify_ssh_sig('jerri blank', msg)) + + def test_B_generate_dss(self): + key = DSSKey.generate(1024) + msg = key.sign_ssh_data(randpool, 'jerri blank') + msg.rewind() + self.assert_(key.verify_ssh_sig('jerri blank', msg)) diff --git a/tests/test_rsa.key b/tests/test_rsa.key new file mode 100644 index 0000000..f50e9c5 --- /dev/null +++ b/tests/test_rsa.key @@ -0,0 +1,15 @@ +-----BEGIN RSA PRIVATE KEY----- +MIICWgIBAAKBgQDTj1bqB4WmayWNPB+8jVSYpZYk80Ujvj680pOTh2bORBjbIAyz +oWGW+GUjzKxTiiPvVmxFgx5wdsFvF03v34lEVVhMpouqPAYQ15N37K/ir5XY+9m/ +d8ufMCkjeXsQkKqFbAlQcnWMCRnOoPHS3I4vi6hmnDDeeYTSRvfLbW0fhwIBIwKB +gBIiOqZYaoqbeD9OS9z2K9KR2atlTxGxOJPXiP4ESqP3NVScWNwyZ3NXHpyrJLa0 +EbVtzsQhLn6rF+TzXnOlcipFvjsem3iYzCpuChfGQ6SovTcOjHV9z+hnpXvQ/fon +soVRZY65wKnF7IAoUwTmJS9opqgrN6kRgCd3DASAMd1bAkEA96SBVWFt/fJBNJ9H +tYnBKZGw0VeHOYmVYbvMSstssn8un+pQpUm9vlG/bp7Oxd/m+b9KWEh2xPfv6zqU +avNwHwJBANqzGZa/EpzF4J8pGti7oIAPUIDGMtfIcmqNXVMckrmzQ2vTfqtkEZsA +4rE1IERRyiJQx6EJsz21wJmGV9WJQ5kCQQDwkS0uXqVdFzgHO6S++tjmjYcxwr3g +H0CoFYSgbddOT6miqRskOQF3DZVkJT3kyuBgU2zKygz52ukQZMqxCb1fAkASvuTv +qfpH87Qq5kQhNKdbbwbmd2NxlNabazPijWuphGTdW0VfJdWfklyS2Kr+iqrs/5wV +HhathJt636Eg7oIjAkA8ht3MQ+XSl9yIJIS8gVpbPxSw5OMfw0PjVE7tBdQruiSc +nvuQES5C9BMHjF39LZiGH1iLQy7FgdHyoP+eodI7 +-----END RSA PRIVATE KEY----- diff --git a/tests/test_rsa_password.key b/tests/test_rsa_password.key new file mode 100644 index 0000000..7713049 --- /dev/null +++ b/tests/test_rsa_password.key @@ -0,0 +1,18 @@ +-----BEGIN RSA PRIVATE KEY----- +Proc-Type: 4,ENCRYPTED +DEK-Info: DES-EDE3-CBC,DAA422E8A5A8EFB7 + ++nssHGmWl91IcmGiE6DdCIqGvAP04tuLh60wLjWBvdjtF9CjztPnF57xe+6pBk7o +YgF/Ry3ik9ZV9rHNcRXifDKM9crxtYlpUlkM2C0SP89sXaO0P1Q1yCnrtZUwDIKO +BNV8et5X7+AGMFsy/nmv0NFMrbpoG03Dppsloecd29NTRlIXwxHRFyHxy6BdEib/ +Dn0mEVbwg3dTvKrd/sODWR9hRwpDGM9nkEbUNJCh7vMwFKkIZZF8yqFvmGckuO5C +HZkDJ6RkEDYrSZJAavQaiOPF5bu3cHughRfnrIKVrQuTTDiWjwX9Ny8e4p4k7dy7 +rLpbPhtxUOUbpOF7T1QxljDi1Tcq3Ebk3kN/ZLPRFnDrJfyUx+m9BXmAa78Wxs/l +KaS8DTkYykd3+EGOeJFjZg2bvgqil4V+5JIt/+MQ5pZ/ui7i4GcH2bvZyGAbrXzP +3LipSAdN5RG+fViLe3HUtfCx4ZAgtU78TWJrLk2FwKQGglFxKLnswp+IKZb09rZV +uxmG4pPLUnH+mMYdiy5ugzj+5C8iZ0/IstpHVmO6GWROfedpJ82eMztTOtdhfMep +8Z3HwAwkDtksL7Gq9klb0Wq5+uRlBWetixddAvnmqXNzYhaANWcAF/2a2Hz06Rb0 +e6pe/g0Ek5KV+6YI+D+oEblG0Sr+d4NtxtDTmIJKNVkmzlhI2s53bHp6txCb5JWJ +S8mKLPBBBzaNXYd3odDvGXguuxUntWSsD11KyR6B9DXMIfWQW5dT7hp5kTMGlXWJ +lD2hYab13DCCuAkwVTdpzhHYLZyxLYoSu05W6z8SAOs= +-----END RSA PRIVATE KEY----- diff --git a/tests/test_sftp.py b/tests/test_sftp.py new file mode 100644 index 0000000..993899a --- /dev/null +++ b/tests/test_sftp.py @@ -0,0 +1,740 @@ +# Copyright (C) 2003-2005 Robey Pointer <robey@lag.net> +# +# This file is part of paramiko. +# +# Paramiko is free software; you can redistribute it and/or modify it under the +# terms of the GNU Lesser General Public License as published by the Free +# Software Foundation; either version 2.1 of the License, or (at your option) +# any later version. +# +# Paramiko is distrubuted in the hope that it will be useful, but WITHOUT ANY +# WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR +# A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more +# details. +# +# You should have received a copy of the GNU Lesser General Public License +# along with Paramiko; if not, write to the Free Software Foundation, Inc., +# 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA. + +""" +some unit tests to make sure sftp works. + +a real actual sftp server is contacted, and a new folder is created there to +do test file operations in (so no existing files will be harmed). +""" + +import logging +import os +import random +import sys +import threading +import time +import unittest + +import paramiko +from stub_sftp import StubServer, StubSFTPServer +from loop import LoopSocket + +ARTICLE = ''' +Insulin sensitivity and liver insulin receptor structure in ducks from two +genera + +T. Constans, B. Chevalier, M. Derouet and J. Simon +Station de Recherches Avicoles, Institut National de la Recherche Agronomique, +Nouzilly, France. + +Insulin sensitivity and liver insulin receptor structure were studied in +5-wk-old ducks from two genera (Muscovy and Pekin). In the fasting state, both +duck types were equally resistant to exogenous insulin compared with chicken. +Despite the low potency of duck insulin, the number of insulin receptors was +lower in Muscovy duck and similar in Pekin duck and chicken liver membranes. +After 125I-insulin cross-linking, the size of the alpha-subunit of the +receptors from the three species was 135,000. Wheat germ agglutinin-purified +receptors from the three species were contaminated by an active and unusual +adenosinetriphosphatase (ATPase) contaminant (highest activity in Muscovy +duck). Sequential purification of solubilized receptor from both duck types on +lentil and then wheat germ agglutinin lectins led to a fraction of receptors +very poor in ATPase activity that exhibited a beta-subunit size (95,000) and +tyrosine kinase activity similar to those of ATPase-free chicken insulin +receptors. Therefore the ducks from the two genera exhibit an alpha-beta- +structure for liver insulin receptors and a clear difference in the number of +liver insulin receptors. Their sensitivity to insulin is, however, similarly +decreased compared with chicken. +''' + +FOLDER = os.environ.get('TEST_FOLDER', 'temp-testing000') + +sftp = None +tc = None +g_big_file_test = True + + +class SFTPTest (unittest.TestCase): + + def init(hostname, username, keyfile, passwd): + global sftp, tc + + t = paramiko.Transport(hostname) + tc = t + try: + key = paramiko.RSAKey.from_private_key_file(keyfile, passwd) + except paramiko.PasswordRequiredException: + sys.stderr.write('\n\nparamiko.RSAKey.from_private_key_file REQUIRES PASSWORD.\n') + sys.stderr.write('You have two options:\n') + sys.stderr.write('* Use the "-K" option to point to a different (non-password-protected)\n') + sys.stderr.write(' private key file.\n') + sys.stderr.write('* Use the "-P" option to provide the password needed to unlock this private\n') + sys.stderr.write(' key.\n') + sys.stderr.write('\n') + sys.exit(1) + try: + t.connect(username=username, pkey=key) + except paramiko.SSHException: + t.close() + sys.stderr.write('\n\nparamiko.Transport.connect FAILED.\n') + sys.stderr.write('There are several possible reasons why it might fail so quickly:\n\n') + sys.stderr.write('* The host to connect to (%s) is not a valid SSH server.\n' % hostname) + sys.stderr.write(' (Use the "-H" option to change the host.)\n') + sys.stderr.write('* The username to auth as (%s) is invalid.\n' % username) + sys.stderr.write(' (Use the "-U" option to change the username.)\n') + sys.stderr.write('* The private key given (%s) is not accepted by the server.\n' % keyfile) + sys.stderr.write(' (Use the "-K" option to provide a different key file.)\n') + sys.stderr.write('\n') + sys.exit(1) + sftp = paramiko.SFTP.from_transport(t) + init = staticmethod(init) + + def init_loopback(): + global sftp, tc + + socks = LoopSocket() + sockc = LoopSocket() + sockc.link(socks) + tc = paramiko.Transport(sockc) + ts = paramiko.Transport(socks) + + host_key = paramiko.RSAKey.from_private_key_file('tests/test_rsa.key') + ts.add_server_key(host_key) + event = threading.Event() + server = StubServer() + ts.set_subsystem_handler('sftp', paramiko.SFTPServer, StubSFTPServer) + ts.start_server(event, server) + tc.connect(username='slowdive', password='pygmalion') + event.wait(1.0) + + sftp = paramiko.SFTP.from_transport(tc) + init_loopback = staticmethod(init_loopback) + + def set_big_file_test(onoff): + global g_big_file_test + g_big_file_test = onoff + set_big_file_test = staticmethod(set_big_file_test) + + def setUp(self): + global FOLDER + for i in xrange(1000): + FOLDER = FOLDER[:-3] + '%03d' % i + try: + sftp.mkdir(FOLDER) + break + except (IOError, OSError): + pass + + def tearDown(self): + sftp.rmdir(FOLDER) + + def test_1_file(self): + """ + verify that we can create a file. + """ + f = sftp.open(FOLDER + '/test', 'w') + try: + self.assertEqual(f.stat().st_size, 0) + f.close() + finally: + sftp.remove(FOLDER + '/test') + + def test_2_close(self): + """ + verify that closing the sftp session doesn't do anything bad, and that + a new one can be opened. + """ + global sftp + sftp.close() + try: + sftp.open(FOLDER + '/test2', 'w') + self.fail('expected exception') + except: + pass + sftp = paramiko.SFTP.from_transport(tc) + + def test_3_write(self): + """ + verify that a file can be created and written, and the size is correct. + """ + f = sftp.open(FOLDER + '/duck.txt', 'w') + try: + f.write(ARTICLE) + f.close() + self.assertEqual(sftp.stat(FOLDER + '/duck.txt').st_size, 1483) + finally: + sftp.remove(FOLDER + '/duck.txt') + + def test_4_append(self): + """ + verify that a file can be opened for append, and tell() still works. + """ + f = sftp.open(FOLDER + '/append.txt', 'w') + try: + f.write('first line\nsecond line\n') + self.assertEqual(f.tell(), 23) + f.close() + + f = sftp.open(FOLDER + '/append.txt', 'a+') + f.write('third line!!!\n') + self.assertEqual(f.tell(), 37) + self.assertEqual(f.stat().st_size, 37) + f.seek(-26, f.SEEK_CUR) + self.assertEqual(f.readline(), 'second line\n') + f.close() + finally: + sftp.remove(FOLDER + '/append.txt') + + def test_5_rename(self): + """ + verify that renaming a file works. + """ + f = sftp.open(FOLDER + '/first.txt', 'w') + try: + f.write('content!\n'); + f.close() + sftp.rename(FOLDER + '/first.txt', FOLDER + '/second.txt') + try: + f = sftp.open(FOLDER + '/first.txt', 'r') + self.assert_(False, 'no exception on reading nonexistent file') + except IOError: + pass + f = sftp.open(FOLDER + '/second.txt', 'r') + f.seek(-6, f.SEEK_END) + self.assertEqual(f.read(4), 'tent') + f.close() + finally: + try: + sftp.remove(FOLDER + '/first.txt') + except: + pass + try: + sftp.remove(FOLDER + '/second.txt') + except: + pass + + def test_6_folder(self): + """ + create a temporary folder, verify that we can create a file in it, then + remove the folder and verify that we can't create a file in it anymore. + """ + sftp.mkdir(FOLDER + '/subfolder') + f = sftp.open(FOLDER + '/subfolder/test', 'w') + f.close() + sftp.remove(FOLDER + '/subfolder/test') + sftp.rmdir(FOLDER + '/subfolder') + try: + f = sftp.open(FOLDER + '/subfolder/test') + # shouldn't be able to create that file + self.assert_(False, 'no exception at dummy file creation') + except IOError: + pass + + def test_7_listdir(self): + """ + verify that a folder can be created, a bunch of files can be placed in it, + and those files show up in sftp.listdir. + """ + try: + f = sftp.open(FOLDER + '/duck.txt', 'w') + f.close() + + f = sftp.open(FOLDER + '/fish.txt', 'w') + f.close() + + f = sftp.open(FOLDER + '/tertiary.py', 'w') + f.close() + + x = sftp.listdir(FOLDER) + self.assertEqual(len(x), 3) + self.assert_('duck.txt' in x) + self.assert_('fish.txt' in x) + self.assert_('tertiary.py' in x) + self.assert_('random' not in x) + finally: + sftp.remove(FOLDER + '/duck.txt') + sftp.remove(FOLDER + '/fish.txt') + sftp.remove(FOLDER + '/tertiary.py') + + def test_8_setstat(self): + """ + verify that the setstat functions (chown, chmod, utime) work. + """ + f = sftp.open(FOLDER + '/special', 'w') + try: + f.close() + + stat = sftp.stat(FOLDER + '/special') + sftp.chmod(FOLDER + '/special', (stat.st_mode & ~0777) | 0600) + self.assertEqual(sftp.stat(FOLDER + '/special').st_mode & 0777, 0600) + + mtime = stat.st_mtime - 3600 + atime = stat.st_atime - 1800 + sftp.utime(FOLDER + '/special', (atime, mtime)) + nstat = sftp.stat(FOLDER + '/special') + self.assertEqual(nstat.st_mtime, mtime) + self.assertEqual(nstat.st_atime, atime) + + # can't really test chown, since we'd have to know a valid uid. + finally: + sftp.remove(FOLDER + '/special') + + def test_9_readline_seek(self): + """ + create a text file and write a bunch of text into it. then count the lines + in the file, and seek around to retreive particular lines. this should + verify that read buffering and 'tell' work well together, and that read + buffering is reset on 'seek'. + """ + try: + f = sftp.open(FOLDER + '/duck.txt', 'w') + f.write(ARTICLE) + f.close() + + f = sftp.open(FOLDER + '/duck.txt', 'r+') + line_number = 0 + loc = 0 + pos_list = [] + for line in f: + line_number += 1 + pos_list.append(loc) + loc = f.tell() + f.seek(pos_list[6], f.SEEK_SET) + self.assertEqual(f.readline(), 'Nouzilly, France.\n') + f.seek(pos_list[17], f.SEEK_SET) + self.assertEqual(f.readline()[:4], 'duck') + f.seek(pos_list[10], f.SEEK_SET) + self.assertEqual(f.readline(), 'duck types were equally resistant to exogenous insulin compared with chicken.\n') + f.close() + finally: + sftp.remove(FOLDER + '/duck.txt') + + def test_A_write_seek(self): + """ + create a text file, seek back and change part of it, and verify that the + changes worked. + """ + f = sftp.open(FOLDER + '/testing.txt', 'w') + try: + f.write('hello kitty.\n') + f.seek(-5, f.SEEK_CUR) + f.write('dd') + f.close() + + self.assertEqual(sftp.stat(FOLDER + '/testing.txt').st_size, 13) + f = sftp.open(FOLDER + '/testing.txt', 'r') + data = f.read(20) + f.close() + self.assertEqual(data, 'hello kiddy.\n') + finally: + sftp.remove(FOLDER + '/testing.txt') + + def test_B_symlink(self): + """ + create a symlink and then check that lstat doesn't follow it. + """ + f = sftp.open(FOLDER + '/original.txt', 'w') + try: + f.write('original\n') + f.close() + sftp.symlink('original.txt', FOLDER + '/link.txt') + self.assertEqual(sftp.readlink(FOLDER + '/link.txt'), 'original.txt') + + f = sftp.open(FOLDER + '/link.txt', 'r') + self.assertEqual(f.readlines(), [ 'original\n' ]) + f.close() + + cwd = sftp.normalize('.') + if cwd[-1] == '/': + cwd = cwd[:-1] + abs_path = cwd + '/' + FOLDER + '/original.txt' + sftp.symlink(abs_path, FOLDER + '/link2.txt') + self.assertEqual(abs_path, sftp.readlink(FOLDER + '/link2.txt')) + + self.assertEqual(sftp.lstat(FOLDER + '/link.txt').st_size, 12) + self.assertEqual(sftp.stat(FOLDER + '/link.txt').st_size, 9) + # the sftp server may be hiding extra path members from us, so the + # length may be longer than we expect: + self.assert_(sftp.lstat(FOLDER + '/link2.txt').st_size >= len(abs_path)) + self.assertEqual(sftp.stat(FOLDER + '/link2.txt').st_size, 9) + self.assertEqual(sftp.stat(FOLDER + '/original.txt').st_size, 9) + finally: + try: + sftp.remove(FOLDER + '/link.txt') + except: + pass + try: + sftp.remove(FOLDER + '/link2.txt') + except: + pass + try: + sftp.remove(FOLDER + '/original.txt') + except: + pass + + def test_C_flush_seek(self): + """ + verify that buffered writes are automatically flushed on seek. + """ + f = sftp.open(FOLDER + '/happy.txt', 'w', 1) + try: + f.write('full line.\n') + f.write('partial') + f.seek(9, f.SEEK_SET) + f.write('?\n') + f.close() + + f = sftp.open(FOLDER + '/happy.txt', 'r') + self.assertEqual(f.readline(), 'full line?\n') + self.assertEqual(f.read(7), 'partial') + f.close() + finally: + try: + sftp.remove(FOLDER + '/happy.txt') + except: + pass + + def test_D_lots_of_files(self): + """ + create a bunch of files over the same session. + """ + global g_big_file_test + if not g_big_file_test: + return + numfiles = 100 + try: + for i in range(numfiles): + f = sftp.open('%s/file%d.txt' % (FOLDER, i), 'w', 1) + f.write('this is file #%d.\n' % i) + f.close() + sftp.chmod('%s/file%d.txt' % (FOLDER, i), 0660) + + # now make sure every file is there, by creating a list of filenmes + # and reading them in random order. + numlist = range(numfiles) + while len(numlist) > 0: + r = numlist[random.randint(0, len(numlist) - 1)] + f = sftp.open('%s/file%d.txt' % (FOLDER, r)) + self.assertEqual(f.readline(), 'this is file #%d.\n' % r) + f.close() + numlist.remove(r) + finally: + for i in range(numfiles): + try: + sftp.remove('%s/file%d.txt' % (FOLDER, i)) + except: + pass + + def test_E_big_file(self): + """ + write a 1MB file with no buffering. + """ + global g_big_file_test + if not g_big_file_test: + return + kblob = (1024 * 'x') + start = time.time() + try: + f = sftp.open('%s/hongry.txt' % FOLDER, 'w') + for n in range(1024): + f.write(kblob) + if n % 128 == 0: + sys.stderr.write('.') + f.close() + sys.stderr.write(' ') + + self.assertEqual(sftp.stat('%s/hongry.txt' % FOLDER).st_size, 1024 * 1024) + end = time.time() + sys.stderr.write('%ds ' % round(end - start)) + + start = time.time() + f = sftp.open('%s/hongry.txt' % FOLDER, 'r') + for n in range(1024): + data = f.read(1024) + self.assertEqual(data, kblob) + f.close() + + end = time.time() + sys.stderr.write('%ds ' % round(end - start)) + finally: + sftp.remove('%s/hongry.txt' % FOLDER) + + def test_F_big_file_pipelined(self): + """ + write a 1MB file, with no linefeeds, using pipelining. + """ + global g_big_file_test + if not g_big_file_test: + return + kblob = (1024 * 'x') + start = time.time() + try: + f = sftp.open('%s/hongry.txt' % FOLDER, 'w') + f.set_pipelined(True) + for n in range(1024): + f.write(kblob) + if n % 128 == 0: + sys.stderr.write('.') + f.close() + sys.stderr.write(' ') + + self.assertEqual(sftp.stat('%s/hongry.txt' % FOLDER).st_size, 1024 * 1024) + end = time.time() + sys.stderr.write('%ds ' % round(end - start)) + + start = time.time() + f = sftp.open('%s/hongry.txt' % FOLDER, 'r') + f.prefetch() + for n in range(1024): + data = f.read(1024) + self.assertEqual(data, kblob) + f.close() + + end = time.time() + sys.stderr.write('%ds ' % round(end - start)) + finally: + sftp.remove('%s/hongry.txt' % FOLDER) + + def test_G_lots_of_prefetching(self): + """ + prefetch a 1MB file a bunch of times, discarding the file object + without using it, to verify that paramiko doesn't get confused. + """ + global g_big_file_test + if not g_big_file_test: + return + kblob = (1024 * 'x') + try: + f = sftp.open('%s/hongry.txt' % FOLDER, 'w') + f.set_pipelined(True) + for n in range(1024): + f.write(kblob) + if n % 128 == 0: + sys.stderr.write('.') + f.close() + sys.stderr.write(' ') + + self.assertEqual(sftp.stat('%s/hongry.txt' % FOLDER).st_size, 1024 * 1024) + + for i in range(10): + f = sftp.open('%s/hongry.txt' % FOLDER, 'r') + f.prefetch() + f = sftp.open('%s/hongry.txt' % FOLDER, 'r') + f.prefetch() + for n in range(1024): + data = f.read(1024) + self.assertEqual(data, kblob) + if n % 128 == 0: + sys.stderr.write('.') + f.close() + sys.stderr.write(' ') + finally: + sftp.remove('%s/hongry.txt' % FOLDER) + + def test_H_big_file_big_buffer(self): + """ + write a 1MB file, with no linefeeds, and a big buffer. + """ + global g_big_file_test + if not g_big_file_test: + return + mblob = (1024 * 1024 * 'x') + try: + f = sftp.open('%s/hongry.txt' % FOLDER, 'w', 128 * 1024) + f.write(mblob) + f.close() + + self.assertEqual(sftp.stat('%s/hongry.txt' % FOLDER).st_size, 1024 * 1024) + finally: + sftp.remove('%s/hongry.txt' % FOLDER) + + def test_I_big_file_renegotiate(self): + """ + write a 1MB file, forcing key renegotiation in the middle. + """ + global g_big_file_test + if not g_big_file_test: + return + t = sftp.sock.get_transport() + t.packetizer.REKEY_BYTES = 512 * 1024 + k32blob = (32 * 1024 * 'x') + try: + f = sftp.open('%s/hongry.txt' % FOLDER, 'w', 128 * 1024) + for i in xrange(32): + f.write(k32blob) + f.close() + + self.assertEqual(sftp.stat('%s/hongry.txt' % FOLDER).st_size, 1024 * 1024) + self.assertNotEquals(t.H, t.session_id) + finally: + sftp.remove('%s/hongry.txt' % FOLDER) + t.packetizer.REKEY_BYTES = pow(2, 30) + + def test_J_realpath(self): + """ + test that realpath is returning something non-empty and not an + error. + """ + pwd = sftp.normalize('.') + self.assert_(len(pwd) > 0) + f = sftp.normalize('./' + FOLDER) + self.assert_(len(f) > 0) + self.assertEquals(os.path.join(pwd, FOLDER), f) + + def test_K_mkdir(self): + """ + verify that mkdir/rmdir work. + """ + try: + sftp.mkdir(FOLDER + '/subfolder') + except: + self.assert_(False, 'exception creating subfolder') + try: + sftp.mkdir(FOLDER + '/subfolder') + self.assert_(False, 'no exception overwriting subfolder') + except IOError: + pass + try: + sftp.rmdir(FOLDER + '/subfolder') + except: + self.assert_(False, 'exception removing subfolder') + try: + sftp.rmdir(FOLDER + '/subfolder') + self.assert_(False, 'no exception removing nonexistent subfolder') + except IOError: + pass + + def test_L_chdir(self): + """ + verify that chdir/getcwd work. + """ + root = sftp.normalize('.') + if root[-1] != '/': + root += '/' + try: + sftp.mkdir(FOLDER + '/alpha') + sftp.chdir(FOLDER + '/alpha') + sftp.mkdir('beta') + self.assertEquals(root + FOLDER + '/alpha', sftp.getcwd()) + self.assertEquals(['beta'], sftp.listdir('.')) + + sftp.chdir('beta') + f = sftp.open('fish', 'w') + f.write('hello\n') + f.close() + sftp.chdir('..') + self.assertEquals(['fish'], sftp.listdir('beta')) + sftp.chdir('..') + self.assertEquals(['fish'], sftp.listdir('alpha/beta')) + finally: + sftp.chdir(root) + try: + sftp.unlink(FOLDER + '/alpha/beta/fish') + except: + pass + try: + sftp.rmdir(FOLDER + '/alpha/beta') + except: + pass + try: + sftp.rmdir(FOLDER + '/alpha') + except: + pass + + def test_M_get_put(self): + """ + verify that get/put work. + """ + import os, warnings + warnings.filterwarnings('ignore', 'tempnam.*') + + localname = os.tempnam() + text = 'All I wanted was a plastic bunny rabbit.\n' + f = open(localname, 'w') + f.write(text) + f.close() + sftp.put(localname, FOLDER + '/bunny.txt') + + f = sftp.open(FOLDER + '/bunny.txt', 'r') + self.assertEquals(text, f.read(128)) + f.close() + + os.unlink(localname) + localname = os.tempnam() + sftp.get(FOLDER + '/bunny.txt', localname) + + f = open(localname, 'r') + self.assertEquals(text, f.read(128)) + f.close() + + os.unlink(localname) + sftp.unlink(FOLDER + '/bunny.txt') + + def test_N_check(self): + """ + verify that file.check() works against our own server. + (it's an sftp extension that we support, and may be the only ones who + support it.) + """ + f = sftp.open(FOLDER + '/kitty.txt', 'w') + f.write('here kitty kitty' * 64) + f.close() + + try: + f = sftp.open(FOLDER + '/kitty.txt', 'r') + sum = f.check('sha1') + self.assertEquals('91059CFC6615941378D413CB5ADAF4C5EB293402', paramiko.util.hexify(sum)) + sum = f.check('md5', 0, 512) + self.assertEquals('93DE4788FCA28D471516963A1FE3856A', paramiko.util.hexify(sum)) + sum = f.check('md5', 0, 0, 510) + self.assertEquals('EB3B45B8CD55A0707D99B177544A319F373183D241432BB2157AB9E46358C4AC90370B5CADE5D90336FC1716F90B36D6', + paramiko.util.hexify(sum)) + finally: + sftp.unlink(FOLDER + '/kitty.txt') + + def test_O_x_flag(self): + """ + verify that the 'x' flag works when opening a file. + """ + f = sftp.open(FOLDER + '/unusual.txt', 'wx') + f.close() + + try: + try: + f = sftp.open(FOLDER + '/unusual.txt', 'wx') + self.fail('expected exception') + except IOError, x: + pass + finally: + sftp.unlink(FOLDER + '/unusual.txt') + + def test_P_utf8(self): + """ + verify that unicode strings are encoded into utf8 correctly. + """ + f = sftp.open(FOLDER + '/something', 'w') + f.write('okay') + f.close() + + try: + sftp.rename(FOLDER + '/something', FOLDER + u'/\u00fcnic\u00f8de') + sftp.open(FOLDER + '/\xc3\xbcnic\xc3\xb8\x64\x65', 'r') + except Exception, e: + self.fail('exception ' + e) + sftp.unlink(FOLDER + '/\xc3\xbcnic\xc3\xb8\x64\x65') + diff --git a/tests/test_transport.py b/tests/test_transport.py new file mode 100644 index 0000000..5fcc786 --- /dev/null +++ b/tests/test_transport.py @@ -0,0 +1,573 @@ +# Copyright (C) 2003-2005 Robey Pointer <robey@lag.net> +# +# This file is part of paramiko. +# +# Paramiko is free software; you can redistribute it and/or modify it under the +# terms of the GNU Lesser General Public License as published by the Free +# Software Foundation; either version 2.1 of the License, or (at your option) +# any later version. +# +# Paramiko is distrubuted in the hope that it will be useful, but WITHOUT ANY +# WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR +# A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more +# details. +# +# You should have received a copy of the GNU Lesser General Public License +# along with Paramiko; if not, write to the Free Software Foundation, Inc., +# 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA. + +""" +Some unit tests for the ssh2 protocol in Transport. +""" + +import sys, time, threading, unittest +import select +from paramiko import Transport, SecurityOptions, ServerInterface, RSAKey, DSSKey, \ + SSHException, BadAuthenticationType, InteractiveQuery, util +from paramiko import AUTH_FAILED, AUTH_PARTIALLY_SUCCESSFUL, AUTH_SUCCESSFUL +from paramiko import OPEN_SUCCEEDED +from loop import LoopSocket + + +class NullServer (ServerInterface): + paranoid_did_password = False + paranoid_did_public_key = False + paranoid_key = DSSKey.from_private_key_file('tests/test_dss.key') + + def get_allowed_auths(self, username): + if username == 'slowdive': + return 'publickey,password' + if username == 'paranoid': + if not self.paranoid_did_password and not self.paranoid_did_public_key: + return 'publickey,password' + elif self.paranoid_did_password: + return 'publickey' + else: + return 'password' + if username == 'commie': + return 'keyboard-interactive' + return 'publickey' + + def check_auth_password(self, username, password): + if (username == 'slowdive') and (password == 'pygmalion'): + return AUTH_SUCCESSFUL + if (username == 'paranoid') and (password == 'paranoid'): + # 2-part auth (even openssh doesn't support this) + self.paranoid_did_password = True + if self.paranoid_did_public_key: + return AUTH_SUCCESSFUL + return AUTH_PARTIALLY_SUCCESSFUL + return AUTH_FAILED + + def check_auth_publickey(self, username, key): + if (username == 'paranoid') and (key == self.paranoid_key): + # 2-part auth + self.paranoid_did_public_key = True + if self.paranoid_did_password: + return AUTH_SUCCESSFUL + return AUTH_PARTIALLY_SUCCESSFUL + return AUTH_FAILED + + def check_auth_interactive(self, username, submethods): + if username == 'commie': + self.username = username + return InteractiveQuery('password', 'Please enter a password.', ('Password', False)) + return AUTH_FAILED + + def check_auth_interactive_response(self, responses): + if self.username == 'commie': + if (len(responses) == 1) and (responses[0] == 'cat'): + return AUTH_SUCCESSFUL + return AUTH_FAILED + + def check_channel_request(self, kind, chanid): + return OPEN_SUCCEEDED + + def check_channel_exec_request(self, channel, command): + if command != 'yes': + return False + return True + + def check_channel_shell_request(self, channel): + return True + + def check_global_request(self, kind, msg): + self._global_request = kind + return False + + +class TransportTest (unittest.TestCase): + + def setUp(self): + self.socks = LoopSocket() + self.sockc = LoopSocket() + self.sockc.link(self.socks) + self.tc = Transport(self.sockc) + self.ts = Transport(self.socks) + + def tearDown(self): + self.tc.close() + self.ts.close() + self.socks.close() + self.sockc.close() + + def test_1_security_options(self): + o = self.tc.get_security_options() + self.assertEquals(type(o), SecurityOptions) + self.assert_(('aes256-cbc', 'blowfish-cbc') != o.ciphers) + o.ciphers = ('aes256-cbc', 'blowfish-cbc') + self.assertEquals(('aes256-cbc', 'blowfish-cbc'), o.ciphers) + try: + o.ciphers = ('aes256-cbc', 'made-up-cipher') + self.assert_(False) + except ValueError: + pass + try: + o.ciphers = 23 + self.assert_(False) + except TypeError: + pass + + def test_2_compute_key(self): + self.tc.K = 123281095979686581523377256114209720774539068973101330872763622971399429481072519713536292772709507296759612401802191955568143056534122385270077606457721553469730659233569339356140085284052436697480759510519672848743794433460113118986816826624865291116513647975790797391795651716378444844877749505443714557929L + self.tc.H = util.unhexify('0C8307CDE6856FF30BA93684EB0F04C2520E9ED3') + self.tc.session_id = self.tc.H + key = self.tc._compute_key('C', 32) + self.assertEquals('207E66594CA87C44ECCBA3B3CD39FDDB378E6FDB0F97C54B2AA0CFBF900CD995', + util.hexify(key)) + + def test_3_simple(self): + """ + verify that we can establish an ssh link with ourselves across the + loopback sockets. this is hardly "simple" but it's simpler than the + later tests. :) + """ + host_key = RSAKey.from_private_key_file('tests/test_rsa.key') + public_host_key = RSAKey(data=str(host_key)) + self.ts.add_server_key(host_key) + event = threading.Event() + server = NullServer() + self.assert_(not event.isSet()) + self.assertEquals(None, self.tc.get_username()) + self.assertEquals(None, self.ts.get_username()) + self.assertEquals(False, self.tc.is_authenticated()) + self.assertEquals(False, self.ts.is_authenticated()) + self.ts.start_server(event, server) + self.tc.connect(hostkey=public_host_key, + username='slowdive', password='pygmalion') + event.wait(1.0) + self.assert_(event.isSet()) + self.assert_(self.ts.is_active()) + self.assertEquals('slowdive', self.tc.get_username()) + self.assertEquals('slowdive', self.ts.get_username()) + self.assertEquals(True, self.tc.is_authenticated()) + self.assertEquals(True, self.ts.is_authenticated()) + + def test_4_special(self): + """ + verify that the client can demand odd handshake settings, and can + renegotiate keys in mid-stream. + """ + host_key = RSAKey.from_private_key_file('tests/test_rsa.key') + public_host_key = RSAKey(data=str(host_key)) + self.ts.add_server_key(host_key) + event = threading.Event() + server = NullServer() + self.assert_(not event.isSet()) + self.ts.start_server(event, server) + options = self.tc.get_security_options() + options.ciphers = ('aes256-cbc',) + options.digests = ('hmac-md5-96',) + self.tc.connect(hostkey=public_host_key, + username='slowdive', password='pygmalion') + event.wait(1.0) + self.assert_(event.isSet()) + self.assert_(self.ts.is_active()) + self.assertEquals('aes256-cbc', self.tc.local_cipher) + self.assertEquals('aes256-cbc', self.tc.remote_cipher) + self.assertEquals(12, self.tc.packetizer.get_mac_size_out()) + self.assertEquals(12, self.tc.packetizer.get_mac_size_in()) + + self.tc.send_ignore(1024) + self.assert_(self.tc.renegotiate_keys()) + self.ts.send_ignore(1024) + + def test_5_keepalive(self): + """ + verify that the keepalive will be sent. + """ + self.tc.set_hexdump(True) + + host_key = RSAKey.from_private_key_file('tests/test_rsa.key') + public_host_key = RSAKey(data=str(host_key)) + self.ts.add_server_key(host_key) + event = threading.Event() + server = NullServer() + self.assert_(not event.isSet()) + self.ts.start_server(event, server) + self.tc.connect(hostkey=public_host_key, + username='slowdive', password='pygmalion') + event.wait(1.0) + self.assert_(event.isSet()) + self.assert_(self.ts.is_active()) + + self.assertEquals(None, getattr(server, '_global_request', None)) + self.tc.set_keepalive(1) + time.sleep(2) + self.assertEquals('keepalive@lag.net', server._global_request) + + def test_6_bad_auth_type(self): + """ + verify that we get the right exception when an unsupported auth + type is requested. + """ + host_key = RSAKey.from_private_key_file('tests/test_rsa.key') + public_host_key = RSAKey(data=str(host_key)) + self.ts.add_server_key(host_key) + event = threading.Event() + server = NullServer() + self.assert_(not event.isSet()) + self.ts.start_server(event, server) + try: + self.tc.connect(hostkey=public_host_key, + username='unknown', password='error') + self.assert_(False) + except: + etype, evalue, etb = sys.exc_info() + self.assertEquals(BadAuthenticationType, etype) + self.assertEquals(['publickey'], evalue.allowed_types) + + def test_7_bad_password(self): + """ + verify that a bad password gets the right exception, and that a retry + with the right password works. + """ + host_key = RSAKey.from_private_key_file('tests/test_rsa.key') + public_host_key = RSAKey(data=str(host_key)) + self.ts.add_server_key(host_key) + event = threading.Event() + server = NullServer() + self.assert_(not event.isSet()) + self.ts.start_server(event, server) + self.tc.ultra_debug = True + self.tc.connect(hostkey=public_host_key) + try: + self.tc.auth_password(username='slowdive', password='error') + self.assert_(False) + except: + etype, evalue, etb = sys.exc_info() + self.assertEquals(SSHException, etype) + self.tc.auth_password(username='slowdive', password='pygmalion') + event.wait(1.0) + self.assert_(event.isSet()) + self.assert_(self.ts.is_active()) + + def test_8_multipart_auth(self): + """ + verify that multipart auth works. + """ + host_key = RSAKey.from_private_key_file('tests/test_rsa.key') + public_host_key = RSAKey(data=str(host_key)) + self.ts.add_server_key(host_key) + event = threading.Event() + server = NullServer() + self.assert_(not event.isSet()) + self.ts.start_server(event, server) + self.tc.ultra_debug = True + self.tc.connect(hostkey=public_host_key) + remain = self.tc.auth_password(username='paranoid', password='paranoid') + self.assertEquals(['publickey'], remain) + key = DSSKey.from_private_key_file('tests/test_dss.key') + remain = self.tc.auth_publickey(username='paranoid', key=key) + self.assertEquals([], remain) + event.wait(1.0) + self.assert_(event.isSet()) + self.assert_(self.ts.is_active()) + + def test_9_interactive_auth(self): + """ + verify keyboard-interactive auth works. + """ + host_key = RSAKey.from_private_key_file('tests/test_rsa.key') + public_host_key = RSAKey(data=str(host_key)) + self.ts.add_server_key(host_key) + event = threading.Event() + server = NullServer() + self.assert_(not event.isSet()) + self.ts.start_server(event, server) + self.tc.ultra_debug = True + self.tc.connect(hostkey=public_host_key) + + def handler(title, instructions, prompts): + self.got_title = title + self.got_instructions = instructions + self.got_prompts = prompts + return ['cat'] + remain = self.tc.auth_interactive('commie', handler) + self.assertEquals(self.got_title, 'password') + self.assertEquals(self.got_prompts, [('Password', False)]) + self.assertEquals([], remain) + event.wait(1.0) + self.assert_(event.isSet()) + self.assert_(self.ts.is_active()) + + def test_A_interactive_auth_fallback(self): + """ + verify that a password auth attempt will fallback to "interactive" + if password auth isn't supported but interactive is. + """ + host_key = RSAKey.from_private_key_file('tests/test_rsa.key') + public_host_key = RSAKey(data=str(host_key)) + self.ts.add_server_key(host_key) + event = threading.Event() + server = NullServer() + self.assert_(not event.isSet()) + self.ts.start_server(event, server) + self.tc.ultra_debug = True + self.tc.connect(hostkey=public_host_key) + remain = self.tc.auth_password('commie', 'cat') + self.assertEquals([], remain) + event.wait(1.0) + self.assert_(event.isSet()) + self.assert_(self.ts.is_active()) + + def test_B_exec_command(self): + """ + verify that exec_command() does something reasonable. + """ + host_key = RSAKey.from_private_key_file('tests/test_rsa.key') + public_host_key = RSAKey(data=str(host_key)) + self.ts.add_server_key(host_key) + event = threading.Event() + server = NullServer() + self.assert_(not event.isSet()) + self.ts.start_server(event, server) + self.tc.ultra_debug = True + self.tc.connect(hostkey=public_host_key) + self.tc.auth_password(username='slowdive', password='pygmalion') + event.wait(1.0) + self.assert_(event.isSet()) + self.assert_(self.ts.is_active()) + + chan = self.tc.open_session() + schan = self.ts.accept(1.0) + self.assert_(not chan.exec_command('no')) + + chan = self.tc.open_session() + self.assert_(chan.exec_command('yes')) + schan = self.ts.accept(1.0) + schan.send('Hello there.\n') + schan.send_stderr('This is on stderr.\n') + schan.close() + + f = chan.makefile() + self.assertEquals('Hello there.\n', f.readline()) + self.assertEquals('', f.readline()) + f = chan.makefile_stderr() + self.assertEquals('This is on stderr.\n', f.readline()) + self.assertEquals('', f.readline()) + + # now try it with combined stdout/stderr + chan = self.tc.open_session() + self.assert_(chan.exec_command('yes')) + schan = self.ts.accept(1.0) + schan.send('Hello there.\n') + schan.send_stderr('This is on stderr.\n') + schan.close() + + chan.set_combine_stderr(True) + f = chan.makefile() + self.assertEquals('Hello there.\n', f.readline()) + self.assertEquals('This is on stderr.\n', f.readline()) + self.assertEquals('', f.readline()) + + def test_C_invoke_shell(self): + """ + verify that invoke_shell() does something reasonable. + """ + host_key = RSAKey.from_private_key_file('tests/test_rsa.key') + public_host_key = RSAKey(data=str(host_key)) + self.ts.add_server_key(host_key) + event = threading.Event() + server = NullServer() + self.assert_(not event.isSet()) + self.ts.start_server(event, server) + self.tc.ultra_debug = True + self.tc.connect(hostkey=public_host_key) + self.tc.auth_password(username='slowdive', password='pygmalion') + event.wait(1.0) + self.assert_(event.isSet()) + self.assert_(self.ts.is_active()) + + chan = self.tc.open_session() + self.assert_(chan.invoke_shell()) + schan = self.ts.accept(1.0) + chan.send('communist j. cat\n') + f = schan.makefile() + self.assertEquals('communist j. cat\n', f.readline()) + chan.close() + self.assertEquals('', f.readline()) + + def test_D_exit_status(self): + """ + verify that get_exit_status() works. + """ + host_key = RSAKey.from_private_key_file('tests/test_rsa.key') + public_host_key = RSAKey(data=str(host_key)) + self.ts.add_server_key(host_key) + event = threading.Event() + server = NullServer() + self.assert_(not event.isSet()) + self.ts.start_server(event, server) + self.tc.ultra_debug = True + self.tc.connect(hostkey=public_host_key) + self.tc.auth_password(username='slowdive', password='pygmalion') + event.wait(1.0) + self.assert_(event.isSet()) + self.assert_(self.ts.is_active()) + + chan = self.tc.open_session() + schan = self.ts.accept(1.0) + self.assert_(chan.exec_command('yes')) + schan.send('Hello there.\n') + # trigger an EOF + schan.shutdown_read() + schan.shutdown_write() + schan.send_exit_status(23) + schan.close() + + f = chan.makefile() + self.assertEquals('Hello there.\n', f.readline()) + self.assertEquals('', f.readline()) + self.assertEquals(23, chan.recv_exit_status()) + chan.close() + + def test_E_select(self): + """ + verify that select() on a channel works. + """ + host_key = RSAKey.from_private_key_file('tests/test_rsa.key') + public_host_key = RSAKey(data=str(host_key)) + self.ts.add_server_key(host_key) + event = threading.Event() + server = NullServer() + self.assert_(not event.isSet()) + self.ts.start_server(event, server) + self.tc.ultra_debug = True + self.tc.connect(hostkey=public_host_key) + self.tc.auth_password(username='slowdive', password='pygmalion') + event.wait(1.0) + self.assert_(event.isSet()) + self.assert_(self.ts.is_active()) + + chan = self.tc.open_session() + self.assert_(chan.invoke_shell()) + schan = self.ts.accept(1.0) + + # nothing should be ready + r, w, e = select.select([chan], [], [], 0.1) + self.assertEquals([], r) + self.assertEquals([], w) + self.assertEquals([], e) + + schan.send('hello\n') + + # something should be ready now (give it 1 second to appear) + for i in range(10): + r, w, e = select.select([chan], [], [], 0.1) + if chan in r: + break + time.sleep(0.1) + self.assertEquals([chan], r) + self.assertEquals([], w) + self.assertEquals([], e) + + self.assertEquals('hello\n', chan.recv(6)) + + # and, should be dead again now + r, w, e = select.select([chan], [], [], 0.1) + self.assertEquals([], r) + self.assertEquals([], w) + self.assertEquals([], e) + + schan.close() + + # detect eof? + for i in range(10): + r, w, e = select.select([chan], [], [], 0.1) + if chan in r: + break + time.sleep(0.1) + self.assertEquals([chan], r) + self.assertEquals([], w) + self.assertEquals([], e) + self.assertEquals('', chan.recv(16)) + + chan.close() + + def test_F_renegotiate(self): + """ + verify that a transport can correctly renegotiate mid-stream. + """ + host_key = RSAKey.from_private_key_file('tests/test_rsa.key') + public_host_key = RSAKey(data=str(host_key)) + self.ts.add_server_key(host_key) + event = threading.Event() + server = NullServer() + self.ts.start_server(event, server) + self.tc.connect(hostkey=public_host_key, + username='slowdive', password='pygmalion') + event.wait(1.0) + self.assert_(event.isSet()) + self.assert_(self.ts.is_active()) + + self.tc.packetizer.REKEY_BYTES = 16384 + + chan = self.tc.open_session() + self.assert_(chan.exec_command('yes')) + schan = self.ts.accept(1.0) + + self.assertEquals(self.tc.H, self.tc.session_id) + for i in range(20): + chan.send('x' * 1024) + chan.close() + + # allow a few seconds for the rekeying to complete + for i in xrange(50): + if self.tc.H != self.tc.session_id: + break + time.sleep(0.1) + self.assertNotEquals(self.tc.H, self.tc.session_id) + + schan.close() + + def test_G_compression(self): + """ + verify that zlib compression is basically working. + """ + host_key = RSAKey.from_private_key_file('tests/test_rsa.key') + public_host_key = RSAKey(data=str(host_key)) + self.ts.add_server_key(host_key) + self.ts.get_security_options().compression = ('zlib',) + self.tc.get_security_options().compression = ('zlib',) + event = threading.Event() + server = NullServer() + self.ts.start_server(event, server) + self.tc.connect(hostkey=public_host_key, + username='slowdive', password='pygmalion') + event.wait(1.0) + self.assert_(event.isSet()) + self.assert_(self.ts.is_active()) + + chan = self.tc.open_session() + self.assert_(chan.exec_command('yes')) + schan = self.ts.accept(1.0) + + bytes = self.tc.packetizer._Packetizer__sent_bytes + chan.send('x' * 1024) + bytes2 = self.tc.packetizer._Packetizer__sent_bytes + # tests show this is actually compressed to *52 bytes*! including packet overhead! nice!! :) + self.assert_(bytes2 - bytes < 1024) + + chan.close() + schan.close() diff --git a/tests/test_util.py b/tests/test_util.py new file mode 100644 index 0000000..fa8c029 --- /dev/null +++ b/tests/test_util.py @@ -0,0 +1,80 @@ +#!/usr/bin/python + +# Copyright (C) 2003-2005 Robey Pointer <robey@lag.net> +# +# This file is part of paramiko. +# +# Paramiko is free software; you can redistribute it and/or modify it under the +# terms of the GNU Lesser General Public License as published by the Free +# Software Foundation; either version 2.1 of the License, or (at your option) +# any later version. +# +# Paramiko is distrubuted in the hope that it will be useful, but WITHOUT ANY +# WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR +# A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more +# details. +# +# You should have received a copy of the GNU Lesser General Public License +# along with Paramiko; if not, write to the Free Software Foundation, Inc., +# 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA. + +""" +Some unit tests for utility functions. +""" + +import cStringIO +import unittest +from Crypto.Hash import SHA +import paramiko.util + + +test_config_file = """\ +Host * + User robey + IdentityFile =~/.ssh/id_rsa + +# comment +Host *.example.com + \tUser bjork +Port=3333 +Host * + \t \t Crazy something dumb +Host spoo.example.com +Crazy something else +""" + + +class UtilTest (unittest.TestCase): + + K = 14730343317708716439807310032871972459448364195094179797249681733965528989482751523943515690110179031004049109375612685505881911274101441415545039654102474376472240501616988799699744135291070488314748284283496055223852115360852283821334858541043710301057312858051901453919067023103730011648890038847384890504L + + def setUp(self): + pass + + def tearDown(self): + pass + + def test_1_parse_config(self): + global test_config_file + f = cStringIO.StringIO(test_config_file) + config = paramiko.util.parse_ssh_config(f) + self.assertEquals(config, [ {'identityfile': '~/.ssh/id_rsa', 'host': '*', 'user': 'robey', + 'crazy': 'something dumb '}, + {'host': '*.example.com', 'user': 'bjork', 'port': '3333'}, + {'host': 'spoo.example.com', 'crazy': 'something else'}]) + + def test_2_host_config(self): + global test_config_file + f = cStringIO.StringIO(test_config_file) + config = paramiko.util.parse_ssh_config(f) + c = paramiko.util.lookup_ssh_host_config('irc.danger.com', config) + self.assertEquals(c, {'identityfile': '~/.ssh/id_rsa', 'user': 'robey', 'crazy': 'something dumb '}) + c = paramiko.util.lookup_ssh_host_config('irc.example.com', config) + self.assertEquals(c, {'identityfile': '~/.ssh/id_rsa', 'user': 'bjork', 'crazy': 'something dumb ', 'port': '3333'}) + c = paramiko.util.lookup_ssh_host_config('spoo.example.com', config) + self.assertEquals(c, {'identityfile': '~/.ssh/id_rsa', 'user': 'bjork', 'crazy': 'something else', 'port': '3333'}) + + def test_3_generate_key_bytes(self): + x = paramiko.util.generate_key_bytes(SHA, 'ABCDEFGH', 'This is my secret passphrase.', 64) + hex = ''.join(['%02x' % ord(c) for c in x]) + self.assertEquals(hex, '9110e2f6793b69363e58173e9436b13a5a4b339005741d5c680e505f57d871347b4239f14fb5c46e857d5e100424873ba849ac699cea98d729e57b3e84378e8b') |