aboutsummaryrefslogtreecommitdiff
path: root/test/test_response.py
diff options
context:
space:
mode:
Diffstat (limited to 'test/test_response.py')
-rw-r--r--test/test_response.py113
1 files changed, 113 insertions, 0 deletions
diff --git a/test/test_response.py b/test/test_response.py
index 90d34eb..ecfcbee 100644
--- a/test/test_response.py
+++ b/test/test_response.py
@@ -5,6 +5,25 @@ from io import BytesIO, BufferedReader
from urllib3.response import HTTPResponse
from urllib3.exceptions import DecodeError
+
+from base64 import b64decode
+
+# A known random (i.e, not-too-compressible) payload generated with:
+# "".join(random.choice(string.printable) for i in xrange(512))
+# .encode("zlib").encode("base64")
+# Randomness in tests == bad, and fixing a seed may not be sufficient.
+ZLIB_PAYLOAD = b64decode(b"""\
+eJwFweuaoQAAANDfineQhiKLUiaiCzvuTEmNNlJGiL5QhnGpZ99z8luQfe1AHoMioB+QSWHQu/L+
+lzd7W5CipqYmeVTBjdgSATdg4l4Z2zhikbuF+EKn69Q0DTpdmNJz8S33odfJoVEexw/l2SS9nFdi
+pis7KOwXzfSqarSo9uJYgbDGrs1VNnQpT9f8zAorhYCEZronZQF9DuDFfNK3Hecc+WHLnZLQptwk
+nufw8S9I43sEwxsT71BiqedHo0QeIrFE01F/4atVFXuJs2yxIOak3bvtXjUKAA6OKnQJ/nNvDGKZ
+Khe5TF36JbnKVjdcL1EUNpwrWVfQpFYJ/WWm2b74qNeSZeQv5/xBhRdOmKTJFYgO96PwrHBlsnLn
+a3l0LwJsloWpMbzByU5WLbRE6X5INFqjQOtIwYz5BAlhkn+kVqJvWM5vBlfrwP42ifonM5yF4ciJ
+auHVks62997mNGOsM7WXNG3P98dBHPo2NhbTvHleL0BI5dus2JY81MUOnK3SGWLH8HeWPa1t5KcW
+S5moAj5HexY/g/F8TctpxwsvyZp38dXeLDjSQvEQIkF7XR3YXbeZgKk3V34KGCPOAeeuQDIgyVhV
+nP4HF2uWHA==""")
+
+
class TestLegacyResponse(unittest.TestCase):
def test_getheaders(self):
headers = {'host': 'example.com'}
@@ -167,6 +186,23 @@ class TestResponse(unittest.TestCase):
self.assertEqual(next(stream), b'o')
self.assertRaises(StopIteration, next, stream)
+ def test_streaming_tell(self):
+ fp = BytesIO(b'foo')
+ resp = HTTPResponse(fp, preload_content=False)
+ stream = resp.stream(2, decode_content=False)
+
+ position = 0
+
+ position += len(next(stream))
+ self.assertEqual(2, position)
+ self.assertEqual(position, resp.tell())
+
+ position += len(next(stream))
+ self.assertEqual(3, position)
+ self.assertEqual(position, resp.tell())
+
+ self.assertRaises(StopIteration, next, stream)
+
def test_gzipped_streaming(self):
import zlib
compress = zlib.compressobj(6, zlib.DEFLATED, 16 + zlib.MAX_WBITS)
@@ -182,6 +218,78 @@ class TestResponse(unittest.TestCase):
self.assertEqual(next(stream), b'oo')
self.assertRaises(StopIteration, next, stream)
+ def test_gzipped_streaming_tell(self):
+ import zlib
+ compress = zlib.compressobj(6, zlib.DEFLATED, 16 + zlib.MAX_WBITS)
+ uncompressed_data = b'foo'
+ data = compress.compress(uncompressed_data)
+ data += compress.flush()
+
+ fp = BytesIO(data)
+ resp = HTTPResponse(fp, headers={'content-encoding': 'gzip'},
+ preload_content=False)
+ stream = resp.stream()
+
+ # Read everything
+ payload = next(stream)
+ self.assertEqual(payload, uncompressed_data)
+
+ self.assertEqual(len(data), resp.tell())
+
+ self.assertRaises(StopIteration, next, stream)
+
+ def test_deflate_streaming_tell_intermediate_point(self):
+ # Ensure that ``tell()`` returns the correct number of bytes when
+ # part-way through streaming compressed content.
+ import zlib
+
+ NUMBER_OF_READS = 10
+
+ class MockCompressedDataReading(BytesIO):
+ """
+ A ByteIO-like reader returning ``payload`` in ``NUMBER_OF_READS``
+ calls to ``read``.
+ """
+
+ def __init__(self, payload, payload_part_size):
+ self.payloads = [
+ payload[i*payload_part_size:(i+1)*payload_part_size]
+ for i in range(NUMBER_OF_READS+1)]
+
+ assert b"".join(self.payloads) == payload
+
+ def read(self, _):
+ # Amount is unused.
+ if len(self.payloads) > 0:
+ return self.payloads.pop(0)
+ return b""
+
+ uncompressed_data = zlib.decompress(ZLIB_PAYLOAD)
+
+ payload_part_size = len(ZLIB_PAYLOAD) // NUMBER_OF_READS
+ fp = MockCompressedDataReading(ZLIB_PAYLOAD, payload_part_size)
+ resp = HTTPResponse(fp, headers={'content-encoding': 'deflate'},
+ preload_content=False)
+ stream = resp.stream()
+
+ parts_positions = [(part, resp.tell()) for part in stream]
+ end_of_stream = resp.tell()
+
+ self.assertRaises(StopIteration, next, stream)
+
+ parts, positions = zip(*parts_positions)
+
+ # Check that the payload is equal to the uncompressed data
+ payload = b"".join(parts)
+ self.assertEqual(uncompressed_data, payload)
+
+ # Check that the positions in the stream are correct
+ expected = [(i+1)*payload_part_size for i in range(NUMBER_OF_READS)]
+ self.assertEqual(expected, list(positions))
+
+ # Check that the end of the stream is in the correct place
+ self.assertEqual(len(ZLIB_PAYLOAD), end_of_stream)
+
def test_deflate_streaming(self):
import zlib
data = zlib.compress(b'foo')
@@ -244,6 +352,11 @@ class TestResponse(unittest.TestCase):
self.assertEqual(next(stream), b'o')
self.assertRaises(StopIteration, next, stream)
+ def test_get_case_insensitive_headers(self):
+ headers = {'host': 'example.com'}
+ r = HTTPResponse(headers=headers)
+ self.assertEqual(r.headers.get('host'), 'example.com')
+ self.assertEqual(r.headers.get('Host'), 'example.com')
if __name__ == '__main__':
unittest.main()