diff options
Diffstat (limited to 'src/test/ntor_ref.py')
-rwxr-xr-x[-rw-r--r--] | src/test/ntor_ref.py | 72 |
1 files changed, 45 insertions, 27 deletions
diff --git a/src/test/ntor_ref.py b/src/test/ntor_ref.py index ade468da7..7d6e43e71 100644..100755 --- a/src/test/ntor_ref.py +++ b/src/test/ntor_ref.py @@ -1,3 +1,4 @@ +#!/usr/bin/python # Copyright 2012-2013, The Tor Project, Inc # See LICENSE for licensing information @@ -27,17 +28,25 @@ commands: """ import binascii -import curve25519 +try: + import curve25519 + curve25519mod = curve25519.keys +except ImportError: + curve25519 = None + import slownacl_curve25519 + curve25519mod = slownacl_curve25519 + import hashlib import hmac import subprocess +import sys # ********************************************************************** # Helpers and constants def HMAC(key,msg): "Return the HMAC-SHA256 of 'msg' using the key 'key'." - H = hmac.new(key, "", hashlib.sha256) + H = hmac.new(key, b"", hashlib.sha256) H.update(msg) return H.digest() @@ -59,31 +68,38 @@ G_LENGTH = 32 H_LENGTH = 32 PROTOID = b"ntor-curve25519-sha256-1" -M_EXPAND = PROTOID + ":key_expand" -T_MAC = PROTOID + ":mac" -T_KEY = PROTOID + ":key_extract" -T_VERIFY = PROTOID + ":verify" +M_EXPAND = PROTOID + b":key_expand" +T_MAC = PROTOID + b":mac" +T_KEY = PROTOID + b":key_extract" +T_VERIFY = PROTOID + b":verify" def H_mac(msg): return H(msg, tweak=T_MAC) def H_verify(msg): return H(msg, tweak=T_VERIFY) -class PrivateKey(curve25519.keys.Private): - """As curve25519.keys.Private, but doesn't regenerate its public key +class PrivateKey(curve25519mod.Private): + """As curve25519mod.Private, but doesn't regenerate its public key every time you ask for it. """ def __init__(self): - curve25519.keys.Private.__init__(self) + curve25519mod.Private.__init__(self) self._memo_public = None def get_public(self): if self._memo_public is None: - self._memo_public = curve25519.keys.Private.get_public(self) + self._memo_public = curve25519mod.Private.get_public(self) return self._memo_public # ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ -def kdf_rfc5869(key, salt, info, n): +if sys.version < '3': + def int2byte(i): + return chr(i) +else: + def int2byte(i): + return bytes([i]) + +def kdf_rfc5869(key, salt, info, n): prk = HMAC(key=salt, msg=key) @@ -91,7 +107,7 @@ def kdf_rfc5869(key, salt, info, n): last = b"" i = 1 while len(out) < n: - m = last + info + chr(i) + m = last + info + int2byte(i) last = h = HMAC(key=prk, msg=m) out += h i = i + 1 @@ -177,7 +193,7 @@ def server(seckey_b, my_node_id, message, keyBytes=72): badness = (keyid(seckey_b.get_public()) != message[NODE_ID_LENGTH:NODE_ID_LENGTH+H_LENGTH]) - pubkey_X = curve25519.keys.Public(message[NODE_ID_LENGTH+H_LENGTH:]) + pubkey_X = curve25519mod.Public(message[NODE_ID_LENGTH+H_LENGTH:]) seckey_y = PrivateKey() pubkey_Y = seckey_y.get_public() pubkey_B = seckey_b.get_public() @@ -200,7 +216,7 @@ def server(seckey_b, my_node_id, message, keyBytes=72): pubkey_Y.serialize() + pubkey_X.serialize() + PROTOID + - "Server") + b"Server") msg = pubkey_Y.serialize() + H_mac(auth_input) @@ -240,7 +256,7 @@ def client_part2(seckey_x, msg, node_id, pubkey_B, keyBytes=72): """ assert len(msg) == G_LENGTH + H_LENGTH - pubkey_Y = curve25519.keys.Public(msg[:G_LENGTH]) + pubkey_Y = curve25519mod.Public(msg[:G_LENGTH]) their_auth = msg[G_LENGTH:] pubkey_X = seckey_x.get_public() @@ -262,7 +278,7 @@ def client_part2(seckey_x, msg, node_id, pubkey_B, keyBytes=72): pubkey_B.serialize() + pubkey_Y.serialize() + pubkey_X.serialize() + PROTOID + - "Server") + b"Server") my_auth = H_mac(auth_input) @@ -276,7 +292,7 @@ def client_part2(seckey_x, msg, node_id, pubkey_B, keyBytes=72): # ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ -def demo(node_id="iToldYouAboutStairs.", server_key=PrivateKey()): +def demo(node_id=b"iToldYouAboutStairs.", server_key=PrivateKey()): """ Try to handshake with ourself. """ @@ -286,6 +302,7 @@ def demo(node_id="iToldYouAboutStairs.", server_key=PrivateKey()): assert len(skeys) == 72 assert len(ckeys) == 72 assert skeys == ckeys + print("OK") # ====================================================================== def timing(): @@ -295,7 +312,7 @@ def timing(): import timeit t = timeit.Timer(stmt="ntor_ref.demo(N,SK)", setup="import ntor_ref,curve25519;N='ABCD'*5;SK=ntor_ref.PrivateKey()") - print t.timeit(number=1000) + print(t.timeit(number=1000)) # ====================================================================== @@ -306,7 +323,7 @@ def kdf_vectors(): import binascii def kdf_vec(inp): k = kdf(inp, T_KEY, M_EXPAND, 100) - print repr(inp), "\n\""+ binascii.b2a_hex(k)+ "\"" + print(repr(inp), "\n\""+ binascii.b2a_hex(k)+ "\"") kdf_vec("") kdf_vec("Tor") kdf_vec("AN ALARMING ITEM TO FIND ON YOUR CREDIT-RATING STATEMENT") @@ -319,13 +336,13 @@ def test_tor(): Call the test-ntor-cl command-line program to make sure we can interoperate with Tor's ntor program """ - enhex=binascii.b2a_hex + enhex=lambda s: binascii.b2a_hex(s) dehex=lambda s: binascii.a2b_hex(s.strip()) - PROG = "./src/test/test-ntor-cl" + PROG = b"./src/test/test-ntor-cl" def tor_client1(node_id, pubkey_B): " returns (msg, state) " - p = subprocess.Popen([PROG, "client1", enhex(node_id), + p = subprocess.Popen([PROG, b"client1", enhex(node_id), enhex(pubkey_B.serialize())], stdout=subprocess.PIPE) return map(dehex, p.stdout.readlines()) @@ -343,7 +360,7 @@ def test_tor(): return map(dehex, p.stdout.readlines()) - node_id = "thisisatornodeid$#%^" + node_id = b"thisisatornodeid$#%^" seckey_b = PrivateKey() pubkey_B = seckey_b.get_public() @@ -368,13 +385,14 @@ def test_tor(): assert c_keys == s_keys assert len(c_keys) == 90 - print "We just interoperated." + print("OK") # ====================================================================== if __name__ == '__main__': - import sys - if sys.argv[1] == 'gen_kdf_vectors': + if len(sys.argv) < 2: + print(__doc__) + elif sys.argv[1] == 'gen_kdf_vectors': kdf_vectors() elif sys.argv[1] == 'timing': timing() @@ -384,4 +402,4 @@ if __name__ == '__main__': test_tor() else: - print __doc__ + print(__doc__) |