aboutsummaryrefslogtreecommitdiff
path: root/test/test_response.py
diff options
context:
space:
mode:
Diffstat (limited to 'test/test_response.py')
-rw-r--r--test/test_response.py246
1 files changed, 240 insertions, 6 deletions
diff --git a/test/test_response.py b/test/test_response.py
index 7d67c93..2e2be0e 100644
--- a/test/test_response.py
+++ b/test/test_response.py
@@ -2,8 +2,12 @@ import unittest
from io import BytesIO, BufferedReader
+try:
+ import http.client as httplib
+except ImportError:
+ import httplib
from urllib3.response import HTTPResponse
-from urllib3.exceptions import DecodeError
+from urllib3.exceptions import DecodeError, ResponseNotChunked
from base64 import b64decode
@@ -73,6 +77,15 @@ class TestResponse(unittest.TestCase):
'content-encoding': 'deflate'
})
+ def test_reference_read(self):
+ fp = BytesIO(b'foo')
+ r = HTTPResponse(fp, preload_content=False)
+
+ self.assertEqual(r.read(1), b'f')
+ self.assertEqual(r.read(2), b'oo')
+ self.assertEqual(r.read(), b'')
+ self.assertEqual(r.read(), b'')
+
def test_decode_deflate(self):
import zlib
data = zlib.compress(b'foo')
@@ -102,6 +115,9 @@ class TestResponse(unittest.TestCase):
self.assertEqual(r.read(3), b'')
self.assertEqual(r.read(1), b'f')
self.assertEqual(r.read(2), b'oo')
+ self.assertEqual(r.read(), b'')
+ self.assertEqual(r.read(), b'')
+
def test_chunked_decoding_deflate2(self):
import zlib
@@ -116,6 +132,9 @@ class TestResponse(unittest.TestCase):
self.assertEqual(r.read(1), b'')
self.assertEqual(r.read(1), b'f')
self.assertEqual(r.read(2), b'oo')
+ self.assertEqual(r.read(), b'')
+ self.assertEqual(r.read(), b'')
+
def test_chunked_decoding_gzip(self):
import zlib
@@ -130,6 +149,9 @@ class TestResponse(unittest.TestCase):
self.assertEqual(r.read(11), b'')
self.assertEqual(r.read(1), b'f')
self.assertEqual(r.read(2), b'oo')
+ self.assertEqual(r.read(), b'')
+ self.assertEqual(r.read(), b'')
+
def test_body_blob(self):
resp = HTTPResponse(b'foo')
@@ -138,10 +160,6 @@ class TestResponse(unittest.TestCase):
def test_io(self):
import socket
- try:
- from http.client import HTTPResponse as OldHTTPResponse
- except:
- from httplib import HTTPResponse as OldHTTPResponse
fp = BytesIO(b'foo')
resp = HTTPResponse(fp, preload_content=False)
@@ -156,7 +174,7 @@ class TestResponse(unittest.TestCase):
# Try closing with an `httplib.HTTPResponse`, because it has an
# `isclosed` method.
- hlr = OldHTTPResponse(socket.socket())
+ hlr = httplib.HTTPResponse(socket.socket())
resp2 = HTTPResponse(hlr, preload_content=False)
self.assertEqual(resp2.closed, False)
resp2.close()
@@ -388,11 +406,227 @@ class TestResponse(unittest.TestCase):
self.assertEqual(next(stream), b'o')
self.assertRaises(StopIteration, next, stream)
+ def test_mock_transfer_encoding_chunked(self):
+ stream = [b"fo", b"o", b"bar"]
+ fp = MockChunkedEncodingResponse(stream)
+ r = httplib.HTTPResponse(MockSock)
+ r.fp = fp
+ resp = HTTPResponse(r, preload_content=False, headers={'transfer-encoding': 'chunked'})
+
+ i = 0
+ for c in resp.stream():
+ self.assertEqual(c, stream[i])
+ i += 1
+
+ def test_mock_gzipped_transfer_encoding_chunked_decoded(self):
+ """Show that we can decode the gizpped and chunked body."""
+ def stream():
+ # Set up a generator to chunk the gzipped body
+ import zlib
+ compress = zlib.compressobj(6, zlib.DEFLATED, 16 + zlib.MAX_WBITS)
+ data = compress.compress(b'foobar')
+ data += compress.flush()
+ for i in range(0, len(data), 2):
+ yield data[i:i+2]
+
+ fp = MockChunkedEncodingResponse(list(stream()))
+ r = httplib.HTTPResponse(MockSock)
+ r.fp = fp
+ headers = {'transfer-encoding': 'chunked', 'content-encoding': 'gzip'}
+ resp = HTTPResponse(r, preload_content=False, headers=headers)
+
+ data = b''
+ for c in resp.stream(decode_content=True):
+ data += c
+
+ self.assertEqual(b'foobar', data)
+
+ def test_mock_transfer_encoding_chunked_custom_read(self):
+ stream = [b"foooo", b"bbbbaaaaar"]
+ fp = MockChunkedEncodingResponse(stream)
+ r = httplib.HTTPResponse(MockSock)
+ r.fp = fp
+ r.chunked = True
+ r.chunk_left = None
+ resp = HTTPResponse(r, preload_content=False, headers={'transfer-encoding': 'chunked'})
+ expected_response = [b'fo', b'oo', b'o', b'bb', b'bb', b'aa', b'aa', b'ar']
+ response = list(resp.read_chunked(2))
+ if getattr(self, "assertListEqual", False):
+ self.assertListEqual(expected_response, response)
+ else:
+ for index, item in enumerate(response):
+ v = expected_response[index]
+ self.assertEqual(item, v)
+
+ def test_mock_transfer_encoding_chunked_unlmtd_read(self):
+ stream = [b"foooo", b"bbbbaaaaar"]
+ fp = MockChunkedEncodingResponse(stream)
+ r = httplib.HTTPResponse(MockSock)
+ r.fp = fp
+ r.chunked = True
+ r.chunk_left = None
+ resp = HTTPResponse(r, preload_content=False, headers={'transfer-encoding': 'chunked'})
+ if getattr(self, "assertListEqual", False):
+ self.assertListEqual(stream, list(resp.read_chunked()))
+ else:
+ for index, item in enumerate(resp.read_chunked()):
+ v = stream[index]
+ self.assertEqual(item, v)
+
+ def test_read_not_chunked_response_as_chunks(self):
+ fp = BytesIO(b'foo')
+ resp = HTTPResponse(fp, preload_content=False)
+ r = resp.read_chunked()
+ self.assertRaises(ResponseNotChunked, next, r)
+
+ def test_invalid_chunks(self):
+ stream = [b"foooo", b"bbbbaaaaar"]
+ fp = MockChunkedInvalidEncoding(stream)
+ r = httplib.HTTPResponse(MockSock)
+ r.fp = fp
+ r.chunked = True
+ r.chunk_left = None
+ resp = HTTPResponse(r, preload_content=False, headers={'transfer-encoding': 'chunked'})
+ self.assertRaises(httplib.IncompleteRead, next, resp.read_chunked())
+
+ def test_chunked_response_without_crlf_on_end(self):
+ stream = [b"foo", b"bar", b"baz"]
+ fp = MockChunkedEncodingWithoutCRLFOnEnd(stream)
+ r = httplib.HTTPResponse(MockSock)
+ r.fp = fp
+ r.chunked = True
+ r.chunk_left = None
+ resp = HTTPResponse(r, preload_content=False, headers={'transfer-encoding': 'chunked'})
+ if getattr(self, "assertListEqual", False):
+ self.assertListEqual(stream, list(resp.stream()))
+ else:
+ for index, item in enumerate(resp.stream()):
+ v = stream[index]
+ self.assertEqual(item, v)
+
+ def test_chunked_response_with_extensions(self):
+ stream = [b"foo", b"bar"]
+ fp = MockChunkedEncodingWithExtensions(stream)
+ r = httplib.HTTPResponse(MockSock)
+ r.fp = fp
+ r.chunked = True
+ r.chunk_left = None
+ resp = HTTPResponse(r, preload_content=False, headers={'transfer-encoding': 'chunked'})
+ if getattr(self, "assertListEqual", False):
+ self.assertListEqual(stream, list(resp.stream()))
+ else:
+ for index, item in enumerate(resp.stream()):
+ v = stream[index]
+ self.assertEqual(item, v)
+
def test_get_case_insensitive_headers(self):
headers = {'host': 'example.com'}
r = HTTPResponse(headers=headers)
self.assertEqual(r.headers.get('host'), 'example.com')
self.assertEqual(r.headers.get('Host'), 'example.com')
+
+class MockChunkedEncodingResponse(object):
+
+ def __init__(self, content):
+ """
+ content: collection of str, each str is a chunk in response
+ """
+ self.content = content
+ self.index = 0 # This class iterates over self.content.
+ self.closed = False
+ self.cur_chunk = b''
+ self.chunks_exhausted = False
+
+ @staticmethod
+ def _encode_chunk(chunk):
+ # In the general case, we can't decode the chunk to unicode
+ length = '%X\r\n' % len(chunk)
+ return length.encode() + chunk + b'\r\n'
+
+ def _pop_new_chunk(self):
+ if self.chunks_exhausted:
+ return b""
+ try:
+ chunk = self.content[self.index]
+ except IndexError:
+ chunk = b''
+ self.chunks_exhausted = True
+ else:
+ self.index += 1
+ chunk = self._encode_chunk(chunk)
+ if not isinstance(chunk, bytes):
+ chunk = chunk.encode()
+ return chunk
+
+ def pop_current_chunk(self, amt=-1, till_crlf=False):
+ if amt > 0 and till_crlf:
+ raise ValueError("Can't specify amt and till_crlf.")
+ if len(self.cur_chunk) <= 0:
+ self.cur_chunk = self._pop_new_chunk()
+ if till_crlf:
+ try:
+ i = self.cur_chunk.index(b"\r\n")
+ except ValueError:
+ # No CRLF in current chunk -- probably caused by encoder.
+ self.cur_chunk = b""
+ return b""
+ else:
+ chunk_part = self.cur_chunk[:i+2]
+ self.cur_chunk = self.cur_chunk[i+2:]
+ return chunk_part
+ elif amt <= -1:
+ chunk_part = self.cur_chunk
+ self.cur_chunk = b''
+ return chunk_part
+ else:
+ try:
+ chunk_part = self.cur_chunk[:amt]
+ except IndexError:
+ chunk_part = self.cur_chunk
+ self.cur_chunk = b''
+ else:
+ self.cur_chunk = self.cur_chunk[amt:]
+ return chunk_part
+
+ def readline(self):
+ return self.pop_current_chunk(till_crlf=True)
+
+ def read(self, amt=-1):
+ return self.pop_current_chunk(amt)
+
+ def flush(self):
+ # Python 3 wants this method.
+ pass
+
+ def close(self):
+ self.closed = True
+
+
+class MockChunkedInvalidEncoding(MockChunkedEncodingResponse):
+
+ def _encode_chunk(self, chunk):
+ return 'ZZZ\r\n%s\r\n' % chunk.decode()
+
+
+class MockChunkedEncodingWithoutCRLFOnEnd(MockChunkedEncodingResponse):
+
+ def _encode_chunk(self, chunk):
+ return '%X\r\n%s%s' % (len(chunk), chunk.decode(),
+ "\r\n" if len(chunk) > 0 else "")
+
+
+class MockChunkedEncodingWithExtensions(MockChunkedEncodingResponse):
+
+ def _encode_chunk(self, chunk):
+ return '%X;asd=qwe\r\n%s\r\n' % (len(chunk), chunk.decode())
+
+
+class MockSock(object):
+ @classmethod
+ def makefile(cls, *args, **kwargs):
+ return
+
+
if __name__ == '__main__':
unittest.main()