diff options
Diffstat (limited to 'test/test_response.py')
-rw-r--r-- | test/test_response.py | 632 |
1 files changed, 632 insertions, 0 deletions
diff --git a/test/test_response.py b/test/test_response.py new file mode 100644 index 0000000..47d0521 --- /dev/null +++ b/test/test_response.py @@ -0,0 +1,632 @@ +import unittest + +from io import BytesIO, BufferedReader + +try: + import http.client as httplib +except ImportError: + import httplib +from urllib3.response import HTTPResponse +from urllib3.exceptions import DecodeError, ResponseNotChunked, ProtocolError + + +from base64 import b64decode + +# A known random (i.e, not-too-compressible) payload generated with: +# "".join(random.choice(string.printable) for i in xrange(512)) +# .encode("zlib").encode("base64") +# Randomness in tests == bad, and fixing a seed may not be sufficient. +ZLIB_PAYLOAD = b64decode(b"""\ +eJwFweuaoQAAANDfineQhiKLUiaiCzvuTEmNNlJGiL5QhnGpZ99z8luQfe1AHoMioB+QSWHQu/L+ +lzd7W5CipqYmeVTBjdgSATdg4l4Z2zhikbuF+EKn69Q0DTpdmNJz8S33odfJoVEexw/l2SS9nFdi +pis7KOwXzfSqarSo9uJYgbDGrs1VNnQpT9f8zAorhYCEZronZQF9DuDFfNK3Hecc+WHLnZLQptwk +nufw8S9I43sEwxsT71BiqedHo0QeIrFE01F/4atVFXuJs2yxIOak3bvtXjUKAA6OKnQJ/nNvDGKZ +Khe5TF36JbnKVjdcL1EUNpwrWVfQpFYJ/WWm2b74qNeSZeQv5/xBhRdOmKTJFYgO96PwrHBlsnLn +a3l0LwJsloWpMbzByU5WLbRE6X5INFqjQOtIwYz5BAlhkn+kVqJvWM5vBlfrwP42ifonM5yF4ciJ +auHVks62997mNGOsM7WXNG3P98dBHPo2NhbTvHleL0BI5dus2JY81MUOnK3SGWLH8HeWPa1t5KcW +S5moAj5HexY/g/F8TctpxwsvyZp38dXeLDjSQvEQIkF7XR3YXbeZgKk3V34KGCPOAeeuQDIgyVhV +nP4HF2uWHA==""") + + +class TestLegacyResponse(unittest.TestCase): + def test_getheaders(self): + headers = {'host': 'example.com'} + r = HTTPResponse(headers=headers) + self.assertEqual(r.getheaders(), headers) + + def test_getheader(self): + headers = {'host': 'example.com'} + r = HTTPResponse(headers=headers) + self.assertEqual(r.getheader('host'), 'example.com') + + +class TestResponse(unittest.TestCase): + def test_cache_content(self): + r = HTTPResponse('foo') + self.assertEqual(r.data, 'foo') + self.assertEqual(r._body, 'foo') + + def test_default(self): + r = HTTPResponse() + self.assertEqual(r.data, None) + + def test_none(self): + r = HTTPResponse(None) + self.assertEqual(r.data, None) + + def test_preload(self): + fp = BytesIO(b'foo') + + r = HTTPResponse(fp, preload_content=True) + + self.assertEqual(fp.tell(), len(b'foo')) + self.assertEqual(r.data, b'foo') + + def test_no_preload(self): + fp = BytesIO(b'foo') + + r = HTTPResponse(fp, preload_content=False) + + self.assertEqual(fp.tell(), 0) + self.assertEqual(r.data, b'foo') + self.assertEqual(fp.tell(), len(b'foo')) + + def test_decode_bad_data(self): + fp = BytesIO(b'\x00' * 10) + self.assertRaises(DecodeError, HTTPResponse, fp, headers={ + 'content-encoding': 'deflate' + }) + + def test_reference_read(self): + fp = BytesIO(b'foo') + r = HTTPResponse(fp, preload_content=False) + + self.assertEqual(r.read(1), b'f') + self.assertEqual(r.read(2), b'oo') + self.assertEqual(r.read(), b'') + self.assertEqual(r.read(), b'') + + def test_decode_deflate(self): + import zlib + data = zlib.compress(b'foo') + + fp = BytesIO(data) + r = HTTPResponse(fp, headers={'content-encoding': 'deflate'}) + + self.assertEqual(r.data, b'foo') + + def test_decode_deflate_case_insensitve(self): + import zlib + data = zlib.compress(b'foo') + + fp = BytesIO(data) + r = HTTPResponse(fp, headers={'content-encoding': 'DeFlAtE'}) + + self.assertEqual(r.data, b'foo') + + def test_chunked_decoding_deflate(self): + import zlib + data = zlib.compress(b'foo') + + fp = BytesIO(data) + r = HTTPResponse(fp, headers={'content-encoding': 'deflate'}, + preload_content=False) + + self.assertEqual(r.read(3), b'') + self.assertEqual(r.read(1), b'f') + self.assertEqual(r.read(2), b'oo') + self.assertEqual(r.read(), b'') + self.assertEqual(r.read(), b'') + + + def test_chunked_decoding_deflate2(self): + import zlib + compress = zlib.compressobj(6, zlib.DEFLATED, -zlib.MAX_WBITS) + data = compress.compress(b'foo') + data += compress.flush() + + fp = BytesIO(data) + r = HTTPResponse(fp, headers={'content-encoding': 'deflate'}, + preload_content=False) + + self.assertEqual(r.read(1), b'') + self.assertEqual(r.read(1), b'f') + self.assertEqual(r.read(2), b'oo') + self.assertEqual(r.read(), b'') + self.assertEqual(r.read(), b'') + + + def test_chunked_decoding_gzip(self): + import zlib + compress = zlib.compressobj(6, zlib.DEFLATED, 16 + zlib.MAX_WBITS) + data = compress.compress(b'foo') + data += compress.flush() + + fp = BytesIO(data) + r = HTTPResponse(fp, headers={'content-encoding': 'gzip'}, + preload_content=False) + + self.assertEqual(r.read(11), b'') + self.assertEqual(r.read(1), b'f') + self.assertEqual(r.read(2), b'oo') + self.assertEqual(r.read(), b'') + self.assertEqual(r.read(), b'') + + + def test_body_blob(self): + resp = HTTPResponse(b'foo') + self.assertEqual(resp.data, b'foo') + self.assertTrue(resp.closed) + + def test_io(self): + import socket + + fp = BytesIO(b'foo') + resp = HTTPResponse(fp, preload_content=False) + + self.assertEqual(resp.closed, False) + self.assertEqual(resp.readable(), True) + self.assertEqual(resp.writable(), False) + self.assertRaises(IOError, resp.fileno) + + resp.close() + self.assertEqual(resp.closed, True) + + # Try closing with an `httplib.HTTPResponse`, because it has an + # `isclosed` method. + hlr = httplib.HTTPResponse(socket.socket()) + resp2 = HTTPResponse(hlr, preload_content=False) + self.assertEqual(resp2.closed, False) + resp2.close() + self.assertEqual(resp2.closed, True) + + #also try when only data is present. + resp3 = HTTPResponse('foodata') + self.assertRaises(IOError, resp3.fileno) + + resp3._fp = 2 + # A corner case where _fp is present but doesn't have `closed`, + # `isclosed`, or `fileno`. Unlikely, but possible. + self.assertEqual(resp3.closed, True) + self.assertRaises(IOError, resp3.fileno) + + def test_io_bufferedreader(self): + fp = BytesIO(b'foo') + resp = HTTPResponse(fp, preload_content=False) + br = BufferedReader(resp) + + self.assertEqual(br.read(), b'foo') + + br.close() + self.assertEqual(resp.closed, True) + + b = b'fooandahalf' + fp = BytesIO(b) + resp = HTTPResponse(fp, preload_content=False) + br = BufferedReader(resp, 5) + + br.read(1) # sets up the buffer, reading 5 + self.assertEqual(len(fp.read()), len(b) - 5) + + # This is necessary to make sure the "no bytes left" part of `readinto` + # gets tested. + while not br.closed: + br.read(5) + + def test_io_readinto(self): + # This test is necessary because in py2.6, `readinto` doesn't get called + # in `test_io_bufferedreader` like it does for all the other python + # versions. Probably this is because the `io` module in py2.6 is an + # old version that has a different underlying implementation. + + + fp = BytesIO(b'foo') + resp = HTTPResponse(fp, preload_content=False) + + barr = bytearray(3) + assert resp.readinto(barr) == 3 + assert b'foo' == barr + + # The reader should already be empty, so this should read nothing. + assert resp.readinto(barr) == 0 + assert b'foo' == barr + + def test_streaming(self): + fp = BytesIO(b'foo') + resp = HTTPResponse(fp, preload_content=False) + stream = resp.stream(2, decode_content=False) + + self.assertEqual(next(stream), b'fo') + self.assertEqual(next(stream), b'o') + self.assertRaises(StopIteration, next, stream) + + def test_streaming_tell(self): + fp = BytesIO(b'foo') + resp = HTTPResponse(fp, preload_content=False) + stream = resp.stream(2, decode_content=False) + + position = 0 + + position += len(next(stream)) + self.assertEqual(2, position) + self.assertEqual(position, resp.tell()) + + position += len(next(stream)) + self.assertEqual(3, position) + self.assertEqual(position, resp.tell()) + + self.assertRaises(StopIteration, next, stream) + + def test_gzipped_streaming(self): + import zlib + compress = zlib.compressobj(6, zlib.DEFLATED, 16 + zlib.MAX_WBITS) + data = compress.compress(b'foo') + data += compress.flush() + + fp = BytesIO(data) + resp = HTTPResponse(fp, headers={'content-encoding': 'gzip'}, + preload_content=False) + stream = resp.stream(2) + + self.assertEqual(next(stream), b'f') + self.assertEqual(next(stream), b'oo') + self.assertRaises(StopIteration, next, stream) + + def test_gzipped_streaming_tell(self): + import zlib + compress = zlib.compressobj(6, zlib.DEFLATED, 16 + zlib.MAX_WBITS) + uncompressed_data = b'foo' + data = compress.compress(uncompressed_data) + data += compress.flush() + + fp = BytesIO(data) + resp = HTTPResponse(fp, headers={'content-encoding': 'gzip'}, + preload_content=False) + stream = resp.stream() + + # Read everything + payload = next(stream) + self.assertEqual(payload, uncompressed_data) + + self.assertEqual(len(data), resp.tell()) + + self.assertRaises(StopIteration, next, stream) + + def test_deflate_streaming_tell_intermediate_point(self): + # Ensure that ``tell()`` returns the correct number of bytes when + # part-way through streaming compressed content. + import zlib + + NUMBER_OF_READS = 10 + + class MockCompressedDataReading(BytesIO): + """ + A ByteIO-like reader returning ``payload`` in ``NUMBER_OF_READS`` + calls to ``read``. + """ + + def __init__(self, payload, payload_part_size): + self.payloads = [ + payload[i*payload_part_size:(i+1)*payload_part_size] + for i in range(NUMBER_OF_READS+1)] + + assert b"".join(self.payloads) == payload + + def read(self, _): + # Amount is unused. + if len(self.payloads) > 0: + return self.payloads.pop(0) + return b"" + + uncompressed_data = zlib.decompress(ZLIB_PAYLOAD) + + payload_part_size = len(ZLIB_PAYLOAD) // NUMBER_OF_READS + fp = MockCompressedDataReading(ZLIB_PAYLOAD, payload_part_size) + resp = HTTPResponse(fp, headers={'content-encoding': 'deflate'}, + preload_content=False) + stream = resp.stream() + + parts_positions = [(part, resp.tell()) for part in stream] + end_of_stream = resp.tell() + + self.assertRaises(StopIteration, next, stream) + + parts, positions = zip(*parts_positions) + + # Check that the payload is equal to the uncompressed data + payload = b"".join(parts) + self.assertEqual(uncompressed_data, payload) + + # Check that the positions in the stream are correct + expected = [(i+1)*payload_part_size for i in range(NUMBER_OF_READS)] + self.assertEqual(expected, list(positions)) + + # Check that the end of the stream is in the correct place + self.assertEqual(len(ZLIB_PAYLOAD), end_of_stream) + + def test_deflate_streaming(self): + import zlib + data = zlib.compress(b'foo') + + fp = BytesIO(data) + resp = HTTPResponse(fp, headers={'content-encoding': 'deflate'}, + preload_content=False) + stream = resp.stream(2) + + self.assertEqual(next(stream), b'f') + self.assertEqual(next(stream), b'oo') + self.assertRaises(StopIteration, next, stream) + + def test_deflate2_streaming(self): + import zlib + compress = zlib.compressobj(6, zlib.DEFLATED, -zlib.MAX_WBITS) + data = compress.compress(b'foo') + data += compress.flush() + + fp = BytesIO(data) + resp = HTTPResponse(fp, headers={'content-encoding': 'deflate'}, + preload_content=False) + stream = resp.stream(2) + + self.assertEqual(next(stream), b'f') + self.assertEqual(next(stream), b'oo') + self.assertRaises(StopIteration, next, stream) + + def test_empty_stream(self): + fp = BytesIO(b'') + resp = HTTPResponse(fp, preload_content=False) + stream = resp.stream(2, decode_content=False) + + self.assertRaises(StopIteration, next, stream) + + def test_mock_httpresponse_stream(self): + # Mock out a HTTP Request that does enough to make it through urllib3's + # read() and close() calls, and also exhausts and underlying file + # object. + class MockHTTPRequest(object): + self.fp = None + + def read(self, amt): + data = self.fp.read(amt) + if not data: + self.fp = None + + return data + + def close(self): + self.fp = None + + bio = BytesIO(b'foo') + fp = MockHTTPRequest() + fp.fp = bio + resp = HTTPResponse(fp, preload_content=False) + stream = resp.stream(2) + + self.assertEqual(next(stream), b'fo') + self.assertEqual(next(stream), b'o') + self.assertRaises(StopIteration, next, stream) + + def test_mock_transfer_encoding_chunked(self): + stream = [b"fo", b"o", b"bar"] + fp = MockChunkedEncodingResponse(stream) + r = httplib.HTTPResponse(MockSock) + r.fp = fp + resp = HTTPResponse(r, preload_content=False, headers={'transfer-encoding': 'chunked'}) + + i = 0 + for c in resp.stream(): + self.assertEqual(c, stream[i]) + i += 1 + + def test_mock_gzipped_transfer_encoding_chunked_decoded(self): + """Show that we can decode the gizpped and chunked body.""" + def stream(): + # Set up a generator to chunk the gzipped body + import zlib + compress = zlib.compressobj(6, zlib.DEFLATED, 16 + zlib.MAX_WBITS) + data = compress.compress(b'foobar') + data += compress.flush() + for i in range(0, len(data), 2): + yield data[i:i+2] + + fp = MockChunkedEncodingResponse(list(stream())) + r = httplib.HTTPResponse(MockSock) + r.fp = fp + headers = {'transfer-encoding': 'chunked', 'content-encoding': 'gzip'} + resp = HTTPResponse(r, preload_content=False, headers=headers) + + data = b'' + for c in resp.stream(decode_content=True): + data += c + + self.assertEqual(b'foobar', data) + + def test_mock_transfer_encoding_chunked_custom_read(self): + stream = [b"foooo", b"bbbbaaaaar"] + fp = MockChunkedEncodingResponse(stream) + r = httplib.HTTPResponse(MockSock) + r.fp = fp + r.chunked = True + r.chunk_left = None + resp = HTTPResponse(r, preload_content=False, headers={'transfer-encoding': 'chunked'}) + expected_response = [b'fo', b'oo', b'o', b'bb', b'bb', b'aa', b'aa', b'ar'] + response = list(resp.read_chunked(2)) + if getattr(self, "assertListEqual", False): + self.assertListEqual(expected_response, response) + else: + for index, item in enumerate(response): + v = expected_response[index] + self.assertEqual(item, v) + + def test_mock_transfer_encoding_chunked_unlmtd_read(self): + stream = [b"foooo", b"bbbbaaaaar"] + fp = MockChunkedEncodingResponse(stream) + r = httplib.HTTPResponse(MockSock) + r.fp = fp + r.chunked = True + r.chunk_left = None + resp = HTTPResponse(r, preload_content=False, headers={'transfer-encoding': 'chunked'}) + if getattr(self, "assertListEqual", False): + self.assertListEqual(stream, list(resp.read_chunked())) + else: + for index, item in enumerate(resp.read_chunked()): + v = stream[index] + self.assertEqual(item, v) + + def test_read_not_chunked_response_as_chunks(self): + fp = BytesIO(b'foo') + resp = HTTPResponse(fp, preload_content=False) + r = resp.read_chunked() + self.assertRaises(ResponseNotChunked, next, r) + + def test_invalid_chunks(self): + stream = [b"foooo", b"bbbbaaaaar"] + fp = MockChunkedInvalidEncoding(stream) + r = httplib.HTTPResponse(MockSock) + r.fp = fp + r.chunked = True + r.chunk_left = None + resp = HTTPResponse(r, preload_content=False, headers={'transfer-encoding': 'chunked'}) + self.assertRaises(ProtocolError, next, resp.read_chunked()) + + def test_chunked_response_without_crlf_on_end(self): + stream = [b"foo", b"bar", b"baz"] + fp = MockChunkedEncodingWithoutCRLFOnEnd(stream) + r = httplib.HTTPResponse(MockSock) + r.fp = fp + r.chunked = True + r.chunk_left = None + resp = HTTPResponse(r, preload_content=False, headers={'transfer-encoding': 'chunked'}) + if getattr(self, "assertListEqual", False): + self.assertListEqual(stream, list(resp.stream())) + else: + for index, item in enumerate(resp.stream()): + v = stream[index] + self.assertEqual(item, v) + + def test_chunked_response_with_extensions(self): + stream = [b"foo", b"bar"] + fp = MockChunkedEncodingWithExtensions(stream) + r = httplib.HTTPResponse(MockSock) + r.fp = fp + r.chunked = True + r.chunk_left = None + resp = HTTPResponse(r, preload_content=False, headers={'transfer-encoding': 'chunked'}) + if getattr(self, "assertListEqual", False): + self.assertListEqual(stream, list(resp.stream())) + else: + for index, item in enumerate(resp.stream()): + v = stream[index] + self.assertEqual(item, v) + + def test_get_case_insensitive_headers(self): + headers = {'host': 'example.com'} + r = HTTPResponse(headers=headers) + self.assertEqual(r.headers.get('host'), 'example.com') + self.assertEqual(r.headers.get('Host'), 'example.com') + + +class MockChunkedEncodingResponse(object): + + def __init__(self, content): + """ + content: collection of str, each str is a chunk in response + """ + self.content = content + self.index = 0 # This class iterates over self.content. + self.closed = False + self.cur_chunk = b'' + self.chunks_exhausted = False + + @staticmethod + def _encode_chunk(chunk): + # In the general case, we can't decode the chunk to unicode + length = '%X\r\n' % len(chunk) + return length.encode() + chunk + b'\r\n' + + def _pop_new_chunk(self): + if self.chunks_exhausted: + return b"" + try: + chunk = self.content[self.index] + except IndexError: + chunk = b'' + self.chunks_exhausted = True + else: + self.index += 1 + chunk = self._encode_chunk(chunk) + if not isinstance(chunk, bytes): + chunk = chunk.encode() + return chunk + + def pop_current_chunk(self, amt=-1, till_crlf=False): + if amt > 0 and till_crlf: + raise ValueError("Can't specify amt and till_crlf.") + if len(self.cur_chunk) <= 0: + self.cur_chunk = self._pop_new_chunk() + if till_crlf: + try: + i = self.cur_chunk.index(b"\r\n") + except ValueError: + # No CRLF in current chunk -- probably caused by encoder. + self.cur_chunk = b"" + return b"" + else: + chunk_part = self.cur_chunk[:i+2] + self.cur_chunk = self.cur_chunk[i+2:] + return chunk_part + elif amt <= -1: + chunk_part = self.cur_chunk + self.cur_chunk = b'' + return chunk_part + else: + try: + chunk_part = self.cur_chunk[:amt] + except IndexError: + chunk_part = self.cur_chunk + self.cur_chunk = b'' + else: + self.cur_chunk = self.cur_chunk[amt:] + return chunk_part + + def readline(self): + return self.pop_current_chunk(till_crlf=True) + + def read(self, amt=-1): + return self.pop_current_chunk(amt) + + def flush(self): + # Python 3 wants this method. + pass + + def close(self): + self.closed = True + + +class MockChunkedInvalidEncoding(MockChunkedEncodingResponse): + + def _encode_chunk(self, chunk): + return 'ZZZ\r\n%s\r\n' % chunk.decode() + + +class MockChunkedEncodingWithoutCRLFOnEnd(MockChunkedEncodingResponse): + + def _encode_chunk(self, chunk): + return '%X\r\n%s%s' % (len(chunk), chunk.decode(), + "\r\n" if len(chunk) > 0 else "") + + +class MockChunkedEncodingWithExtensions(MockChunkedEncodingResponse): + + def _encode_chunk(self, chunk): + return '%X;asd=qwe\r\n%s\r\n' % (len(chunk), chunk.decode()) + + +class MockSock(object): + @classmethod + def makefile(cls, *args, **kwargs): + return + + +if __name__ == '__main__': + unittest.main() |