diff options
Diffstat (limited to 'test/test_response.py')
-rw-r--r-- | test/test_response.py | 246 |
1 files changed, 240 insertions, 6 deletions
diff --git a/test/test_response.py b/test/test_response.py index 7d67c93..2e2be0e 100644 --- a/test/test_response.py +++ b/test/test_response.py @@ -2,8 +2,12 @@ import unittest from io import BytesIO, BufferedReader +try: + import http.client as httplib +except ImportError: + import httplib from urllib3.response import HTTPResponse -from urllib3.exceptions import DecodeError +from urllib3.exceptions import DecodeError, ResponseNotChunked from base64 import b64decode @@ -73,6 +77,15 @@ class TestResponse(unittest.TestCase): 'content-encoding': 'deflate' }) + def test_reference_read(self): + fp = BytesIO(b'foo') + r = HTTPResponse(fp, preload_content=False) + + self.assertEqual(r.read(1), b'f') + self.assertEqual(r.read(2), b'oo') + self.assertEqual(r.read(), b'') + self.assertEqual(r.read(), b'') + def test_decode_deflate(self): import zlib data = zlib.compress(b'foo') @@ -102,6 +115,9 @@ class TestResponse(unittest.TestCase): self.assertEqual(r.read(3), b'') self.assertEqual(r.read(1), b'f') self.assertEqual(r.read(2), b'oo') + self.assertEqual(r.read(), b'') + self.assertEqual(r.read(), b'') + def test_chunked_decoding_deflate2(self): import zlib @@ -116,6 +132,9 @@ class TestResponse(unittest.TestCase): self.assertEqual(r.read(1), b'') self.assertEqual(r.read(1), b'f') self.assertEqual(r.read(2), b'oo') + self.assertEqual(r.read(), b'') + self.assertEqual(r.read(), b'') + def test_chunked_decoding_gzip(self): import zlib @@ -130,6 +149,9 @@ class TestResponse(unittest.TestCase): self.assertEqual(r.read(11), b'') self.assertEqual(r.read(1), b'f') self.assertEqual(r.read(2), b'oo') + self.assertEqual(r.read(), b'') + self.assertEqual(r.read(), b'') + def test_body_blob(self): resp = HTTPResponse(b'foo') @@ -138,10 +160,6 @@ class TestResponse(unittest.TestCase): def test_io(self): import socket - try: - from http.client import HTTPResponse as OldHTTPResponse - except: - from httplib import HTTPResponse as OldHTTPResponse fp = BytesIO(b'foo') resp = HTTPResponse(fp, preload_content=False) @@ -156,7 +174,7 @@ class TestResponse(unittest.TestCase): # Try closing with an `httplib.HTTPResponse`, because it has an # `isclosed` method. - hlr = OldHTTPResponse(socket.socket()) + hlr = httplib.HTTPResponse(socket.socket()) resp2 = HTTPResponse(hlr, preload_content=False) self.assertEqual(resp2.closed, False) resp2.close() @@ -388,11 +406,227 @@ class TestResponse(unittest.TestCase): self.assertEqual(next(stream), b'o') self.assertRaises(StopIteration, next, stream) + def test_mock_transfer_encoding_chunked(self): + stream = [b"fo", b"o", b"bar"] + fp = MockChunkedEncodingResponse(stream) + r = httplib.HTTPResponse(MockSock) + r.fp = fp + resp = HTTPResponse(r, preload_content=False, headers={'transfer-encoding': 'chunked'}) + + i = 0 + for c in resp.stream(): + self.assertEqual(c, stream[i]) + i += 1 + + def test_mock_gzipped_transfer_encoding_chunked_decoded(self): + """Show that we can decode the gizpped and chunked body.""" + def stream(): + # Set up a generator to chunk the gzipped body + import zlib + compress = zlib.compressobj(6, zlib.DEFLATED, 16 + zlib.MAX_WBITS) + data = compress.compress(b'foobar') + data += compress.flush() + for i in range(0, len(data), 2): + yield data[i:i+2] + + fp = MockChunkedEncodingResponse(list(stream())) + r = httplib.HTTPResponse(MockSock) + r.fp = fp + headers = {'transfer-encoding': 'chunked', 'content-encoding': 'gzip'} + resp = HTTPResponse(r, preload_content=False, headers=headers) + + data = b'' + for c in resp.stream(decode_content=True): + data += c + + self.assertEqual(b'foobar', data) + + def test_mock_transfer_encoding_chunked_custom_read(self): + stream = [b"foooo", b"bbbbaaaaar"] + fp = MockChunkedEncodingResponse(stream) + r = httplib.HTTPResponse(MockSock) + r.fp = fp + r.chunked = True + r.chunk_left = None + resp = HTTPResponse(r, preload_content=False, headers={'transfer-encoding': 'chunked'}) + expected_response = [b'fo', b'oo', b'o', b'bb', b'bb', b'aa', b'aa', b'ar'] + response = list(resp.read_chunked(2)) + if getattr(self, "assertListEqual", False): + self.assertListEqual(expected_response, response) + else: + for index, item in enumerate(response): + v = expected_response[index] + self.assertEqual(item, v) + + def test_mock_transfer_encoding_chunked_unlmtd_read(self): + stream = [b"foooo", b"bbbbaaaaar"] + fp = MockChunkedEncodingResponse(stream) + r = httplib.HTTPResponse(MockSock) + r.fp = fp + r.chunked = True + r.chunk_left = None + resp = HTTPResponse(r, preload_content=False, headers={'transfer-encoding': 'chunked'}) + if getattr(self, "assertListEqual", False): + self.assertListEqual(stream, list(resp.read_chunked())) + else: + for index, item in enumerate(resp.read_chunked()): + v = stream[index] + self.assertEqual(item, v) + + def test_read_not_chunked_response_as_chunks(self): + fp = BytesIO(b'foo') + resp = HTTPResponse(fp, preload_content=False) + r = resp.read_chunked() + self.assertRaises(ResponseNotChunked, next, r) + + def test_invalid_chunks(self): + stream = [b"foooo", b"bbbbaaaaar"] + fp = MockChunkedInvalidEncoding(stream) + r = httplib.HTTPResponse(MockSock) + r.fp = fp + r.chunked = True + r.chunk_left = None + resp = HTTPResponse(r, preload_content=False, headers={'transfer-encoding': 'chunked'}) + self.assertRaises(httplib.IncompleteRead, next, resp.read_chunked()) + + def test_chunked_response_without_crlf_on_end(self): + stream = [b"foo", b"bar", b"baz"] + fp = MockChunkedEncodingWithoutCRLFOnEnd(stream) + r = httplib.HTTPResponse(MockSock) + r.fp = fp + r.chunked = True + r.chunk_left = None + resp = HTTPResponse(r, preload_content=False, headers={'transfer-encoding': 'chunked'}) + if getattr(self, "assertListEqual", False): + self.assertListEqual(stream, list(resp.stream())) + else: + for index, item in enumerate(resp.stream()): + v = stream[index] + self.assertEqual(item, v) + + def test_chunked_response_with_extensions(self): + stream = [b"foo", b"bar"] + fp = MockChunkedEncodingWithExtensions(stream) + r = httplib.HTTPResponse(MockSock) + r.fp = fp + r.chunked = True + r.chunk_left = None + resp = HTTPResponse(r, preload_content=False, headers={'transfer-encoding': 'chunked'}) + if getattr(self, "assertListEqual", False): + self.assertListEqual(stream, list(resp.stream())) + else: + for index, item in enumerate(resp.stream()): + v = stream[index] + self.assertEqual(item, v) + def test_get_case_insensitive_headers(self): headers = {'host': 'example.com'} r = HTTPResponse(headers=headers) self.assertEqual(r.headers.get('host'), 'example.com') self.assertEqual(r.headers.get('Host'), 'example.com') + +class MockChunkedEncodingResponse(object): + + def __init__(self, content): + """ + content: collection of str, each str is a chunk in response + """ + self.content = content + self.index = 0 # This class iterates over self.content. + self.closed = False + self.cur_chunk = b'' + self.chunks_exhausted = False + + @staticmethod + def _encode_chunk(chunk): + # In the general case, we can't decode the chunk to unicode + length = '%X\r\n' % len(chunk) + return length.encode() + chunk + b'\r\n' + + def _pop_new_chunk(self): + if self.chunks_exhausted: + return b"" + try: + chunk = self.content[self.index] + except IndexError: + chunk = b'' + self.chunks_exhausted = True + else: + self.index += 1 + chunk = self._encode_chunk(chunk) + if not isinstance(chunk, bytes): + chunk = chunk.encode() + return chunk + + def pop_current_chunk(self, amt=-1, till_crlf=False): + if amt > 0 and till_crlf: + raise ValueError("Can't specify amt and till_crlf.") + if len(self.cur_chunk) <= 0: + self.cur_chunk = self._pop_new_chunk() + if till_crlf: + try: + i = self.cur_chunk.index(b"\r\n") + except ValueError: + # No CRLF in current chunk -- probably caused by encoder. + self.cur_chunk = b"" + return b"" + else: + chunk_part = self.cur_chunk[:i+2] + self.cur_chunk = self.cur_chunk[i+2:] + return chunk_part + elif amt <= -1: + chunk_part = self.cur_chunk + self.cur_chunk = b'' + return chunk_part + else: + try: + chunk_part = self.cur_chunk[:amt] + except IndexError: + chunk_part = self.cur_chunk + self.cur_chunk = b'' + else: + self.cur_chunk = self.cur_chunk[amt:] + return chunk_part + + def readline(self): + return self.pop_current_chunk(till_crlf=True) + + def read(self, amt=-1): + return self.pop_current_chunk(amt) + + def flush(self): + # Python 3 wants this method. + pass + + def close(self): + self.closed = True + + +class MockChunkedInvalidEncoding(MockChunkedEncodingResponse): + + def _encode_chunk(self, chunk): + return 'ZZZ\r\n%s\r\n' % chunk.decode() + + +class MockChunkedEncodingWithoutCRLFOnEnd(MockChunkedEncodingResponse): + + def _encode_chunk(self, chunk): + return '%X\r\n%s%s' % (len(chunk), chunk.decode(), + "\r\n" if len(chunk) > 0 else "") + + +class MockChunkedEncodingWithExtensions(MockChunkedEncodingResponse): + + def _encode_chunk(self, chunk): + return '%X;asd=qwe\r\n%s\r\n' % (len(chunk), chunk.decode()) + + +class MockSock(object): + @classmethod + def makefile(cls, *args, **kwargs): + return + + if __name__ == '__main__': unittest.main() |