aboutsummaryrefslogtreecommitdiff
path: root/dummyserver
diff options
context:
space:
mode:
authorSVN-Git Migration <python-modules-team@lists.alioth.debian.org>2015-10-08 13:19:30 -0700
committerSVN-Git Migration <python-modules-team@lists.alioth.debian.org>2015-10-08 13:19:30 -0700
commit0c183b9d52b45bac22a2ff9db0e6348b655f4ab2 (patch)
treeffddde52af2caca5d039f3c9f185694f394a39de /dummyserver
downloadpython-urllib3-0c183b9d52b45bac22a2ff9db0e6348b655f4ab2.tar
python-urllib3-0c183b9d52b45bac22a2ff9db0e6348b655f4ab2.tar.gz
Imported Upstream version 1.2.2
Diffstat (limited to 'dummyserver')
-rw-r--r--dummyserver/__init__.py0
-rw-r--r--dummyserver/handlers.py159
-rwxr-xr-xdummyserver/server.py113
-rw-r--r--dummyserver/testcase.py71
4 files changed, 343 insertions, 0 deletions
diff --git a/dummyserver/__init__.py b/dummyserver/__init__.py
new file mode 100644
index 0000000..e69de29
--- /dev/null
+++ b/dummyserver/__init__.py
diff --git a/dummyserver/handlers.py b/dummyserver/handlers.py
new file mode 100644
index 0000000..3e32881
--- /dev/null
+++ b/dummyserver/handlers.py
@@ -0,0 +1,159 @@
+from __future__ import print_function
+
+import gzip
+import logging
+import sys
+import time
+import zlib
+
+from io import BytesIO
+from tornado.wsgi import HTTPRequest
+
+try:
+ from urllib.parse import urlsplit
+except ImportError:
+ from urlparse import urlsplit
+
+log = logging.getLogger(__name__)
+
+
+class Response(object):
+ def __init__(self, body='', status='200 OK', headers=None):
+ if not isinstance(body, bytes):
+ body = body.encode('utf8')
+
+ self.body = body
+ self.status = status
+ self.headers = headers or [("Content-type", "text/plain")]
+
+ def __call__(self, environ, start_response):
+ start_response(self.status, self.headers)
+ return [self.body]
+
+
+class WSGIHandler(object):
+ pass
+
+
+class TestingApp(WSGIHandler):
+ """
+ Simple app that performs various operations, useful for testing an HTTP
+ library.
+
+ Given any path, it will attempt to convert it will load a corresponding
+ local method if it exists. Status code 200 indicates success, 400 indicates
+ failure. Each method has its own conditions for success/failure.
+ """
+ def __call__(self, environ, start_response):
+ req = HTTPRequest(environ)
+
+ req.params = {}
+ for k, v in req.arguments.items():
+ req.params[k] = next(iter(v))
+
+ path = req.path[:]
+ if not path.startswith('/'):
+ path = urlsplit(path).path
+
+ target = path[1:].replace('/', '_')
+ method = getattr(self, target, self.index)
+ resp = method(req)
+
+ if dict(resp.headers).get('Connection') == 'close':
+ # FIXME: Can we kill the connection somehow?
+ pass
+
+ return resp(environ, start_response)
+
+ def index(self, _request):
+ "Render simple message"
+ return Response("Dummy server!")
+
+ def set_up(self, request):
+ test_type = request.params.get('test_type')
+ test_id = request.params.get('test_id')
+ if test_id:
+ print('\nNew test %s: %s' % (test_type, test_id))
+ else:
+ print('\nNew test %s' % test_type)
+ return Response("Dummy server is ready!")
+
+ def specific_method(self, request):
+ "Confirm that the request matches the desired method type"
+ method = request.params.get('method')
+ if method and not isinstance(method, str):
+ method = method.decode('utf8')
+
+ if request.method != method:
+ return Response("Wrong method: %s != %s" %
+ (method, request.method), status='400')
+ return Response()
+
+ def upload(self, request):
+ "Confirm that the uploaded file conforms to specification"
+ # FIXME: This is a huge broken mess
+ param = request.params.get('upload_param', 'myfile').decode('ascii')
+ filename = request.params.get('upload_filename', '').decode('utf-8')
+ size = int(request.params.get('upload_size', '0'))
+ files_ = request.files.get(param)
+
+ if len(files_) != 1:
+ return Response("Expected 1 file for '%s', not %d" %(param, len(files_)),
+ status='400')
+ file_ = files_[0]
+
+ data = file_['body']
+ if int(size) != len(data):
+ return Response("Wrong size: %d != %d" %
+ (size, len(data)), status='400')
+
+ if filename != file_['filename']:
+ return Response("Wrong filename: %s != %s" %
+ (filename, file_.filename), status='400')
+
+ return Response()
+
+ def redirect(self, request):
+ "Perform a redirect to ``target``"
+ target = request.params.get('target', '/')
+ headers = [('Location', target)]
+ return Response(status='303', headers=headers)
+
+ def keepalive(self, request):
+ if request.params.get('close', '0') == '1':
+ headers = [('Connection', 'close')]
+ return Response('Closing', headers=headers)
+
+ headers = [('Connection', 'keep-alive')]
+ return Response('Keeping alive', headers=headers)
+
+ def sleep(self, request):
+ "Sleep for a specified amount of ``seconds``"
+ seconds = float(request.params.get('seconds', '1'))
+ time.sleep(seconds)
+ return Response()
+
+ def echo(self, request):
+ "Echo back the params"
+ if request.method == 'GET':
+ return Response(request.query)
+
+ return Response(request.body)
+
+ def encodingrequest(self, request):
+ "Check for UA accepting gzip/deflate encoding"
+ data = b"hello, world!"
+ encoding = request.headers.get('Accept-Encoding', '')
+ headers = None
+ if 'gzip' in encoding:
+ headers = [('Content-Encoding', 'gzip')]
+ file_ = BytesIO()
+ gzip.GzipFile('', mode='w', fileobj=file_).write(data)
+ data = file_.getvalue()
+ elif 'deflate' in encoding:
+ headers = [('Content-Encoding', 'deflate')]
+ data = zlib.compress(data)
+ return Response(data, headers=headers)
+
+ def shutdown(self, request):
+ sys.exit()
diff --git a/dummyserver/server.py b/dummyserver/server.py
new file mode 100755
index 0000000..529850f
--- /dev/null
+++ b/dummyserver/server.py
@@ -0,0 +1,113 @@
+#!/usr/bin/env python
+
+"""
+Dummy server used for unit testing.
+"""
+from __future__ import print_function
+
+import logging
+import os
+import sys
+import threading
+import socket
+
+import tornado.wsgi
+import tornado.httpserver
+import tornado.ioloop
+
+from dummyserver.handlers import TestingApp
+
+
+log = logging.getLogger(__name__)
+
+CERTS_PATH = os.path.join(os.path.dirname(__file__), 'certs')
+DEFAULT_CERTS = {
+ 'certfile': os.path.join(CERTS_PATH, 'server.crt'),
+ 'keyfile': os.path.join(CERTS_PATH, 'server.key'),
+}
+DEFAULT_CA = os.path.join(CERTS_PATH, 'cacert.pem')
+DEFAULT_CA_BAD = os.path.join(CERTS_PATH, 'client_bad.pem')
+
+
+# Different types of servers we have:
+
+
+class SocketServerThread(threading.Thread):
+ """
+ :param socket_handler: Callable which receives a socket argument for one
+ request.
+ :param ready_lock: Lock which gets released when the socket handler is
+ ready to receive requests.
+ """
+ def __init__(self, socket_handler, host='localhost', port=8081,
+ ready_lock=None):
+ threading.Thread.__init__(self)
+
+ self.socket_handler = socket_handler
+ self.host = host
+ self.port = port
+ self.ready_lock = ready_lock
+
+ def _start_server(self):
+ sock = socket.socket()
+ sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
+ sock.bind((self.host, self.port))
+
+ # Once listen() returns, the server socket is ready
+ sock.listen(1)
+
+ if self.ready_lock:
+ self.ready_lock.release()
+
+ self.socket_handler(sock)
+
+ def run(self):
+ self.server = self._start_server()
+
+
+class TornadoServerThread(threading.Thread):
+ def __init__(self, host='localhost', port=8081, scheme='http', certs=None):
+ threading.Thread.__init__(self)
+
+ self.host = host
+ self.port = port
+ self.scheme = scheme
+ self.certs = certs
+
+ def _start_server(self):
+ container = tornado.wsgi.WSGIContainer(TestingApp())
+
+ if self.scheme == 'https':
+ http_server = tornado.httpserver.HTTPServer(container,
+ ssl_options=self.certs)
+ else:
+ http_server = tornado.httpserver.HTTPServer(container)
+
+ http_server.listen(self.port)
+ return http_server
+
+ def run(self):
+ self.server = self._start_server()
+ self.ioloop = tornado.ioloop.IOLoop.instance()
+ self.ioloop.start()
+
+ def stop(self):
+ self.server.stop()
+ self.ioloop.stop()
+
+
+if __name__ == '__main__':
+ log.setLevel(logging.DEBUG)
+ log.addHandler(logging.StreamHandler(sys.stderr))
+
+ from urllib3 import get_host
+
+ url = "http://localhost:8081"
+ if len(sys.argv) > 1:
+ url = sys.argv[1]
+
+ print("Starting WGI server at: %s" % url)
+
+ scheme, host, port = get_host(url)
+ t = TornadoServerThread(scheme=scheme, host=host, port=port)
+ t.start()
diff --git a/dummyserver/testcase.py b/dummyserver/testcase.py
new file mode 100644
index 0000000..518d739
--- /dev/null
+++ b/dummyserver/testcase.py
@@ -0,0 +1,71 @@
+import unittest
+
+from threading import Lock
+
+from dummyserver.server import (
+ TornadoServerThread, SocketServerThread,
+ DEFAULT_CERTS,
+)
+
+
+# TODO: Change ports to auto-allocated?
+
+
+class SocketDummyServerTestCase(unittest.TestCase):
+ """
+ A simple socket-based server is created for this class that is good for
+ exactly one request.
+ """
+ scheme = 'http'
+ host = 'localhost'
+ port = 18080
+
+ @classmethod
+ def _start_server(cls, socket_handler):
+ ready_lock = Lock()
+ ready_lock.acquire()
+ cls.server_thread = SocketServerThread(socket_handler=socket_handler,
+ ready_lock=ready_lock,
+ host=cls.host, port=cls.port)
+ cls.server_thread.start()
+
+ # Lock gets released by thread above
+ ready_lock.acquire()
+
+
+class HTTPDummyServerTestCase(unittest.TestCase):
+ scheme = 'http'
+ host = 'localhost'
+ host_alt = '127.0.0.1' # Some tests need two hosts
+ port = 18081
+ certs = DEFAULT_CERTS
+
+ @classmethod
+ def _start_server(cls):
+ cls.server_thread = TornadoServerThread(host=cls.host, port=cls.port,
+ scheme=cls.scheme,
+ certs=cls.certs)
+ cls.server_thread.start()
+
+ # TODO: Loop-check here instead
+ import time
+ time.sleep(0.1)
+
+ @classmethod
+ def _stop_server(cls):
+ cls.server_thread.stop()
+
+ @classmethod
+ def setUpClass(cls):
+ cls._start_server()
+
+ @classmethod
+ def tearDownClass(cls):
+ cls._stop_server()
+
+
+class HTTPSDummyServerTestCase(HTTPDummyServerTestCase):
+ scheme = 'https'
+ host = 'localhost'
+ port = 18082
+ certs = DEFAULT_CERTS