aboutsummaryrefslogtreecommitdiff
path: root/dummyserver/handlers.py
blob: 5d6e2e6b978fc8d3a611c36fd9efdaf9eec5e479 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
from __future__ import print_function

import gzip
import json
import logging
import sys
import time
import zlib

from io import BytesIO
from tornado.wsgi import HTTPRequest

try:
    from urllib.parse import urlsplit
except ImportError:
    from urlparse import urlsplit

log = logging.getLogger(__name__)


class Response(object):
    def __init__(self, body='', status='200 OK', headers=None):
        if not isinstance(body, bytes):
            body = body.encode('utf8')

        self.body = body
        self.status = status
        self.headers = headers or [("Content-type", "text/plain")]

    def __call__(self, environ, start_response):
        start_response(self.status, self.headers)
        return [self.body]


class WSGIHandler(object):
    pass


class TestingApp(WSGIHandler):
    """
    Simple app that performs various operations, useful for testing an HTTP
    library.

    Given any path, it will attempt to convert it will load a corresponding
    local method if it exists. Status code 200 indicates success, 400 indicates
    failure. Each method has its own conditions for success/failure.
    """
    def __call__(self, environ, start_response):
        req = HTTPRequest(environ)

        req.params = {}
        for k, v in req.arguments.items():
            req.params[k] = next(iter(v))

        path = req.path[:]
        if not path.startswith('/'):
            path = urlsplit(path).path

        target = path[1:].replace('/', '_')
        method = getattr(self, target, self.index)
        resp = method(req)

        if dict(resp.headers).get('Connection') == 'close':
            # FIXME: Can we kill the connection somehow?
            pass

        return resp(environ, start_response)

    def index(self, _request):
        "Render simple message"
        return Response("Dummy server!")

    def source_address(self, request):
        """Return the requester's IP address."""
        return Response(request.remote_ip)

    def set_up(self, request):
        test_type = request.params.get('test_type')
        test_id = request.params.get('test_id')
        if test_id:
            print('\nNew test %s: %s' % (test_type, test_id))
        else:
            print('\nNew test %s' % test_type)
        return Response("Dummy server is ready!")

    def specific_method(self, request):
        "Confirm that the request matches the desired method type"
        method = request.params.get('method')
        if method and not isinstance(method, str):
            method = method.decode('utf8')

        if request.method != method:
            return Response("Wrong method: %s != %s" %
                            (method, request.method), status='400 Bad Request')
        return Response()

    def upload(self, request):
        "Confirm that the uploaded file conforms to specification"
        # FIXME: This is a huge broken mess
        param = request.params.get('upload_param', 'myfile').decode('ascii')
        filename = request.params.get('upload_filename', '').decode('utf-8')
        size = int(request.params.get('upload_size', '0'))
        files_ = request.files.get(param)

        if len(files_) != 1:
            return Response("Expected 1 file for '%s', not %d" %(param, len(files_)),
                                                    status='400 Bad Request')
        file_ = files_[0]

        data = file_['body']
        if int(size) != len(data):
            return Response("Wrong size: %d != %d" %
                            (size, len(data)), status='400 Bad Request')

        if filename != file_['filename']:
            return Response("Wrong filename: %s != %s" %
                            (filename, file_.filename),
                            status='400 Bad Request')

        return Response()

    def redirect(self, request):
        "Perform a redirect to ``target``"
        target = request.params.get('target', '/')
        headers = [('Location', target)]
        return Response(status='303 See Other', headers=headers)

    def keepalive(self, request):
        if request.params.get('close', b'0') == b'1':
            headers = [('Connection', 'close')]
            return Response('Closing', headers=headers)

        headers = [('Connection', 'keep-alive')]
        return Response('Keeping alive', headers=headers)

    def sleep(self, request):
        "Sleep for a specified amount of ``seconds``"
        seconds = float(request.params.get('seconds', '1'))
        time.sleep(seconds)
        return Response()

    def echo(self, request):
        "Echo back the params"
        if request.method == 'GET':
            return Response(request.query)

        return Response(request.body)

    def encodingrequest(self, request):
        "Check for UA accepting gzip/deflate encoding"
        data = b"hello, world!"
        encoding = request.headers.get('Accept-Encoding', '')
        headers = None
        if encoding == 'gzip':
            headers = [('Content-Encoding', 'gzip')]
            file_ = BytesIO()
            zipfile = gzip.GzipFile('', mode='w', fileobj=file_)
            zipfile.write(data)
            zipfile.close()
            data = file_.getvalue()
        elif encoding == 'deflate':
            headers = [('Content-Encoding', 'deflate')]
            data = zlib.compress(data)
        elif encoding == 'garbage-gzip':
            headers = [('Content-Encoding', 'gzip')]
            data = 'garbage'
        elif encoding == 'garbage-deflate':
            headers = [('Content-Encoding', 'deflate')]
            data = 'garbage'
        return Response(data, headers=headers)

    def headers(self, request):
        return Response(json.dumps(request.headers))

    def shutdown(self, request):
        sys.exit()


# RFC2231-aware replacement of internal tornado function
def _parse_header(line):
    r"""Parse a Content-type like header.

    Return the main content-type and a dictionary of options.

    >>> d = _parse_header("CD: fd; foo=\"bar\"; file*=utf-8''T%C3%A4st")[1]
    >>> d['file'] == 'T\u00e4st'
    True
    >>> d['foo']
    'bar'
    """
    import tornado.httputil
    import email.utils
    from urllib3.packages import six
    if not six.PY3:
        line = line.encode('utf-8')
    parts = tornado.httputil._parseparam(';' + line)
    key = next(parts)
    # decode_params treats first argument special, but we already stripped key
    params = [('Dummy', 'value')]
    for p in parts:
        i = p.find('=')
        if i >= 0:
            name = p[:i].strip().lower()
            value = p[i + 1:].strip()
            params.append((name, value))
    params = email.utils.decode_params(params)
    params.pop(0) # get rid of the dummy again
    pdict = {}
    for name, value in params:
        print(repr(value))
        value = email.utils.collapse_rfc2231_value(value)
        if len(value) >= 2 and value[0] == '"' and value[-1] == '"':
            value = value[1:-1]
        pdict[name] = value
    return key, pdict

# TODO: make the following conditional as soon as we know a version
#       which does not require this fix.
#       See https://github.com/facebook/tornado/issues/868
if True:
    import tornado.httputil
    tornado.httputil._parse_header = _parse_header