aboutsummaryrefslogtreecommitdiff
path: root/tests/test_kex.py
diff options
context:
space:
mode:
Diffstat (limited to 'tests/test_kex.py')
-rw-r--r--tests/test_kex.py101
1 files changed, 80 insertions, 21 deletions
diff --git a/tests/test_kex.py b/tests/test_kex.py
index 2680853..f304275 100644
--- a/tests/test_kex.py
+++ b/tests/test_kex.py
@@ -1,6 +1,4 @@
-#!/usr/bin/python
-
-# Copyright (C) 2003-2005 Robey Pointer <robey@lag.net>
+# Copyright (C) 2003-2007 Robey Pointer <robey@lag.net>
#
# This file is part of paramiko.
#
@@ -22,6 +20,7 @@
Some unit tests for the key exchange protocols.
"""
+from binascii import hexlify
import unittest
import paramiko.util
from paramiko.kex_group1 import KexGroup1
@@ -35,18 +34,21 @@ class FakeRandpool (object):
def get_bytes(self, n):
return chr(0xcc) * n
+
class FakeKey (object):
def __str__(self):
return 'fake-key'
def sign_ssh_data(self, randpool, H):
return 'fake-sig'
+
class FakeModulusPack (object):
P = 0xFFFFFFFFFFFFFFFFC90FDAA22168C234C4C6628B80DC1CD129024E088A67CC74020BBEA63B139B22514A08798E3404DDEF9519B3CD3A431B302B0A6DF25F14374FE1356D6D51C245E485B576625E7EC6F44C42E9A637ED6B0BFF5CB6F406B7EDEE386BFB5A899FA5AE9F24117C4B1FE649286651ECE65381FFFFFFFFFFFFFFFFL
G = 2
def get_modulus(self, min, ask, max):
return self.G, self.P
+
class FakeTransport (object):
randpool = FakeRandpool()
local_version = 'SSH-2.0-paramiko_1.0'
@@ -56,7 +58,7 @@ class FakeTransport (object):
def _send_message(self, m):
self._message = m
- def _expect_packet(self, t):
+ def _expect_packet(self, *t):
self._expect = t
def _set_K_H(self, K, H):
self._K = K
@@ -89,8 +91,8 @@ class KexTest (unittest.TestCase):
kex = KexGroup1(transport)
kex.start_kex()
x = '1E000000807E2DDB1743F3487D6545F04F1C8476092FB912B013626AB5BCEB764257D88BBA64243B9F348DF7B41B8C814A995E00299913503456983FFB9178D3CD79EB6D55522418A8ABF65375872E55938AB99A84A0B5FC8A1ECC66A7C3766E7E0F80B7CE2C9225FC2DD683F4764244B72963BBB383F529DCF0C5D17740B8A2ADBE9208D4'
- self.assertEquals(x, paramiko.util.hexify(str(transport._message)))
- self.assertEquals(paramiko.kex_group1._MSG_KEXDH_REPLY, transport._expect)
+ self.assertEquals(x, hexlify(str(transport._message)).upper())
+ self.assertEquals((paramiko.kex_group1._MSG_KEXDH_REPLY,), transport._expect)
# fake "reply"
msg = Message()
@@ -101,7 +103,7 @@ class KexTest (unittest.TestCase):
kex.parse_next(paramiko.kex_group1._MSG_KEXDH_REPLY, msg)
H = '03079780F3D3AD0B3C6DB30C8D21685F367A86D2'
self.assertEquals(self.K, transport._K)
- self.assertEquals(H, paramiko.util.hexify(transport._H))
+ self.assertEquals(H, hexlify(transport._H).upper())
self.assertEquals(('fake-host-key', 'fake-sig'), transport._verify)
self.assert_(transport._activated)
@@ -110,7 +112,7 @@ class KexTest (unittest.TestCase):
transport.server_mode = True
kex = KexGroup1(transport)
kex.start_kex()
- self.assertEquals(paramiko.kex_group1._MSG_KEXDH_INIT, transport._expect)
+ self.assertEquals((paramiko.kex_group1._MSG_KEXDH_INIT,), transport._expect)
msg = Message()
msg.add_mpint(69)
@@ -119,8 +121,8 @@ class KexTest (unittest.TestCase):
H = 'B16BF34DD10945EDE84E9C1EF24A14BFDC843389'
x = '1F0000000866616B652D6B6579000000807E2DDB1743F3487D6545F04F1C8476092FB912B013626AB5BCEB764257D88BBA64243B9F348DF7B41B8C814A995E00299913503456983FFB9178D3CD79EB6D55522418A8ABF65375872E55938AB99A84A0B5FC8A1ECC66A7C3766E7E0F80B7CE2C9225FC2DD683F4764244B72963BBB383F529DCF0C5D17740B8A2ADBE9208D40000000866616B652D736967'
self.assertEquals(self.K, transport._K)
- self.assertEquals(H, paramiko.util.hexify(transport._H))
- self.assertEquals(x, paramiko.util.hexify(str(transport._message)))
+ self.assertEquals(H, hexlify(transport._H).upper())
+ self.assertEquals(x, hexlify(str(transport._message)).upper())
self.assert_(transport._activated)
def test_3_gex_client(self):
@@ -129,8 +131,8 @@ class KexTest (unittest.TestCase):
kex = KexGex(transport)
kex.start_kex()
x = '22000004000000080000002000'
- self.assertEquals(x, paramiko.util.hexify(str(transport._message)))
- self.assertEquals(paramiko.kex_gex._MSG_KEXDH_GEX_GROUP, transport._expect)
+ self.assertEquals(x, hexlify(str(transport._message)).upper())
+ self.assertEquals((paramiko.kex_gex._MSG_KEXDH_GEX_GROUP,), transport._expect)
msg = Message()
msg.add_mpint(FakeModulusPack.P)
@@ -138,8 +140,8 @@ class KexTest (unittest.TestCase):
msg.rewind()
kex.parse_next(paramiko.kex_gex._MSG_KEXDH_GEX_GROUP, msg)
x = '20000000807E2DDB1743F3487D6545F04F1C8476092FB912B013626AB5BCEB764257D88BBA64243B9F348DF7B41B8C814A995E00299913503456983FFB9178D3CD79EB6D55522418A8ABF65375872E55938AB99A84A0B5FC8A1ECC66A7C3766E7E0F80B7CE2C9225FC2DD683F4764244B72963BBB383F529DCF0C5D17740B8A2ADBE9208D4'
- self.assertEquals(x, paramiko.util.hexify(str(transport._message)))
- self.assertEquals(paramiko.kex_gex._MSG_KEXDH_GEX_REPLY, transport._expect)
+ self.assertEquals(x, hexlify(str(transport._message)).upper())
+ self.assertEquals((paramiko.kex_gex._MSG_KEXDH_GEX_REPLY,), transport._expect)
msg = Message()
msg.add_string('fake-host-key')
@@ -149,16 +151,46 @@ class KexTest (unittest.TestCase):
kex.parse_next(paramiko.kex_gex._MSG_KEXDH_GEX_REPLY, msg)
H = 'A265563F2FA87F1A89BF007EE90D58BE2E4A4BD0'
self.assertEquals(self.K, transport._K)
- self.assertEquals(H, paramiko.util.hexify(transport._H))
+ self.assertEquals(H, hexlify(transport._H).upper())
self.assertEquals(('fake-host-key', 'fake-sig'), transport._verify)
self.assert_(transport._activated)
- def test_4_gex_server(self):
+ def test_4_gex_old_client(self):
+ transport = FakeTransport()
+ transport.server_mode = False
+ kex = KexGex(transport)
+ kex.start_kex(_test_old_style=True)
+ x = '1E00000800'
+ self.assertEquals(x, hexlify(str(transport._message)).upper())
+ self.assertEquals((paramiko.kex_gex._MSG_KEXDH_GEX_GROUP,), transport._expect)
+
+ msg = Message()
+ msg.add_mpint(FakeModulusPack.P)
+ msg.add_mpint(FakeModulusPack.G)
+ msg.rewind()
+ kex.parse_next(paramiko.kex_gex._MSG_KEXDH_GEX_GROUP, msg)
+ x = '20000000807E2DDB1743F3487D6545F04F1C8476092FB912B013626AB5BCEB764257D88BBA64243B9F348DF7B41B8C814A995E00299913503456983FFB9178D3CD79EB6D55522418A8ABF65375872E55938AB99A84A0B5FC8A1ECC66A7C3766E7E0F80B7CE2C9225FC2DD683F4764244B72963BBB383F529DCF0C5D17740B8A2ADBE9208D4'
+ self.assertEquals(x, hexlify(str(transport._message)).upper())
+ self.assertEquals((paramiko.kex_gex._MSG_KEXDH_GEX_REPLY,), transport._expect)
+
+ msg = Message()
+ msg.add_string('fake-host-key')
+ msg.add_mpint(69)
+ msg.add_string('fake-sig')
+ msg.rewind()
+ kex.parse_next(paramiko.kex_gex._MSG_KEXDH_GEX_REPLY, msg)
+ H = '807F87B269EF7AC5EC7E75676808776A27D5864C'
+ self.assertEquals(self.K, transport._K)
+ self.assertEquals(H, hexlify(transport._H).upper())
+ self.assertEquals(('fake-host-key', 'fake-sig'), transport._verify)
+ self.assert_(transport._activated)
+
+ def test_5_gex_server(self):
transport = FakeTransport()
transport.server_mode = True
kex = KexGex(transport)
kex.start_kex()
- self.assertEquals(paramiko.kex_gex._MSG_KEXDH_GEX_REQUEST, transport._expect)
+ self.assertEquals((paramiko.kex_gex._MSG_KEXDH_GEX_REQUEST, paramiko.kex_gex._MSG_KEXDH_GEX_REQUEST_OLD), transport._expect)
msg = Message()
msg.add_int(1024)
@@ -167,8 +199,8 @@ class KexTest (unittest.TestCase):
msg.rewind()
kex.parse_next(paramiko.kex_gex._MSG_KEXDH_GEX_REQUEST, msg)
x = '1F0000008100FFFFFFFFFFFFFFFFC90FDAA22168C234C4C6628B80DC1CD129024E088A67CC74020BBEA63B139B22514A08798E3404DDEF9519B3CD3A431B302B0A6DF25F14374FE1356D6D51C245E485B576625E7EC6F44C42E9A637ED6B0BFF5CB6F406B7EDEE386BFB5A899FA5AE9F24117C4B1FE649286651ECE65381FFFFFFFFFFFFFFFF0000000102'
- self.assertEquals(x, paramiko.util.hexify(str(transport._message)))
- self.assertEquals(paramiko.kex_gex._MSG_KEXDH_GEX_INIT, transport._expect)
+ self.assertEquals(x, hexlify(str(transport._message)).upper())
+ self.assertEquals((paramiko.kex_gex._MSG_KEXDH_GEX_INIT,), transport._expect)
msg = Message()
msg.add_mpint(12345)
@@ -178,6 +210,33 @@ class KexTest (unittest.TestCase):
H = 'CE754197C21BF3452863B4F44D0B3951F12516EF'
x = '210000000866616B652D6B6579000000807E2DDB1743F3487D6545F04F1C8476092FB912B013626AB5BCEB764257D88BBA64243B9F348DF7B41B8C814A995E00299913503456983FFB9178D3CD79EB6D55522418A8ABF65375872E55938AB99A84A0B5FC8A1ECC66A7C3766E7E0F80B7CE2C9225FC2DD683F4764244B72963BBB383F529DCF0C5D17740B8A2ADBE9208D40000000866616B652D736967'
self.assertEquals(K, transport._K)
- self.assertEquals(H, paramiko.util.hexify(transport._H))
- self.assertEquals(x, paramiko.util.hexify(str(transport._message)))
+ self.assertEquals(H, hexlify(transport._H).upper())
+ self.assertEquals(x, hexlify(str(transport._message)).upper())
+ self.assert_(transport._activated)
+
+ def test_6_gex_server_with_old_client(self):
+ transport = FakeTransport()
+ transport.server_mode = True
+ kex = KexGex(transport)
+ kex.start_kex()
+ self.assertEquals((paramiko.kex_gex._MSG_KEXDH_GEX_REQUEST, paramiko.kex_gex._MSG_KEXDH_GEX_REQUEST_OLD), transport._expect)
+
+ msg = Message()
+ msg.add_int(2048)
+ msg.rewind()
+ kex.parse_next(paramiko.kex_gex._MSG_KEXDH_GEX_REQUEST_OLD, msg)
+ x = '1F0000008100FFFFFFFFFFFFFFFFC90FDAA22168C234C4C6628B80DC1CD129024E088A67CC74020BBEA63B139B22514A08798E3404DDEF9519B3CD3A431B302B0A6DF25F14374FE1356D6D51C245E485B576625E7EC6F44C42E9A637ED6B0BFF5CB6F406B7EDEE386BFB5A899FA5AE9F24117C4B1FE649286651ECE65381FFFFFFFFFFFFFFFF0000000102'
+ self.assertEquals(x, hexlify(str(transport._message)).upper())
+ self.assertEquals((paramiko.kex_gex._MSG_KEXDH_GEX_INIT,), transport._expect)
+
+ msg = Message()
+ msg.add_mpint(12345)
+ msg.rewind()
+ kex.parse_next(paramiko.kex_gex._MSG_KEXDH_GEX_INIT, msg)
+ K = 67592995013596137876033460028393339951879041140378510871612128162185209509220726296697886624612526735888348020498716482757677848959420073720160491114319163078862905400020959196386947926388406687288901564192071077389283980347784184487280885335302632305026248574716290537036069329724382811853044654824945750581L
+ H = 'B41A06B2E59043CEFC1AE16EC31F1E2D12EC455B'
+ x = '210000000866616B652D6B6579000000807E2DDB1743F3487D6545F04F1C8476092FB912B013626AB5BCEB764257D88BBA64243B9F348DF7B41B8C814A995E00299913503456983FFB9178D3CD79EB6D55522418A8ABF65375872E55938AB99A84A0B5FC8A1ECC66A7C3766E7E0F80B7CE2C9225FC2DD683F4764244B72963BBB383F529DCF0C5D17740B8A2ADBE9208D40000000866616B652D736967'
+ self.assertEquals(K, transport._K)
+ self.assertEquals(H, hexlify(transport._H).upper())
+ self.assertEquals(x, hexlify(str(transport._message)).upper())
self.assert_(transport._activated)