aboutsummaryrefslogtreecommitdiff
path: root/test/with_dummyserver/test_connectionpool.py
diff options
context:
space:
mode:
Diffstat (limited to 'test/with_dummyserver/test_connectionpool.py')
-rw-r--r--test/with_dummyserver/test_connectionpool.py43
1 files changed, 35 insertions, 8 deletions
diff --git a/test/with_dummyserver/test_connectionpool.py b/test/with_dummyserver/test_connectionpool.py
index cc0f011..d6cb162 100644
--- a/test/with_dummyserver/test_connectionpool.py
+++ b/test/with_dummyserver/test_connectionpool.py
@@ -4,6 +4,7 @@ import socket
import sys
import unittest
import time
+import warnings
import mock
@@ -35,6 +36,7 @@ from urllib3.util.timeout import Timeout
import tornado
from dummyserver.testcase import HTTPDummyServerTestCase
+from dummyserver.server import NoIPv6Warning
from nose.tools import timed
@@ -597,7 +599,11 @@ class TestConnectionPool(HTTPDummyServerTestCase):
self.assertRaises(MaxRetryError, pool.request, 'GET', '/test', retries=2)
def test_source_address(self):
- for addr in VALID_SOURCE_ADDRESSES:
+ for addr, is_ipv6 in VALID_SOURCE_ADDRESSES:
+ if is_ipv6 and not socket.has_ipv6:
+ warnings.warn("No IPv6 support: skipping.",
+ NoIPv6Warning)
+ continue
pool = HTTPConnectionPool(self.host, self.port,
source_address=addr, retries=False)
r = pool.request('GET', '/source_address')
@@ -612,13 +618,34 @@ class TestConnectionPool(HTTPDummyServerTestCase):
self.assertRaises(ProtocolError,
pool.request, 'GET', '/source_address')
- @onlyPy3
- def test_httplib_headers_case_insensitive(self):
- HEADERS = {'Content-Length': '0', 'Content-type': 'text/plain',
- 'Server': 'TornadoServer/%s' % tornado.version}
- r = self.pool.request('GET', '/specific_method',
- fields={'method': 'GET'})
- self.assertEqual(HEADERS, dict(r.headers.items())) # to preserve case sensitivity
+ def test_stream_keepalive(self):
+ x = 2
+
+ for _ in range(x):
+ response = self.pool.request(
+ 'GET',
+ '/chunked',
+ headers={
+ 'Connection': 'keep-alive',
+ },
+ preload_content=False,
+ retries=False,
+ )
+ for chunk in response.stream():
+ self.assertEqual(chunk, b'123')
+
+ self.assertEqual(self.pool.num_connections, 1)
+ self.assertEqual(self.pool.num_requests, x)
+
+ def test_chunked_gzip(self):
+ response = self.pool.request(
+ 'GET',
+ '/chunked_gzip',
+ preload_content=False,
+ decode_content=True,
+ )
+
+ self.assertEqual(b'123' * 4, response.read())
class TestRetry(HTTPDummyServerTestCase):