diff options
Diffstat (limited to 'tests')
-rw-r--r-- | tests/loop.py | 8 | ||||
-rw-r--r-- | tests/test_buffered_pipe.py | 8 | ||||
-rw-r--r-- | tests/test_client.py | 8 | ||||
-rwxr-xr-x | tests/test_sftp.py | 45 | ||||
-rw-r--r-- | tests/test_transport.py | 7 | ||||
-rw-r--r-- | tests/test_util.py | 221 | ||||
-rw-r--r-- | tests/util.py | 10 |
7 files changed, 252 insertions, 55 deletions
diff --git a/tests/loop.py b/tests/loop.py index bdc2f2d..ffa8e3c 100644 --- a/tests/loop.py +++ b/tests/loop.py @@ -62,12 +62,8 @@ class LoopSocket (object): self.__cv.wait(self.__timeout) if len(self.__in_buffer) == 0: raise socket.timeout - if n < self.__in_buffer: - out = self.__in_buffer[:n] - self.__in_buffer = self.__in_buffer[n:] - else: - out = self.__in_buffer - self.__in_buffer = '' + out = self.__in_buffer[:n] + self.__in_buffer = self.__in_buffer[n:] return out finally: self.__lock.release() diff --git a/tests/test_buffered_pipe.py b/tests/test_buffered_pipe.py index f285d05..b9d91f6 100644 --- a/tests/test_buffered_pipe.py +++ b/tests/test_buffered_pipe.py @@ -26,6 +26,8 @@ import unittest from paramiko.buffered_pipe import BufferedPipe, PipeTimeout from paramiko import pipe +from util import ParamikoTest + def delay_thread(pipe): pipe.feed('a') @@ -39,11 +41,7 @@ def close_thread(pipe): pipe.close() -class BufferedPipeTest (unittest.TestCase): - - assertTrue = unittest.TestCase.failUnless # for Python 2.3 and below - assertFalse = unittest.TestCase.failIf # for Python 2.3 and below - +class BufferedPipeTest(ParamikoTest): def test_1_buffered_pipe(self): p = BufferedPipe() self.assert_(not p.read_ready()) diff --git a/tests/test_client.py b/tests/test_client.py index 2f9b9a7..08ef1f9 100644 --- a/tests/test_client.py +++ b/tests/test_client.py @@ -68,11 +68,9 @@ class SSHClientTest (unittest.TestCase): thread.start() def tearDown(self): - if hasattr(self, 'tc'): - self.tc.close() - self.ts.close() - self.socks.close() - self.sockl.close() + for attr in "tc ts socks sockl".split(): + if hasattr(self, attr): + getattr(self, attr).close() def _run(self): self.socks, addr = self.sockl.accept() diff --git a/tests/test_sftp.py b/tests/test_sftp.py index 2eadabc..b1697ea 100755 --- a/tests/test_sftp.py +++ b/tests/test_sftp.py @@ -23,15 +23,15 @@ a real actual sftp server is contacted, and a new folder is created there to do test file operations in (so no existing files will be harmed). """ +from __future__ import with_statement + from binascii import hexlify -import logging import os -import random -import struct +import warnings import sys import threading -import time import unittest +import StringIO import paramiko from stub_sftp import StubServer, StubSFTPServer @@ -188,6 +188,17 @@ class SFTPTest (unittest.TestCase): finally: sftp.remove(FOLDER + '/duck.txt') + def test_3_sftp_file_can_be_used_as_context_manager(self): + """ + verify that an opened file can be used as a context manager + """ + try: + with sftp.open(FOLDER + '/duck.txt', 'w') as f: + f.write(ARTICLE) + self.assertEqual(sftp.stat(FOLDER + '/duck.txt').st_size, 1483) + finally: + sftp.remove(FOLDER + '/duck.txt') + def test_4_append(self): """ verify that a file can be opened for append, and tell() still works. @@ -214,7 +225,7 @@ class SFTPTest (unittest.TestCase): """ f = sftp.open(FOLDER + '/first.txt', 'w') try: - f.write('content!\n'); + f.write('content!\n') f.close() sftp.rename(FOLDER + '/first.txt', FOLDER + '/second.txt') try: @@ -425,7 +436,7 @@ class SFTPTest (unittest.TestCase): self.assertEqual(sftp.readlink(FOLDER + '/link.txt'), 'original.txt') f = sftp.open(FOLDER + '/link.txt', 'r') - self.assertEqual(f.readlines(), [ 'original\n' ]) + self.assertEqual(f.readlines(), ['original\n']) f.close() cwd = sftp.normalize('.') @@ -553,7 +564,6 @@ class SFTPTest (unittest.TestCase): """ verify that get/put work. """ - import os, warnings warnings.filterwarnings('ignore', 'tempnam.*') localname = os.tempnam() @@ -618,7 +628,7 @@ class SFTPTest (unittest.TestCase): try: f = sftp.open(FOLDER + '/unusual.txt', 'wx') self.fail('expected exception') - except IOError, x: + except IOError: pass finally: sftp.unlink(FOLDER + '/unusual.txt') @@ -658,12 +668,12 @@ class SFTPTest (unittest.TestCase): f.close() try: f = sftp.open(FOLDER + '/zero', 'r') - data = f.readv([(0, 12)]) + f.readv([(0, 12)]) f.close() f = sftp.open(FOLDER + '/zero', 'r') f.prefetch() - data = f.read(100) + f.read(100) f.close() finally: sftp.unlink(FOLDER + '/zero') @@ -672,7 +682,6 @@ class SFTPTest (unittest.TestCase): """ verify that get/put work without confirmation. """ - import os, warnings warnings.filterwarnings('ignore', 'tempnam.*') localname = os.tempnam() @@ -684,7 +693,7 @@ class SFTPTest (unittest.TestCase): def progress_callback(x, y): saved_progress.append((x, y)) res = sftp.put(localname, FOLDER + '/bunny.txt', progress_callback, False) - + self.assertEquals(SFTPAttributes().attr, res.attr) f = sftp.open(FOLDER + '/bunny.txt', 'r') @@ -717,3 +726,15 @@ class SFTPTest (unittest.TestCase): finally: sftp.remove(FOLDER + '/append.txt') + def test_putfo_empty_file(self): + """ + Send an empty file and confirm it is sent. + """ + target = FOLDER + '/empty file.txt' + stream = StringIO.StringIO() + try: + attrs = sftp.putfo(stream, target) + # the returned attributes should not be null + self.assertNotEqual(attrs, None) + finally: + sftp.remove(target) diff --git a/tests/test_transport.py b/tests/test_transport.py index cea4a1d..1c57d18 100644 --- a/tests/test_transport.py +++ b/tests/test_transport.py @@ -36,6 +36,7 @@ from paramiko import OPEN_SUCCEEDED, OPEN_FAILED_ADMINISTRATIVELY_PROHIBITED from paramiko.common import MSG_KEXINIT, MSG_CHANNEL_WINDOW_ADJUST from paramiko.message import Message from loop import LoopSocket +from util import ParamikoTest LONG_BANNER = """\ @@ -105,11 +106,7 @@ class NullServer (ServerInterface): return OPEN_SUCCEEDED -class TransportTest (unittest.TestCase): - - assertTrue = unittest.TestCase.failUnless # for Python 2.3 and below - assertFalse = unittest.TestCase.failIf # for Python 2.3 and below - +class TransportTest(ParamikoTest): def setUp(self): self.socks = LoopSocket() self.sockc = LoopSocket() diff --git a/tests/test_util.py b/tests/test_util.py index 256c3d7..efda9b2 100644 --- a/tests/test_util.py +++ b/tests/test_util.py @@ -22,11 +22,14 @@ Some unit tests for utility functions. from binascii import hexlify import cStringIO +import errno import os import unittest from Crypto.Hash import SHA import paramiko.util +from paramiko.util import lookup_ssh_host_config as host_config +from util import ParamikoTest test_config_file = """\ Host * @@ -57,17 +60,7 @@ BGQ3GQ/Fc7SX6gkpXkwcZryoi4kNFhHu5LvHcZPdxXV1D+uTMfGS1eyd2Yz/DoNWXNAl8TI0cAsW\ from paramiko import * -class UtilTest (unittest.TestCase): - - assertTrue = unittest.TestCase.failUnless # for Python 2.3 and below - assertFalse = unittest.TestCase.failIf # for Python 2.3 and below - - def setUp(self): - pass - - def tearDown(self): - pass - +class UtilTest(ParamikoTest): def test_1_import(self): """ verify that all the classes can be imported from paramiko. @@ -111,21 +104,37 @@ class UtilTest (unittest.TestCase): f = cStringIO.StringIO(test_config_file) config = paramiko.util.parse_ssh_config(f) self.assertEquals(config._config, - [ {'identityfile': '~/.ssh/id_rsa', 'host': '*', 'user': 'robey', - 'crazy': 'something dumb '}, - {'host': '*.example.com', 'user': 'bjork', 'port': '3333'}, - {'host': 'spoo.example.com', 'crazy': 'something else'}]) + [{'host': ['*'], 'config': {}}, {'host': ['*'], 'config': {'identityfile': ['~/.ssh/id_rsa'], 'user': 'robey'}}, + {'host': ['*.example.com'], 'config': {'user': 'bjork', 'port': '3333'}}, + {'host': ['*'], 'config': {'crazy': 'something dumb '}}, + {'host': ['spoo.example.com'], 'config': {'crazy': 'something else'}}]) def test_3_host_config(self): global test_config_file f = cStringIO.StringIO(test_config_file) config = paramiko.util.parse_ssh_config(f) - c = paramiko.util.lookup_ssh_host_config('irc.danger.com', config) - self.assertEquals(c, {'identityfile': '~/.ssh/id_rsa', 'user': 'robey', 'crazy': 'something dumb '}) - c = paramiko.util.lookup_ssh_host_config('irc.example.com', config) - self.assertEquals(c, {'identityfile': '~/.ssh/id_rsa', 'user': 'bjork', 'crazy': 'something dumb ', 'port': '3333'}) - c = paramiko.util.lookup_ssh_host_config('spoo.example.com', config) - self.assertEquals(c, {'identityfile': '~/.ssh/id_rsa', 'user': 'bjork', 'crazy': 'something else', 'port': '3333'}) + + for host, values in { + 'irc.danger.com': {'crazy': 'something dumb ', + 'hostname': 'irc.danger.com', + 'user': 'robey'}, + 'irc.example.com': {'crazy': 'something dumb ', + 'hostname': 'irc.example.com', + 'user': 'robey', + 'port': '3333'}, + 'spoo.example.com': {'crazy': 'something dumb ', + 'hostname': 'spoo.example.com', + 'user': 'robey', + 'port': '3333'} + }.items(): + values = dict(values, + hostname=host, + identityfile=[os.path.expanduser("~/.ssh/id_rsa")] + ) + self.assertEquals( + paramiko.util.lookup_ssh_host_config(host, config), + values + ) def test_4_generate_key_bytes(self): x = paramiko.util.generate_key_bytes(SHA, 'ABCDEFGH', 'This is my secret passphrase.', 64) @@ -151,4 +160,172 @@ class UtilTest (unittest.TestCase): # just verify that we can pull out 32 bytes and not get an exception. x = rng.read(32) self.assertEquals(len(x), 32) - + + def test_7_host_config_expose_issue_33(self): + test_config_file = """ +Host www13.* + Port 22 + +Host *.example.com + Port 2222 + +Host * + Port 3333 + """ + f = cStringIO.StringIO(test_config_file) + config = paramiko.util.parse_ssh_config(f) + host = 'www13.example.com' + self.assertEquals( + paramiko.util.lookup_ssh_host_config(host, config), + {'hostname': host, 'port': '22'} + ) + + def test_8_eintr_retry(self): + self.assertEquals('foo', paramiko.util.retry_on_signal(lambda: 'foo')) + + # Variables that are set by raises_intr + intr_errors_remaining = [3] + call_count = [0] + def raises_intr(): + call_count[0] += 1 + if intr_errors_remaining[0] > 0: + intr_errors_remaining[0] -= 1 + raise IOError(errno.EINTR, 'file', 'interrupted system call') + self.assertTrue(paramiko.util.retry_on_signal(raises_intr) is None) + self.assertEquals(0, intr_errors_remaining[0]) + self.assertEquals(4, call_count[0]) + + def raises_ioerror_not_eintr(): + raise IOError(errno.ENOENT, 'file', 'file not found') + self.assertRaises(IOError, + lambda: paramiko.util.retry_on_signal(raises_ioerror_not_eintr)) + + def raises_other_exception(): + raise AssertionError('foo') + self.assertRaises(AssertionError, + lambda: paramiko.util.retry_on_signal(raises_other_exception)) + + def test_9_proxycommand_config_equals_parsing(self): + """ + ProxyCommand should not split on equals signs within the value. + """ + conf = """ +Host space-delimited + ProxyCommand foo bar=biz baz + +Host equals-delimited + ProxyCommand=foo bar=biz baz +""" + f = cStringIO.StringIO(conf) + config = paramiko.util.parse_ssh_config(f) + for host in ('space-delimited', 'equals-delimited'): + self.assertEquals( + host_config(host, config)['proxycommand'], + 'foo bar=biz baz' + ) + + def test_10_proxycommand_interpolation(self): + """ + ProxyCommand should perform interpolation on the value + """ + config = paramiko.util.parse_ssh_config(cStringIO.StringIO(""" +Host specific + Port 37 + ProxyCommand host %h port %p lol + +Host portonly + Port 155 + +Host * + Port 25 + ProxyCommand host %h port %p +""")) + for host, val in ( + ('foo.com', "host foo.com port 25"), + ('specific', "host specific port 37 lol"), + ('portonly', "host portonly port 155"), + ): + self.assertEquals( + host_config(host, config)['proxycommand'], + val + ) + + def test_11_host_config_test_negation(self): + test_config_file = """ +Host www13.* !*.example.com + Port 22 + +Host *.example.com !www13.* + Port 2222 + +Host www13.* + Port 8080 + +Host * + Port 3333 + """ + f = cStringIO.StringIO(test_config_file) + config = paramiko.util.parse_ssh_config(f) + host = 'www13.example.com' + self.assertEquals( + paramiko.util.lookup_ssh_host_config(host, config), + {'hostname': host, 'port': '8080'} + ) + + def test_12_host_config_test_proxycommand(self): + test_config_file = """ +Host proxy-with-equal-divisor-and-space +ProxyCommand = foo=bar + +Host proxy-with-equal-divisor-and-no-space +ProxyCommand=foo=bar + +Host proxy-without-equal-divisor +ProxyCommand foo=bar:%h-%p + """ + for host, values in { + 'proxy-with-equal-divisor-and-space' :{'hostname': 'proxy-with-equal-divisor-and-space', + 'proxycommand': 'foo=bar'}, + 'proxy-with-equal-divisor-and-no-space':{'hostname': 'proxy-with-equal-divisor-and-no-space', + 'proxycommand': 'foo=bar'}, + 'proxy-without-equal-divisor' :{'hostname': 'proxy-without-equal-divisor', + 'proxycommand': + 'foo=bar:proxy-without-equal-divisor-22'} + }.items(): + + f = cStringIO.StringIO(test_config_file) + config = paramiko.util.parse_ssh_config(f) + self.assertEquals( + paramiko.util.lookup_ssh_host_config(host, config), + values + ) + + def test_11_host_config_test_identityfile(self): + test_config_file = """ + +IdentityFile id_dsa0 + +Host * +IdentityFile id_dsa1 + +Host dsa2 +IdentityFile id_dsa2 + +Host dsa2* +IdentityFile id_dsa22 + """ + for host, values in { + 'foo' :{'hostname': 'foo', + 'identityfile': ['id_dsa0', 'id_dsa1']}, + 'dsa2' :{'hostname': 'dsa2', + 'identityfile': ['id_dsa0', 'id_dsa1', 'id_dsa2', 'id_dsa22']}, + 'dsa22' :{'hostname': 'dsa22', + 'identityfile': ['id_dsa0', 'id_dsa1', 'id_dsa22']} + }.items(): + + f = cStringIO.StringIO(test_config_file) + config = paramiko.util.parse_ssh_config(f) + self.assertEquals( + paramiko.util.lookup_ssh_host_config(host, config), + values + ) diff --git a/tests/util.py b/tests/util.py new file mode 100644 index 0000000..2e0be08 --- /dev/null +++ b/tests/util.py @@ -0,0 +1,10 @@ +import unittest + + +class ParamikoTest(unittest.TestCase): + # for Python 2.3 and below + if not hasattr(unittest.TestCase, 'assertTrue'): + assertTrue = unittest.TestCase.failUnless + if not hasattr(unittest.TestCase, 'assertFalse'): + assertFalse = unittest.TestCase.failIf + |