From 8d75181d9c2a45b555f27e5126173026987ddf0d Mon Sep 17 00:00:00 2001 From: Linus Nordberg Date: Wed, 5 Jun 2013 15:48:57 +0200 Subject: Add 'verify' command. Addresses #8531. --- lib/chutney/TorNet.py | 27 +++++ lib/chutney/Traffic.py | 279 +++++++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 306 insertions(+) create mode 100755 lib/chutney/Traffic.py diff --git a/lib/chutney/TorNet.py b/lib/chutney/TorNet.py index dcaf7d8..3ec33b6 100644 --- a/lib/chutney/TorNet.py +++ b/lib/chutney/TorNet.py @@ -1,6 +1,7 @@ #!/usr/bin/python # # Copyright 2011 Nick Mathewson, Michael Stone +# Copyright 2013 The Tor Project # # You may do anything with this work that copyright law would normally # restrict, so long as you retain the above notice(s) and this license @@ -21,6 +22,7 @@ import errno import time import chutney.Templating +import chutney.Traffic def mkdir_p(d, mode=0777): """Create directory 'd' and all of its parents as needed. Unlike @@ -621,6 +623,31 @@ class Network(object): for c in controllers: n.check(listNonRunning=False) + def verify(self): + sys.stdout.write("Verifying data transmission: ") + sys.stdout.flush() + status = self._verify_traffic() + if status: + print("Success") + else: + print("Failure") + return status + + def _verify_traffic(self): + """Verify (parts of) the network by sending traffic through it + and verify what is received.""" + LISTEN_PORT = 4747 # FIXME: Do better! Note the default exit policy. + DATALEN = 10*1024 # Octets. + TIMEOUT = 3 # Seconds. + with open('/dev/urandom', 'r') as randfp: + tmpdata = randfp.read(DATALEN) + bind_to = ('localhost', LISTEN_PORT) + tt = chutney.Traffic.TrafficTester(bind_to, tmpdata, TIMEOUT) + for op in filter(lambda n: n._env['tag'] == 'c', self._nodes): + tt.add(chutney.Traffic.Source(tt, bind_to, tmpdata, + ('localhost', int(op._env['socksport'])))) + return tt.run() + def ConfigureNodes(nodelist): network = _THE_NETWORK diff --git a/lib/chutney/Traffic.py b/lib/chutney/Traffic.py new file mode 100755 index 0000000..4ab4fff --- /dev/null +++ b/lib/chutney/Traffic.py @@ -0,0 +1,279 @@ +#! /usr/bin/env python +# +# Copyright 2013 The Tor Project +# +# You may do anything with this work that copyright law would normally +# restrict, so long as you retain the above notice(s) and this license +# in all redistributed copies and derived works. There is no warranty. + +import socket +import select +import struct + +debug_flag = False + +def debug(s): + if debug_flag: + print("DEBUG: %s" % s) + +def socks_cmd(addr_port): + """ + SOCKSv4: https://en.wikipedia.org/wiki/SOCKS#Protocol + SOCKSv5: RFC1928, RFC1929 + """ + ver = 4 # Only SOCKSv4 for now. + cmd = 1 # Stream connection. + user = '\x00' + dnsname = '' + host, port = addr_port + try: + addr = socket.inet_aton(host) + except socket.error: + addr = '\x00\x00\x00\x01' + dnsname = '%s\x00' % host + return struct.pack('!BBH', ver, cmd, port) + addr + user + dnsname + +class TestSuite(object): + """Keep a tab on how many tests are pending, how many have failed + and how many have succeeded.""" + def __init__(self): + self.not_done = 0 + self.successes = 0 + self.failures = 0 + + def add(self): + self.not_done += 1 + + def success(self): + self.not_done -= 1 + self.successes += 1 + + def failure(self): + self.not_done -= 1 + self.failures += 1 + + def failure_count(self): + return self.failures + + def all_done(self): + return self.not_done == 0 + + def status(self): + return('%d/%d/%d' % (self.not_done, self.successes, self.failures)) + +class Peer(object): + LISTENER = 1 + SOURCE = 2 + SINK = 3 + + def __init__(self, ptype, tt, s=None): + self.type = ptype + self.tt = tt # TrafficTester + if s is not None: + self.s = s + else: + self.s = socket.socket() + self.s.setblocking(False) + + def fd(self): + return self.s.fileno() + def is_source(self): + return self.type == self.SOURCE + def is_sink(self): + return self.type == self.SINK + +class Listener(Peer): + def __init__(self, tt, endpoint): + super(Listener, self).__init__(Peer.LISTENER, tt) + self.s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) + self.s.bind(endpoint) + self.s.listen(0) + + def accept(self): + newsock, endpoint = self.s.accept() + debug("new client from %s:%s (fd=%d)" % + (endpoint[0], endpoint[1], newsock.fileno())) + self.tt.add(Sink(self.tt, newsock)) + +class Sink(Peer): + def __init__(self, tt, s): + super(Sink, self).__init__(Peer.SINK, tt, s) + self.inbuf = '' + + def on_readable(self): + return self.verify(self.tt.data) + + def verify(self, data): + self.inbuf += self.s.recv(len(data) - len(self.inbuf)) + assert(len(self.inbuf) <= len(data)) + if len(self.inbuf) == len(data): + if self.inbuf != data: + return -1 # Failed verification. + else: + debug("successful verification") + return len(data) - len(self.inbuf) + +class Source(Peer): + NOT_CONNECTED = 0 + CONNECTING = 1 + CONNECTING_THROUGH_PROXY = 2 + CONNECTED = 5 + + def __init__(self, tt, server, buf, proxy=None): + super(Source, self).__init__(Peer.SOURCE, tt) + self.state = self.NOT_CONNECTED + self.data = buf + self.outbuf = '' + self.inbuf = '' + self.proxy = proxy + self.connect(server) + + def connect(self, endpoint): + self.dest = endpoint + self.state = self.CONNECTING + if self.proxy is None: + dest = self.dest + else: + dest = self.proxy + try: + self.s.connect(dest) + except socket.error, e: + if e[0] != 115: # EINPROGRESS + raise + + def on_readable(self): + if self.state == self.CONNECTING_THROUGH_PROXY: + self.inbuf += self.s.recv(8 - len(self.inbuf)) + if len(self.inbuf) == 8: + if ord(self.inbuf[0]) == 0 and ord(self.inbuf[1]) == 0x5a: + debug("proxy handshake successful (fd=%d)" % self.fd()) + self.state = self.CONNECTED + self.inbuf = '' + self.outbuf = self.data + else: + debug("proxy handshake failed (0x%x)! (fd=%d)" % + (ord(self.inbuf[1]), self.fd())) + self.state = self.NOT_CONNECTED + return -1 + return 8 - len(self.inbuf) + return 1 # Keep us around for writing. + + def want_to_write(self): + if self.state == self.CONNECTING: + return True + if len(self.outbuf) > 0: + return True + return False + + def on_writable(self): + if self.state == self.CONNECTING: + if self.proxy is None: + self.state = self.CONNECTED + self.outbuf = self.data + else: + self.state = self.CONNECTING_THROUGH_PROXY + self.outbuf = socks_cmd(self.dest) + try: + n = self.s.send(self.outbuf) + except socket.error, e: + if e[0] == 111: # ECONNREFUSED + debug("connection refused (fd=%d)" % self.fd()) + return -1 + raise + self.outbuf = self.outbuf[n:] + debug("successfully connected (fd=%d)" % self.fd()) + if self.state == self.CONNECTING_THROUGH_PROXY: + return 1 # Keep us around. + return len(self.outbuf) # When 0, we're being removed. + +class TrafficTester(): + def __init__(self, endpoint, data={}, timeout=3): + self.listener = Listener(self, endpoint) + self.pending_close = [] + self.timeout = timeout + self.tests = TestSuite() + self.data = data + debug("listener fd=%d" % self.listener.fd()) + self.peers = {} # fd:Peer + + def sinks(self): + return self.get_by_ptype(Peer.SINK) + def sources(self): + return self.get_by_ptype(Peer.SOURCE) + def get_by_ptype(self, ptype): + return filter(lambda p: p.type == ptype, self.peers.itervalues()) + + def add(self, peer): + self.peers[peer.fd()] = peer + if peer.is_source(): + self.tests.add() + + def remove(self, peer): + self.peers.pop(peer.fd()) + self.pending_close.append(peer.s) + + def run(self): + while True: + if self.tests.all_done() or self.timeout == 0: + break + rset = [self.listener.fd()] + list(self.peers) + wset = [p.fd() for p in + filter(lambda x: x.want_to_write(), self.sources())] + #debug("rset %s wset %s" % (rset, wset)) + sets = select.select(rset, wset, [], 1) + if all(len(s)==0 for s in sets): + self.timeout -= 1 + continue + + for fd in sets[0]: # readable fd's + if fd == self.listener.fd(): + self.listener.accept() + continue + p = self.peers[fd] + n = p.on_readable() + if n > 0: + #debug("need %d more octets from fd %d" % (n, fd)) + pass + elif n == 0: # Success. + self.tests.success() + self.remove(p) + else: # Failure. + self.tests.failure() + if p.is_sink(): + print("verification failed!") + self.remove(p) + + for fd in sets[1]: # writable fd's + p = self.peers.get(fd) # Might have been removed above. + if p is not None: + n = p.on_writable() + if n == 0: + self.remove(p) + elif n < 0: + self.tests.failure() + self.remove(p) + + self.listener.s.close() + for s in self.pending_close: + s.close() + return self.tests.all_done() and self.tests.failure_count() == 0 + +import sys + +def main(): + """Test the TrafficTester by sending and receiving some data.""" + DATA = "a foo is a bar" * 1000 + DATA_CORRUPT = "a foo is a baz" * 1000 + proxy = ('localhost', 9008) + bind_to = ('localhost', int(sys.argv[1])) + + tt = TrafficTester(bind_to, DATA) + tt.add(Source(tt, bind_to, DATA, proxy)) + success = tt.run() + + if success: + return 0 + return -1 + +if __name__ == '__main__': + sys.exit(main()) -- cgit v1.2.3 From 19abaf8052e8a561536ec102f1c4f624d2494de6 Mon Sep 17 00:00:00 2001 From: Linus Nordberg Date: Wed, 5 Jun 2013 15:48:57 +0200 Subject: Make chutney exit with -1 on failure. If the function implementing the command (the verb, in argv[1]) return False, exit with -1. Else exit with 0 as before. --- lib/chutney/TorNet.py | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/lib/chutney/TorNet.py b/lib/chutney/TorNet.py index 3ec33b6..e79f0a8 100644 --- a/lib/chutney/TorNet.py +++ b/lib/chutney/TorNet.py @@ -673,7 +673,7 @@ def runConfigFile(verb, f): print "Error: I don't know how to %s." % verb return - getattr(network,verb)() + return getattr(network,verb)() def main(): global _BASE_ENVIRON @@ -687,7 +687,10 @@ def main(): sys.exit(1) f = open(sys.argv[2]) - runConfigFile(sys.argv[1], f) + result = runConfigFile(sys.argv[1], f) + if result is False: + return -1 + return 0 if __name__ == '__main__': - main() + sys.exit(main()) -- cgit v1.2.3 From dfa4cd3a77ff5e5b3b2dfefff167efad3f546a8e Mon Sep 17 00:00:00 2001 From: Linus Nordberg Date: Wed, 5 Jun 2013 15:48:57 +0200 Subject: Use values from errno instead of literal constants. Now it might even have a chance of working on non-linuxes! --- lib/chutney/Traffic.py | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) mode change 100755 => 100644 lib/chutney/Traffic.py diff --git a/lib/chutney/Traffic.py b/lib/chutney/Traffic.py old mode 100755 new mode 100644 index 4ab4fff..efee8e2 --- a/lib/chutney/Traffic.py +++ b/lib/chutney/Traffic.py @@ -9,6 +9,7 @@ import socket import select import struct +import errno debug_flag = False @@ -138,7 +139,7 @@ class Source(Peer): try: self.s.connect(dest) except socket.error, e: - if e[0] != 115: # EINPROGRESS + if e[0] != errno.EINPROGRESS: raise def on_readable(self): @@ -176,7 +177,7 @@ class Source(Peer): try: n = self.s.send(self.outbuf) except socket.error, e: - if e[0] == 111: # ECONNREFUSED + if e[0] == errno.ECONNREFUSED: debug("connection refused (fd=%d)" % self.fd()) return -1 raise @@ -185,7 +186,7 @@ class Source(Peer): if self.state == self.CONNECTING_THROUGH_PROXY: return 1 # Keep us around. return len(self.outbuf) # When 0, we're being removed. - + class TrafficTester(): def __init__(self, endpoint, data={}, timeout=3): self.listener = Listener(self, endpoint) -- cgit v1.2.3 From c987c77d183b462c3b960039abca020ad43b5133 Mon Sep 17 00:00:00 2001 From: Linus Nordberg Date: Wed, 5 Jun 2013 15:48:57 +0200 Subject: When testing traffic, bind to 127.0.0.1 rather than localhost. Proxy handshake fails with 0x5b on FreeBSD. I've seen it on Debian too but never chased it down. --- lib/chutney/TorNet.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lib/chutney/TorNet.py b/lib/chutney/TorNet.py index e79f0a8..4ebcc2d 100644 --- a/lib/chutney/TorNet.py +++ b/lib/chutney/TorNet.py @@ -641,7 +641,7 @@ class Network(object): TIMEOUT = 3 # Seconds. with open('/dev/urandom', 'r') as randfp: tmpdata = randfp.read(DATALEN) - bind_to = ('localhost', LISTEN_PORT) + bind_to = ('127.0.0.1', LISTEN_PORT) tt = chutney.Traffic.TrafficTester(bind_to, tmpdata, TIMEOUT) for op in filter(lambda n: n._env['tag'] == 'c', self._nodes): tt.add(chutney.Traffic.Source(tt, bind_to, tmpdata, -- cgit v1.2.3 From dc66b42be6a19cb5bb31ee1108c7d8a3c769491a Mon Sep 17 00:00:00 2001 From: Linus Nordberg Date: Wed, 5 Jun 2013 15:48:57 +0200 Subject: Report "successfully connected" at most once. --- lib/chutney/Traffic.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/lib/chutney/Traffic.py b/lib/chutney/Traffic.py index efee8e2..694cd87 100644 --- a/lib/chutney/Traffic.py +++ b/lib/chutney/Traffic.py @@ -151,6 +151,7 @@ class Source(Peer): self.state = self.CONNECTED self.inbuf = '' self.outbuf = self.data + debug("successfully connected (fd=%d)" % self.fd()) else: debug("proxy handshake failed (0x%x)! (fd=%d)" % (ord(self.inbuf[1]), self.fd())) @@ -171,6 +172,7 @@ class Source(Peer): if self.proxy is None: self.state = self.CONNECTED self.outbuf = self.data + debug("successfully connected (fd=%d)" % self.fd()) else: self.state = self.CONNECTING_THROUGH_PROXY self.outbuf = socks_cmd(self.dest) @@ -182,7 +184,6 @@ class Source(Peer): return -1 raise self.outbuf = self.outbuf[n:] - debug("successfully connected (fd=%d)" % self.fd()) if self.state == self.CONNECTING_THROUGH_PROXY: return 1 # Keep us around. return len(self.outbuf) # When 0, we're being removed. -- cgit v1.2.3 From acb1aa3c2987eab96f86df87c26ddeaa7c646d45 Mon Sep 17 00:00:00 2001 From: Linus Nordberg Date: Wed, 5 Jun 2013 15:48:57 +0200 Subject: Exit with 255 since that's more explicit. Rationale behind exit(-1) is that it turns into highest available number which hopefully won't collide with a wrapping shell script trying to exit with different codes depending on error (like 1, 2, 3). --- lib/chutney/Traffic.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lib/chutney/Traffic.py b/lib/chutney/Traffic.py index 694cd87..9b0a1f7 100644 --- a/lib/chutney/Traffic.py +++ b/lib/chutney/Traffic.py @@ -275,7 +275,7 @@ def main(): if success: return 0 - return -1 + return 255 if __name__ == '__main__': sys.exit(main()) -- cgit v1.2.3 From 69301e1cb215576ff116b285f9098c243d1eeaf5 Mon Sep 17 00:00:00 2001 From: Linus Nordberg Date: Wed, 5 Jun 2013 15:48:57 +0200 Subject: Add some documentation and comments. --- lib/chutney/Traffic.py | 49 +++++++++++++++++++++++++++++++++++++++++++++++-- 1 file changed, 47 insertions(+), 2 deletions(-) diff --git a/lib/chutney/Traffic.py b/lib/chutney/Traffic.py index 9b0a1f7..ab9a578 100644 --- a/lib/chutney/Traffic.py +++ b/lib/chutney/Traffic.py @@ -6,19 +6,38 @@ # restrict, so long as you retain the above notice(s) and this license # in all redistributed copies and derived works. There is no warranty. +# Do select/read/write for binding to a port, connecting to it and +# write, read what's written and verify it. You can connect over a +# SOCKS proxy (like Tor). +# +# You can create a TrafficTester and give it an IP address/host and +# port to bind to. If a Source is created and added to the +# TrafficTester, it will connect to the address/port it was given at +# instantiation and send its data. A Source can be configured to +# connect over a SOCKS proxy. When everything is set up, you can +# invoke TrafficTester.run() to start running. The TrafficTester will +# accept the incoming connection and read from it, verifying the data. +# +# For example code, see main() below. + import socket import select import struct import errno +# Set debug_flag=True in order to debug this program or to get hints +# about what's going wrong in your system. debug_flag = False def debug(s): + "Print a debug message on stdout if debug_flag is True." if debug_flag: print("DEBUG: %s" % s) def socks_cmd(addr_port): """ + Return a SOCKS command for connecting to addr_port. + SOCKSv4: https://en.wikipedia.org/wiki/SOCKS#Protocol SOCKSv5: RFC1928, RFC1929 """ @@ -63,6 +82,7 @@ class TestSuite(object): return('%d/%d/%d' % (self.not_done, self.successes, self.failures)) class Peer(object): + "Base class for Listener, Source and Sink." LISTENER = 1 SOURCE = 2 SINK = 3 @@ -84,6 +104,7 @@ class Peer(object): return self.type == self.SINK class Listener(Peer): + "A TCP listener, binding, listening and accepting new connections." def __init__(self, tt, endpoint): super(Listener, self).__init__(Peer.LISTENER, tt) self.s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) @@ -97,11 +118,17 @@ class Listener(Peer): self.tt.add(Sink(self.tt, newsock)) class Sink(Peer): + "A data sink, reading from its peer and verifying the data." def __init__(self, tt, s): super(Sink, self).__init__(Peer.SINK, tt, s) self.inbuf = '' def on_readable(self): + """Invoked when the socket becomes readable. + Return 0 on finished, successful verification. + -1 on failed verification + >0 if more data needs to be read + """ return self.verify(self.tt.data) def verify(self, data): @@ -115,6 +142,8 @@ class Sink(Peer): return len(data) - len(self.inbuf) class Source(Peer): + """A data source, connecting to a TCP server, optionally over a + SOCKS proxy, sending data.""" NOT_CONNECTED = 0 CONNECTING = 1 CONNECTING_THROUGH_PROXY = 2 @@ -143,6 +172,10 @@ class Source(Peer): raise def on_readable(self): + """Invoked when the socket becomes readable. + Return -1 on failure + >0 if more data needs to be read or written + """ if self.state == self.CONNECTING_THROUGH_PROXY: self.inbuf += self.s.recv(8 - len(self.inbuf)) if len(self.inbuf) == 8: @@ -168,6 +201,11 @@ class Source(Peer): return False def on_writable(self): + """Invoked when the socket becomes writable. + Return 0 when done writing + -1 on failure (like connection refused) + >0 if more data needs to be written + """ if self.state == self.CONNECTING: if self.proxy is None: self.state = self.CONNECTED @@ -189,6 +227,13 @@ class Source(Peer): return len(self.outbuf) # When 0, we're being removed. class TrafficTester(): + """ + Hang on select.select() and dispatch to Sources and Sinks. + Time out after self.timeout seconds. + Keep track of successful and failed data verification using a + TestSuite. + Return True if all tests succeed, else False. + """ def __init__(self, endpoint, data={}, timeout=3): self.listener = Listener(self, endpoint) self.pending_close = [] @@ -246,8 +291,8 @@ class TrafficTester(): self.remove(p) for fd in sets[1]: # writable fd's - p = self.peers.get(fd) # Might have been removed above. - if p is not None: + p = self.peers.get(fd) + if p is not None: # Might have been removed above. n = p.on_writable() if n == 0: self.remove(p) -- cgit v1.2.3 From 2b948b5631ba6c5acfc381050520dc32dfac9a2e Mon Sep 17 00:00:00 2001 From: Linus Nordberg Date: Wed, 5 Jun 2013 15:48:57 +0200 Subject: Return 1 from Source.on_readable() when connected over proxy. We used to return 8 by accident (8 minus length of emptied inbuf). This is more explicit. --- lib/chutney/Traffic.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/lib/chutney/Traffic.py b/lib/chutney/Traffic.py index ab9a578..7202c3d 100644 --- a/lib/chutney/Traffic.py +++ b/lib/chutney/Traffic.py @@ -185,11 +185,13 @@ class Source(Peer): self.inbuf = '' self.outbuf = self.data debug("successfully connected (fd=%d)" % self.fd()) + return 1 # Keep us around for writing. else: debug("proxy handshake failed (0x%x)! (fd=%d)" % (ord(self.inbuf[1]), self.fd())) self.state = self.NOT_CONNECTED return -1 + assert(8 - len(self.inbuf) > 0) return 8 - len(self.inbuf) return 1 # Keep us around for writing. -- cgit v1.2.3