From ed280d5ac360e2af796e9bd973d7b4df89f0c449 Mon Sep 17 00:00:00 2001 From: "Jeremy T. Bouse" Date: Fri, 27 Nov 2009 16:20:12 -0500 Subject: Imported Upstream version 1.7.4 --- tests/test_client.py | 214 +++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 214 insertions(+) create mode 100644 tests/test_client.py (limited to 'tests/test_client.py') diff --git a/tests/test_client.py b/tests/test_client.py new file mode 100644 index 0000000..59cd67c --- /dev/null +++ b/tests/test_client.py @@ -0,0 +1,214 @@ +# Copyright (C) 2003-2007 Robey Pointer +# +# This file is part of paramiko. +# +# Paramiko is free software; you can redistribute it and/or modify it under the +# terms of the GNU Lesser General Public License as published by the Free +# Software Foundation; either version 2.1 of the License, or (at your option) +# any later version. +# +# Paramiko is distrubuted in the hope that it will be useful, but WITHOUT ANY +# WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR +# A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more +# details. +# +# You should have received a copy of the GNU Lesser General Public License +# along with Paramiko; if not, write to the Free Software Foundation, Inc., +# 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA. + +""" +Some unit tests for SSHClient. +""" + +import socket +import threading +import time +import unittest +import weakref +from binascii import hexlify + +import paramiko + + +class NullServer (paramiko.ServerInterface): + + def get_allowed_auths(self, username): + if username == 'slowdive': + return 'publickey,password' + return 'publickey' + + def check_auth_password(self, username, password): + if (username == 'slowdive') and (password == 'pygmalion'): + return paramiko.AUTH_SUCCESSFUL + return paramiko.AUTH_FAILED + + def check_auth_publickey(self, username, key): + if (key.get_name() == 'ssh-dss') and (hexlify(key.get_fingerprint()) == '4478f0b9a23cc5182009ff755bc1d26c'): + return paramiko.AUTH_SUCCESSFUL + return paramiko.AUTH_FAILED + + def check_channel_request(self, kind, chanid): + return paramiko.OPEN_SUCCEEDED + + def check_channel_exec_request(self, channel, command): + if command != 'yes': + return False + return True + + +class SSHClientTest (unittest.TestCase): + + def setUp(self): + self.sockl = socket.socket() + self.sockl.bind(('localhost', 0)) + self.sockl.listen(1) + self.addr, self.port = self.sockl.getsockname() + self.event = threading.Event() + thread = threading.Thread(target=self._run) + thread.start() + + def tearDown(self): + if hasattr(self, 'tc'): + self.tc.close() + self.ts.close() + self.socks.close() + self.sockl.close() + + def _run(self): + self.socks, addr = self.sockl.accept() + self.ts = paramiko.Transport(self.socks) + host_key = paramiko.RSAKey.from_private_key_file('tests/test_rsa.key') + self.ts.add_server_key(host_key) + server = NullServer() + self.ts.start_server(self.event, server) + + + def test_1_client(self): + """ + verify that the SSHClient stuff works too. + """ + host_key = paramiko.RSAKey.from_private_key_file('tests/test_rsa.key') + public_host_key = paramiko.RSAKey(data=str(host_key)) + + self.tc = paramiko.SSHClient() + self.tc.get_host_keys().add(self.addr, 'ssh-rsa', public_host_key) + self.tc.connect(self.addr, self.port, username='slowdive', password='pygmalion') + + self.event.wait(1.0) + self.assert_(self.event.isSet()) + self.assert_(self.ts.is_active()) + self.assertEquals('slowdive', self.ts.get_username()) + self.assertEquals(True, self.ts.is_authenticated()) + + stdin, stdout, stderr = self.tc.exec_command('yes') + schan = self.ts.accept(1.0) + + schan.send('Hello there.\n') + schan.send_stderr('This is on stderr.\n') + schan.close() + + self.assertEquals('Hello there.\n', stdout.readline()) + self.assertEquals('', stdout.readline()) + self.assertEquals('This is on stderr.\n', stderr.readline()) + self.assertEquals('', stderr.readline()) + + stdin.close() + stdout.close() + stderr.close() + + def test_2_client_dsa(self): + """ + verify that SSHClient works with a DSA key. + """ + host_key = paramiko.RSAKey.from_private_key_file('tests/test_rsa.key') + public_host_key = paramiko.RSAKey(data=str(host_key)) + + self.tc = paramiko.SSHClient() + self.tc.get_host_keys().add(self.addr, 'ssh-rsa', public_host_key) + self.tc.connect(self.addr, self.port, username='slowdive', key_filename='tests/test_dss.key') + + self.event.wait(1.0) + self.assert_(self.event.isSet()) + self.assert_(self.ts.is_active()) + self.assertEquals('slowdive', self.ts.get_username()) + self.assertEquals(True, self.ts.is_authenticated()) + + stdin, stdout, stderr = self.tc.exec_command('yes') + schan = self.ts.accept(1.0) + + schan.send('Hello there.\n') + schan.send_stderr('This is on stderr.\n') + schan.close() + + self.assertEquals('Hello there.\n', stdout.readline()) + self.assertEquals('', stdout.readline()) + self.assertEquals('This is on stderr.\n', stderr.readline()) + self.assertEquals('', stderr.readline()) + + stdin.close() + stdout.close() + stderr.close() + + def test_3_multiple_key_files(self): + """ + verify that SSHClient accepts and tries multiple key files. + """ + host_key = paramiko.RSAKey.from_private_key_file('tests/test_rsa.key') + public_host_key = paramiko.RSAKey(data=str(host_key)) + + self.tc = paramiko.SSHClient() + self.tc.get_host_keys().add(self.addr, 'ssh-rsa', public_host_key) + self.tc.connect(self.addr, self.port, username='slowdive', key_filename=[ 'tests/test_rsa.key', 'tests/test_dss.key' ]) + + self.event.wait(1.0) + self.assert_(self.event.isSet()) + self.assert_(self.ts.is_active()) + self.assertEquals('slowdive', self.ts.get_username()) + self.assertEquals(True, self.ts.is_authenticated()) + + def test_4_auto_add_policy(self): + """ + verify that SSHClient's AutoAddPolicy works. + """ + host_key = paramiko.RSAKey.from_private_key_file('tests/test_rsa.key') + public_host_key = paramiko.RSAKey(data=str(host_key)) + + self.tc = paramiko.SSHClient() + self.tc.set_missing_host_key_policy(paramiko.AutoAddPolicy()) + self.assertEquals(0, len(self.tc.get_host_keys())) + self.tc.connect(self.addr, self.port, username='slowdive', password='pygmalion') + + self.event.wait(1.0) + self.assert_(self.event.isSet()) + self.assert_(self.ts.is_active()) + self.assertEquals('slowdive', self.ts.get_username()) + self.assertEquals(True, self.ts.is_authenticated()) + self.assertEquals(1, len(self.tc.get_host_keys())) + self.assertEquals(public_host_key, self.tc.get_host_keys()[self.addr]['ssh-rsa']) + + def test_5_cleanup(self): + """ + verify that when an SSHClient is collected, its transport (and the + transport's packetizer) is closed. + """ + host_key = paramiko.RSAKey.from_private_key_file('tests/test_rsa.key') + public_host_key = paramiko.RSAKey(data=str(host_key)) + + self.tc = paramiko.SSHClient() + self.tc.set_missing_host_key_policy(paramiko.AutoAddPolicy()) + self.assertEquals(0, len(self.tc.get_host_keys())) + self.tc.connect(self.addr, self.port, username='slowdive', password='pygmalion') + + self.event.wait(1.0) + self.assert_(self.event.isSet()) + self.assert_(self.ts.is_active()) + + p = weakref.ref(self.tc._transport.packetizer) + self.assert_(p() is not None) + del self.tc + # hrm, sometimes p isn't cleared right away. why is that? + st = time.time() + while (time.time() - st < 5.0) and (p() is not None): + time.sleep(0.1) + self.assert_(p() is None) + -- cgit v1.2.3