aboutsummaryrefslogtreecommitdiff
path: root/tests
diff options
context:
space:
mode:
Diffstat (limited to 'tests')
-rw-r--r--tests/test_auth.py4
-rw-r--r--tests/test_client.py8
-rwxr-xr-xtests/test_file.py6
-rw-r--r--tests/test_gssapi.py4
-rw-r--r--tests/test_kex_gss.py6
-rwxr-xr-xtests/test_sftp.py7
-rw-r--r--tests/test_ssh_gss.py6
-rw-r--r--tests/test_transport.py28
-rw-r--r--tests/test_util.py40
9 files changed, 71 insertions, 38 deletions
diff --git a/tests/test_auth.py b/tests/test_auth.py
index 1d972d5..ec78e3c 100644
--- a/tests/test_auth.py
+++ b/tests/test_auth.py
@@ -118,12 +118,12 @@ class AuthTest (unittest.TestCase):
self.ts.add_server_key(host_key)
self.event = threading.Event()
self.server = NullServer()
- self.assertTrue(not self.event.isSet())
+ self.assertTrue(not self.event.is_set())
self.ts.start_server(self.event, self.server)
def verify_finished(self):
self.event.wait(1.0)
- self.assertTrue(self.event.isSet())
+ self.assertTrue(self.event.is_set())
self.assertTrue(self.ts.is_active())
def test_1_bad_auth_type(self):
diff --git a/tests/test_client.py b/tests/test_client.py
index 28d1cb4..3d2e75c 100644
--- a/tests/test_client.py
+++ b/tests/test_client.py
@@ -128,7 +128,7 @@ class SSHClientTest (unittest.TestCase):
# Authentication successful?
self.event.wait(1.0)
- self.assertTrue(self.event.isSet())
+ self.assertTrue(self.event.is_set())
self.assertTrue(self.ts.is_active())
self.assertEqual('slowdive', self.ts.get_username())
self.assertEqual(True, self.ts.is_authenticated())
@@ -226,7 +226,7 @@ class SSHClientTest (unittest.TestCase):
self.tc.connect(self.addr, self.port, username='slowdive', password='pygmalion')
self.event.wait(1.0)
- self.assertTrue(self.event.isSet())
+ self.assertTrue(self.event.is_set())
self.assertTrue(self.ts.is_active())
self.assertEqual('slowdive', self.ts.get_username())
self.assertEqual(True, self.ts.is_authenticated())
@@ -281,7 +281,7 @@ class SSHClientTest (unittest.TestCase):
self.tc.connect(self.addr, self.port, username='slowdive', password='pygmalion')
self.event.wait(1.0)
- self.assertTrue(self.event.isSet())
+ self.assertTrue(self.event.is_set())
self.assertTrue(self.ts.is_active())
p = weakref.ref(self.tc._transport.packetizer)
@@ -316,7 +316,7 @@ class SSHClientTest (unittest.TestCase):
self.tc.connect(self.addr, self.port, username='slowdive', password='pygmalion')
self.event.wait(1.0)
- self.assertTrue(self.event.isSet())
+ self.assertTrue(self.event.is_set())
self.assertTrue(self.ts.is_active())
self.assertTrue(self.tc._transport is not None)
diff --git a/tests/test_file.py b/tests/test_file.py
index 22a34ac..a6ff69e 100755
--- a/tests/test_file.py
+++ b/tests/test_file.py
@@ -70,13 +70,17 @@ class BufferedFileTest (unittest.TestCase):
def test_2_readline(self):
f = LoopbackFile('r+U')
- f.write(b'First line.\nSecond line.\r\nThird line.\nFinal line non-terminated.')
+ f.write(b'First line.\nSecond line.\r\nThird line.\n' +
+ b'Fourth line.\nFinal line non-terminated.')
+
self.assertEqual(f.readline(), 'First line.\n')
# universal newline mode should convert this linefeed:
self.assertEqual(f.readline(), 'Second line.\n')
# truncated line:
self.assertEqual(f.readline(7), 'Third l')
self.assertEqual(f.readline(), 'ine.\n')
+ # newline should be detected and only the fourth line returned
+ self.assertEqual(f.readline(39), 'Fourth line.\n')
self.assertEqual(f.readline(), 'Final line non-terminated.')
self.assertEqual(f.readline(), '')
f.close()
diff --git a/tests/test_gssapi.py b/tests/test_gssapi.py
index a328dd6..96c268d 100644
--- a/tests/test_gssapi.py
+++ b/tests/test_gssapi.py
@@ -27,15 +27,13 @@ import socket
class GSSAPITest(unittest.TestCase):
-
+ @staticmethod
def init(hostname=None, srv_mode=False):
global krb5_mech, targ_name, server_mode
krb5_mech = "1.2.840.113554.1.2.2"
targ_name = hostname
server_mode = srv_mode
- init = staticmethod(init)
-
def test_1_pyasn1(self):
"""
Test the used methods of pyasn1.
diff --git a/tests/test_kex_gss.py b/tests/test_kex_gss.py
index 8769d09..3bf788d 100644
--- a/tests/test_kex_gss.py
+++ b/tests/test_kex_gss.py
@@ -58,14 +58,12 @@ class NullServer (paramiko.ServerInterface):
class GSSKexTest(unittest.TestCase):
-
+ @staticmethod
def init(username, hostname):
global krb5_principal, targ_name
krb5_principal = username
targ_name = hostname
- init = staticmethod(init)
-
def setUp(self):
self.username = krb5_principal
self.hostname = socket.getfqdn(targ_name)
@@ -111,7 +109,7 @@ class GSSKexTest(unittest.TestCase):
gss_auth=True, gss_kex=True)
self.event.wait(1.0)
- self.assert_(self.event.isSet())
+ self.assert_(self.event.is_set())
self.assert_(self.ts.is_active())
self.assertEquals(self.username, self.ts.get_username())
self.assertEquals(True, self.ts.is_authenticated())
diff --git a/tests/test_sftp.py b/tests/test_sftp.py
index 72c7ba0..cb8f7f8 100755
--- a/tests/test_sftp.py
+++ b/tests/test_sftp.py
@@ -97,7 +97,7 @@ def get_sftp():
class SFTPTest (unittest.TestCase):
-
+ @staticmethod
def init(hostname, username, keyfile, passwd):
global sftp, tc
@@ -129,8 +129,8 @@ class SFTPTest (unittest.TestCase):
sys.stderr.write('\n')
sys.exit(1)
sftp = paramiko.SFTP.from_transport(t)
- init = staticmethod(init)
+ @staticmethod
def init_loopback():
global sftp, tc
@@ -150,12 +150,11 @@ class SFTPTest (unittest.TestCase):
event.wait(1.0)
sftp = paramiko.SFTP.from_transport(tc)
- init_loopback = staticmethod(init_loopback)
+ @staticmethod
def set_big_file_test(onoff):
global g_big_file_test
g_big_file_test = onoff
- set_big_file_test = staticmethod(set_big_file_test)
def setUp(self):
global FOLDER
diff --git a/tests/test_ssh_gss.py b/tests/test_ssh_gss.py
index 595081b..e20d348 100644
--- a/tests/test_ssh_gss.py
+++ b/tests/test_ssh_gss.py
@@ -57,14 +57,12 @@ class NullServer (paramiko.ServerInterface):
class GSSAuthTest(unittest.TestCase):
-
+ @staticmethod
def init(username, hostname):
global krb5_principal, targ_name
krb5_principal = username
targ_name = hostname
- init = staticmethod(init)
-
def setUp(self):
self.username = krb5_principal
self.hostname = socket.getfqdn(targ_name)
@@ -104,7 +102,7 @@ class GSSAuthTest(unittest.TestCase):
gss_auth=True)
self.event.wait(1.0)
- self.assert_(self.event.isSet())
+ self.assert_(self.event.is_set())
self.assert_(self.ts.is_active())
self.assertEquals(self.username, self.ts.get_username())
self.assertEquals(True, self.ts.is_authenticated())
diff --git a/tests/test_transport.py b/tests/test_transport.py
index 50b1d86..5cf9a86 100644
--- a/tests/test_transport.py
+++ b/tests/test_transport.py
@@ -35,7 +35,7 @@ from paramiko import Transport, SecurityOptions, ServerInterface, RSAKey, DSSKey
from paramiko import AUTH_FAILED, AUTH_SUCCESSFUL
from paramiko import OPEN_SUCCEEDED, OPEN_FAILED_ADMINISTRATIVELY_PROHIBITED
from paramiko.common import MSG_KEXINIT, cMSG_CHANNEL_WINDOW_ADJUST, \
- MIN_PACKET_SIZE, MAX_WINDOW_SIZE, \
+ MIN_PACKET_SIZE, MIN_WINDOW_SIZE, MAX_WINDOW_SIZE, \
DEFAULT_WINDOW_SIZE, DEFAULT_MAX_PACKET_SIZE
from paramiko.py3compat import bytes
from paramiko.message import Message
@@ -136,12 +136,12 @@ class TransportTest(unittest.TestCase):
event = threading.Event()
self.server = NullServer()
- self.assertTrue(not event.isSet())
+ self.assertTrue(not event.is_set())
self.ts.start_server(event, self.server)
self.tc.connect(hostkey=public_host_key,
username='slowdive', password='pygmalion')
event.wait(1.0)
- self.assertTrue(event.isSet())
+ self.assertTrue(event.is_set())
self.assertTrue(self.ts.is_active())
def test_1_security_options(self):
@@ -180,7 +180,7 @@ class TransportTest(unittest.TestCase):
self.ts.add_server_key(host_key)
event = threading.Event()
server = NullServer()
- self.assertTrue(not event.isSet())
+ self.assertTrue(not event.is_set())
self.assertEqual(None, self.tc.get_username())
self.assertEqual(None, self.ts.get_username())
self.assertEqual(False, self.tc.is_authenticated())
@@ -189,7 +189,7 @@ class TransportTest(unittest.TestCase):
self.tc.connect(hostkey=public_host_key,
username='slowdive', password='pygmalion')
event.wait(1.0)
- self.assertTrue(event.isSet())
+ self.assertTrue(event.is_set())
self.assertTrue(self.ts.is_active())
self.assertEqual('slowdive', self.tc.get_username())
self.assertEqual('slowdive', self.ts.get_username())
@@ -205,13 +205,13 @@ class TransportTest(unittest.TestCase):
self.ts.add_server_key(host_key)
event = threading.Event()
server = NullServer()
- self.assertTrue(not event.isSet())
+ self.assertTrue(not event.is_set())
self.socks.send(LONG_BANNER)
self.ts.start_server(event, server)
self.tc.connect(hostkey=public_host_key,
username='slowdive', password='pygmalion')
event.wait(1.0)
- self.assertTrue(event.isSet())
+ self.assertTrue(event.is_set())
self.assertTrue(self.ts.is_active())
def test_4_special(self):
@@ -680,7 +680,7 @@ class TransportTest(unittest.TestCase):
def run(self):
try:
for i in range(1, 1+self.iterations):
- if self.done_event.isSet():
+ if self.done_event.is_set():
break
self.watchdog_event.set()
#print i, "SEND"
@@ -699,7 +699,7 @@ class TransportTest(unittest.TestCase):
def run(self):
try:
- while not self.done_event.isSet():
+ while not self.done_event.is_set():
if self.chan.recv_ready():
chan.recv(65536)
self.watchdog_event.set()
@@ -753,12 +753,12 @@ class TransportTest(unittest.TestCase):
# Act as a watchdog timer, checking
deadlocked = False
- while not deadlocked and not done_event.isSet():
+ while not deadlocked and not done_event.is_set():
for event in (st.watchdog_event, rt.watchdog_event):
event.wait(timeout)
- if done_event.isSet():
+ if done_event.is_set():
break
- if not event.isSet():
+ if not event.is_set():
deadlocked = True
break
event.clear()
@@ -779,7 +779,7 @@ class TransportTest(unittest.TestCase):
"""
verify that we conform to the rfc of packet and window sizes.
"""
- for val, correct in [(32767, MIN_PACKET_SIZE),
+ for val, correct in [(4095, MIN_PACKET_SIZE),
(None, DEFAULT_MAX_PACKET_SIZE),
(2**32, MAX_WINDOW_SIZE)]:
self.assertEqual(self.tc._sanitize_packet_size(val), correct)
@@ -788,7 +788,7 @@ class TransportTest(unittest.TestCase):
"""
verify that we conform to the rfc of packet and window sizes.
"""
- for val, correct in [(32767, MIN_PACKET_SIZE),
+ for val, correct in [(32767, MIN_WINDOW_SIZE),
(None, DEFAULT_WINDOW_SIZE),
(2**32, MAX_WINDOW_SIZE)]:
self.assertEqual(self.tc._sanitize_window_size(val), correct)
diff --git a/tests/test_util.py b/tests/test_util.py
index 35e1576..bfdc525 100644
--- a/tests/test_util.py
+++ b/tests/test_util.py
@@ -27,8 +27,8 @@ from hashlib import sha1
import unittest
import paramiko.util
-from paramiko.util import lookup_ssh_host_config as host_config
-from paramiko.py3compat import StringIO, byte_ord
+from paramiko.util import lookup_ssh_host_config as host_config, safe_string
+from paramiko.py3compat import StringIO, byte_ord, b
test_config_file = """\
Host *
@@ -349,6 +349,11 @@ IdentityFile something_%l_using_fqdn
config.parse(config_file)
self.assertEqual(config.lookup("abcqwerty")["hostname"], "127.0.0.1")
+ def test_14_get_hostnames(self):
+ f = StringIO(test_config_file)
+ config = paramiko.util.parse_ssh_config(f)
+ self.assertEqual(config.get_hostnames(), set(['*', '*.example.com', 'spoo.example.com']))
+
def test_quoted_host_names(self):
test_config_file = """\
Host "param pam" param "pam"
@@ -448,3 +453,34 @@ Host param3 parara
)
for host in incorrect_data:
self.assertRaises(Exception, conf._get_hosts, host)
+
+ def test_safe_string(self):
+ vanilla = b("vanilla")
+ has_bytes = b("has \7\3 bytes")
+ safe_vanilla = safe_string(vanilla)
+ safe_has_bytes = safe_string(has_bytes)
+ expected_bytes = b("has %07%03 bytes")
+ err = "{0!r} != {1!r}"
+ assert safe_vanilla == vanilla, err.format(safe_vanilla, vanilla)
+ assert safe_has_bytes == expected_bytes, \
+ err.format(safe_has_bytes, expected_bytes)
+
+ def test_proxycommand_none_issue_418(self):
+ test_config_file = """
+Host proxycommand-standard-none
+ ProxyCommand None
+
+Host proxycommand-with-equals-none
+ ProxyCommand=None
+ """
+ for host, values in {
+ 'proxycommand-standard-none': {'hostname': 'proxycommand-standard-none'},
+ 'proxycommand-with-equals-none': {'hostname': 'proxycommand-with-equals-none'}
+ }.items():
+
+ f = StringIO(test_config_file)
+ config = paramiko.util.parse_ssh_config(f)
+ self.assertEqual(
+ paramiko.util.lookup_ssh_host_config(host, config),
+ values
+ )