summaryrefslogtreecommitdiff
path: root/tests
diff options
context:
space:
mode:
Diffstat (limited to 'tests')
-rw-r--r--tests/loop.py8
-rw-r--r--tests/test_buffered_pipe.py8
-rw-r--r--tests/test_client.py8
-rwxr-xr-xtests/test_sftp.py45
-rw-r--r--tests/test_transport.py7
-rw-r--r--tests/test_util.py221
-rw-r--r--tests/util.py10
7 files changed, 252 insertions, 55 deletions
diff --git a/tests/loop.py b/tests/loop.py
index bdc2f2d..ffa8e3c 100644
--- a/tests/loop.py
+++ b/tests/loop.py
@@ -62,12 +62,8 @@ class LoopSocket (object):
self.__cv.wait(self.__timeout)
if len(self.__in_buffer) == 0:
raise socket.timeout
- if n < self.__in_buffer:
- out = self.__in_buffer[:n]
- self.__in_buffer = self.__in_buffer[n:]
- else:
- out = self.__in_buffer
- self.__in_buffer = ''
+ out = self.__in_buffer[:n]
+ self.__in_buffer = self.__in_buffer[n:]
return out
finally:
self.__lock.release()
diff --git a/tests/test_buffered_pipe.py b/tests/test_buffered_pipe.py
index f285d05..b9d91f6 100644
--- a/tests/test_buffered_pipe.py
+++ b/tests/test_buffered_pipe.py
@@ -26,6 +26,8 @@ import unittest
from paramiko.buffered_pipe import BufferedPipe, PipeTimeout
from paramiko import pipe
+from util import ParamikoTest
+
def delay_thread(pipe):
pipe.feed('a')
@@ -39,11 +41,7 @@ def close_thread(pipe):
pipe.close()
-class BufferedPipeTest (unittest.TestCase):
-
- assertTrue = unittest.TestCase.failUnless # for Python 2.3 and below
- assertFalse = unittest.TestCase.failIf # for Python 2.3 and below
-
+class BufferedPipeTest(ParamikoTest):
def test_1_buffered_pipe(self):
p = BufferedPipe()
self.assert_(not p.read_ready())
diff --git a/tests/test_client.py b/tests/test_client.py
index 2f9b9a7..08ef1f9 100644
--- a/tests/test_client.py
+++ b/tests/test_client.py
@@ -68,11 +68,9 @@ class SSHClientTest (unittest.TestCase):
thread.start()
def tearDown(self):
- if hasattr(self, 'tc'):
- self.tc.close()
- self.ts.close()
- self.socks.close()
- self.sockl.close()
+ for attr in "tc ts socks sockl".split():
+ if hasattr(self, attr):
+ getattr(self, attr).close()
def _run(self):
self.socks, addr = self.sockl.accept()
diff --git a/tests/test_sftp.py b/tests/test_sftp.py
index 2eadabc..b1697ea 100755
--- a/tests/test_sftp.py
+++ b/tests/test_sftp.py
@@ -23,15 +23,15 @@ a real actual sftp server is contacted, and a new folder is created there to
do test file operations in (so no existing files will be harmed).
"""
+from __future__ import with_statement
+
from binascii import hexlify
-import logging
import os
-import random
-import struct
+import warnings
import sys
import threading
-import time
import unittest
+import StringIO
import paramiko
from stub_sftp import StubServer, StubSFTPServer
@@ -188,6 +188,17 @@ class SFTPTest (unittest.TestCase):
finally:
sftp.remove(FOLDER + '/duck.txt')
+ def test_3_sftp_file_can_be_used_as_context_manager(self):
+ """
+ verify that an opened file can be used as a context manager
+ """
+ try:
+ with sftp.open(FOLDER + '/duck.txt', 'w') as f:
+ f.write(ARTICLE)
+ self.assertEqual(sftp.stat(FOLDER + '/duck.txt').st_size, 1483)
+ finally:
+ sftp.remove(FOLDER + '/duck.txt')
+
def test_4_append(self):
"""
verify that a file can be opened for append, and tell() still works.
@@ -214,7 +225,7 @@ class SFTPTest (unittest.TestCase):
"""
f = sftp.open(FOLDER + '/first.txt', 'w')
try:
- f.write('content!\n');
+ f.write('content!\n')
f.close()
sftp.rename(FOLDER + '/first.txt', FOLDER + '/second.txt')
try:
@@ -425,7 +436,7 @@ class SFTPTest (unittest.TestCase):
self.assertEqual(sftp.readlink(FOLDER + '/link.txt'), 'original.txt')
f = sftp.open(FOLDER + '/link.txt', 'r')
- self.assertEqual(f.readlines(), [ 'original\n' ])
+ self.assertEqual(f.readlines(), ['original\n'])
f.close()
cwd = sftp.normalize('.')
@@ -553,7 +564,6 @@ class SFTPTest (unittest.TestCase):
"""
verify that get/put work.
"""
- import os, warnings
warnings.filterwarnings('ignore', 'tempnam.*')
localname = os.tempnam()
@@ -618,7 +628,7 @@ class SFTPTest (unittest.TestCase):
try:
f = sftp.open(FOLDER + '/unusual.txt', 'wx')
self.fail('expected exception')
- except IOError, x:
+ except IOError:
pass
finally:
sftp.unlink(FOLDER + '/unusual.txt')
@@ -658,12 +668,12 @@ class SFTPTest (unittest.TestCase):
f.close()
try:
f = sftp.open(FOLDER + '/zero', 'r')
- data = f.readv([(0, 12)])
+ f.readv([(0, 12)])
f.close()
f = sftp.open(FOLDER + '/zero', 'r')
f.prefetch()
- data = f.read(100)
+ f.read(100)
f.close()
finally:
sftp.unlink(FOLDER + '/zero')
@@ -672,7 +682,6 @@ class SFTPTest (unittest.TestCase):
"""
verify that get/put work without confirmation.
"""
- import os, warnings
warnings.filterwarnings('ignore', 'tempnam.*')
localname = os.tempnam()
@@ -684,7 +693,7 @@ class SFTPTest (unittest.TestCase):
def progress_callback(x, y):
saved_progress.append((x, y))
res = sftp.put(localname, FOLDER + '/bunny.txt', progress_callback, False)
-
+
self.assertEquals(SFTPAttributes().attr, res.attr)
f = sftp.open(FOLDER + '/bunny.txt', 'r')
@@ -717,3 +726,15 @@ class SFTPTest (unittest.TestCase):
finally:
sftp.remove(FOLDER + '/append.txt')
+ def test_putfo_empty_file(self):
+ """
+ Send an empty file and confirm it is sent.
+ """
+ target = FOLDER + '/empty file.txt'
+ stream = StringIO.StringIO()
+ try:
+ attrs = sftp.putfo(stream, target)
+ # the returned attributes should not be null
+ self.assertNotEqual(attrs, None)
+ finally:
+ sftp.remove(target)
diff --git a/tests/test_transport.py b/tests/test_transport.py
index cea4a1d..1c57d18 100644
--- a/tests/test_transport.py
+++ b/tests/test_transport.py
@@ -36,6 +36,7 @@ from paramiko import OPEN_SUCCEEDED, OPEN_FAILED_ADMINISTRATIVELY_PROHIBITED
from paramiko.common import MSG_KEXINIT, MSG_CHANNEL_WINDOW_ADJUST
from paramiko.message import Message
from loop import LoopSocket
+from util import ParamikoTest
LONG_BANNER = """\
@@ -105,11 +106,7 @@ class NullServer (ServerInterface):
return OPEN_SUCCEEDED
-class TransportTest (unittest.TestCase):
-
- assertTrue = unittest.TestCase.failUnless # for Python 2.3 and below
- assertFalse = unittest.TestCase.failIf # for Python 2.3 and below
-
+class TransportTest(ParamikoTest):
def setUp(self):
self.socks = LoopSocket()
self.sockc = LoopSocket()
diff --git a/tests/test_util.py b/tests/test_util.py
index 256c3d7..efda9b2 100644
--- a/tests/test_util.py
+++ b/tests/test_util.py
@@ -22,11 +22,14 @@ Some unit tests for utility functions.
from binascii import hexlify
import cStringIO
+import errno
import os
import unittest
from Crypto.Hash import SHA
import paramiko.util
+from paramiko.util import lookup_ssh_host_config as host_config
+from util import ParamikoTest
test_config_file = """\
Host *
@@ -57,17 +60,7 @@ BGQ3GQ/Fc7SX6gkpXkwcZryoi4kNFhHu5LvHcZPdxXV1D+uTMfGS1eyd2Yz/DoNWXNAl8TI0cAsW\
from paramiko import *
-class UtilTest (unittest.TestCase):
-
- assertTrue = unittest.TestCase.failUnless # for Python 2.3 and below
- assertFalse = unittest.TestCase.failIf # for Python 2.3 and below
-
- def setUp(self):
- pass
-
- def tearDown(self):
- pass
-
+class UtilTest(ParamikoTest):
def test_1_import(self):
"""
verify that all the classes can be imported from paramiko.
@@ -111,21 +104,37 @@ class UtilTest (unittest.TestCase):
f = cStringIO.StringIO(test_config_file)
config = paramiko.util.parse_ssh_config(f)
self.assertEquals(config._config,
- [ {'identityfile': '~/.ssh/id_rsa', 'host': '*', 'user': 'robey',
- 'crazy': 'something dumb '},
- {'host': '*.example.com', 'user': 'bjork', 'port': '3333'},
- {'host': 'spoo.example.com', 'crazy': 'something else'}])
+ [{'host': ['*'], 'config': {}}, {'host': ['*'], 'config': {'identityfile': ['~/.ssh/id_rsa'], 'user': 'robey'}},
+ {'host': ['*.example.com'], 'config': {'user': 'bjork', 'port': '3333'}},
+ {'host': ['*'], 'config': {'crazy': 'something dumb '}},
+ {'host': ['spoo.example.com'], 'config': {'crazy': 'something else'}}])
def test_3_host_config(self):
global test_config_file
f = cStringIO.StringIO(test_config_file)
config = paramiko.util.parse_ssh_config(f)
- c = paramiko.util.lookup_ssh_host_config('irc.danger.com', config)
- self.assertEquals(c, {'identityfile': '~/.ssh/id_rsa', 'user': 'robey', 'crazy': 'something dumb '})
- c = paramiko.util.lookup_ssh_host_config('irc.example.com', config)
- self.assertEquals(c, {'identityfile': '~/.ssh/id_rsa', 'user': 'bjork', 'crazy': 'something dumb ', 'port': '3333'})
- c = paramiko.util.lookup_ssh_host_config('spoo.example.com', config)
- self.assertEquals(c, {'identityfile': '~/.ssh/id_rsa', 'user': 'bjork', 'crazy': 'something else', 'port': '3333'})
+
+ for host, values in {
+ 'irc.danger.com': {'crazy': 'something dumb ',
+ 'hostname': 'irc.danger.com',
+ 'user': 'robey'},
+ 'irc.example.com': {'crazy': 'something dumb ',
+ 'hostname': 'irc.example.com',
+ 'user': 'robey',
+ 'port': '3333'},
+ 'spoo.example.com': {'crazy': 'something dumb ',
+ 'hostname': 'spoo.example.com',
+ 'user': 'robey',
+ 'port': '3333'}
+ }.items():
+ values = dict(values,
+ hostname=host,
+ identityfile=[os.path.expanduser("~/.ssh/id_rsa")]
+ )
+ self.assertEquals(
+ paramiko.util.lookup_ssh_host_config(host, config),
+ values
+ )
def test_4_generate_key_bytes(self):
x = paramiko.util.generate_key_bytes(SHA, 'ABCDEFGH', 'This is my secret passphrase.', 64)
@@ -151,4 +160,172 @@ class UtilTest (unittest.TestCase):
# just verify that we can pull out 32 bytes and not get an exception.
x = rng.read(32)
self.assertEquals(len(x), 32)
-
+
+ def test_7_host_config_expose_issue_33(self):
+ test_config_file = """
+Host www13.*
+ Port 22
+
+Host *.example.com
+ Port 2222
+
+Host *
+ Port 3333
+ """
+ f = cStringIO.StringIO(test_config_file)
+ config = paramiko.util.parse_ssh_config(f)
+ host = 'www13.example.com'
+ self.assertEquals(
+ paramiko.util.lookup_ssh_host_config(host, config),
+ {'hostname': host, 'port': '22'}
+ )
+
+ def test_8_eintr_retry(self):
+ self.assertEquals('foo', paramiko.util.retry_on_signal(lambda: 'foo'))
+
+ # Variables that are set by raises_intr
+ intr_errors_remaining = [3]
+ call_count = [0]
+ def raises_intr():
+ call_count[0] += 1
+ if intr_errors_remaining[0] > 0:
+ intr_errors_remaining[0] -= 1
+ raise IOError(errno.EINTR, 'file', 'interrupted system call')
+ self.assertTrue(paramiko.util.retry_on_signal(raises_intr) is None)
+ self.assertEquals(0, intr_errors_remaining[0])
+ self.assertEquals(4, call_count[0])
+
+ def raises_ioerror_not_eintr():
+ raise IOError(errno.ENOENT, 'file', 'file not found')
+ self.assertRaises(IOError,
+ lambda: paramiko.util.retry_on_signal(raises_ioerror_not_eintr))
+
+ def raises_other_exception():
+ raise AssertionError('foo')
+ self.assertRaises(AssertionError,
+ lambda: paramiko.util.retry_on_signal(raises_other_exception))
+
+ def test_9_proxycommand_config_equals_parsing(self):
+ """
+ ProxyCommand should not split on equals signs within the value.
+ """
+ conf = """
+Host space-delimited
+ ProxyCommand foo bar=biz baz
+
+Host equals-delimited
+ ProxyCommand=foo bar=biz baz
+"""
+ f = cStringIO.StringIO(conf)
+ config = paramiko.util.parse_ssh_config(f)
+ for host in ('space-delimited', 'equals-delimited'):
+ self.assertEquals(
+ host_config(host, config)['proxycommand'],
+ 'foo bar=biz baz'
+ )
+
+ def test_10_proxycommand_interpolation(self):
+ """
+ ProxyCommand should perform interpolation on the value
+ """
+ config = paramiko.util.parse_ssh_config(cStringIO.StringIO("""
+Host specific
+ Port 37
+ ProxyCommand host %h port %p lol
+
+Host portonly
+ Port 155
+
+Host *
+ Port 25
+ ProxyCommand host %h port %p
+"""))
+ for host, val in (
+ ('foo.com', "host foo.com port 25"),
+ ('specific', "host specific port 37 lol"),
+ ('portonly', "host portonly port 155"),
+ ):
+ self.assertEquals(
+ host_config(host, config)['proxycommand'],
+ val
+ )
+
+ def test_11_host_config_test_negation(self):
+ test_config_file = """
+Host www13.* !*.example.com
+ Port 22
+
+Host *.example.com !www13.*
+ Port 2222
+
+Host www13.*
+ Port 8080
+
+Host *
+ Port 3333
+ """
+ f = cStringIO.StringIO(test_config_file)
+ config = paramiko.util.parse_ssh_config(f)
+ host = 'www13.example.com'
+ self.assertEquals(
+ paramiko.util.lookup_ssh_host_config(host, config),
+ {'hostname': host, 'port': '8080'}
+ )
+
+ def test_12_host_config_test_proxycommand(self):
+ test_config_file = """
+Host proxy-with-equal-divisor-and-space
+ProxyCommand = foo=bar
+
+Host proxy-with-equal-divisor-and-no-space
+ProxyCommand=foo=bar
+
+Host proxy-without-equal-divisor
+ProxyCommand foo=bar:%h-%p
+ """
+ for host, values in {
+ 'proxy-with-equal-divisor-and-space' :{'hostname': 'proxy-with-equal-divisor-and-space',
+ 'proxycommand': 'foo=bar'},
+ 'proxy-with-equal-divisor-and-no-space':{'hostname': 'proxy-with-equal-divisor-and-no-space',
+ 'proxycommand': 'foo=bar'},
+ 'proxy-without-equal-divisor' :{'hostname': 'proxy-without-equal-divisor',
+ 'proxycommand':
+ 'foo=bar:proxy-without-equal-divisor-22'}
+ }.items():
+
+ f = cStringIO.StringIO(test_config_file)
+ config = paramiko.util.parse_ssh_config(f)
+ self.assertEquals(
+ paramiko.util.lookup_ssh_host_config(host, config),
+ values
+ )
+
+ def test_11_host_config_test_identityfile(self):
+ test_config_file = """
+
+IdentityFile id_dsa0
+
+Host *
+IdentityFile id_dsa1
+
+Host dsa2
+IdentityFile id_dsa2
+
+Host dsa2*
+IdentityFile id_dsa22
+ """
+ for host, values in {
+ 'foo' :{'hostname': 'foo',
+ 'identityfile': ['id_dsa0', 'id_dsa1']},
+ 'dsa2' :{'hostname': 'dsa2',
+ 'identityfile': ['id_dsa0', 'id_dsa1', 'id_dsa2', 'id_dsa22']},
+ 'dsa22' :{'hostname': 'dsa22',
+ 'identityfile': ['id_dsa0', 'id_dsa1', 'id_dsa22']}
+ }.items():
+
+ f = cStringIO.StringIO(test_config_file)
+ config = paramiko.util.parse_ssh_config(f)
+ self.assertEquals(
+ paramiko.util.lookup_ssh_host_config(host, config),
+ values
+ )
diff --git a/tests/util.py b/tests/util.py
new file mode 100644
index 0000000..2e0be08
--- /dev/null
+++ b/tests/util.py
@@ -0,0 +1,10 @@
+import unittest
+
+
+class ParamikoTest(unittest.TestCase):
+ # for Python 2.3 and below
+ if not hasattr(unittest.TestCase, 'assertTrue'):
+ assertTrue = unittest.TestCase.failUnless
+ if not hasattr(unittest.TestCase, 'assertFalse'):
+ assertFalse = unittest.TestCase.failIf
+