Diffstat (limited to 'test')
-rw-r--r--  test/__init__.py                                 128
-rw-r--r--  test/appengine/__init__.py                        71
-rw-r--r--  test/appengine/app.yaml                           11
-rw-r--r--  test/appengine/nose.cfg                            4
-rw-r--r--  test/appengine/requirements.txt                    1
-rw-r--r--  test/appengine/test_urlfetch.py                   49
-rw-r--r--  test/benchmark.py                                 77
-rw-r--r--  test/contrib/__init__.py                           0
-rw-r--r--  test/contrib/test_gae_manager.py                 185
-rw-r--r--  test/contrib/test_pyopenssl.py                    23
-rw-r--r--  test/port_helpers.py                             100
-rw-r--r--  test/test_collections.py                         343
-rw-r--r--  test/test_compatibility.py                        23
-rw-r--r--  test/test_connectionpool.py                      239
-rw-r--r--  test/test_exceptions.py                           54
-rw-r--r--  test/test_fields.py                               49
-rw-r--r--  test/test_filepost.py                            133
-rw-r--r--  test/test_no_ssl.py                               89
-rw-r--r--  test/test_poolmanager.py                          92
-rw-r--r--  test/test_proxymanager.py                         47
-rw-r--r--  test/test_response.py                            632
-rw-r--r--  test/test_retry.py                               198
-rw-r--r--  test/test_util.py                                406
-rw-r--r--  test/with_dummyserver/__init__.py                  0
-rw-r--r--  test/with_dummyserver/test_connectionpool.py     760
-rw-r--r--  test/with_dummyserver/test_https.py              444
-rw-r--r--  test/with_dummyserver/test_no_ssl.py              29
-rw-r--r--  test/with_dummyserver/test_poolmanager.py        178
-rw-r--r--  test/with_dummyserver/test_proxy_poolmanager.py  325
-rw-r--r--  test/with_dummyserver/test_socketlevel.py        795
30 files changed, 5485 insertions, 0 deletions
diff --git a/test/__init__.py b/test/__init__.py
new file mode 100644
index 0000000..172493c
--- /dev/null
+++ b/test/__init__.py
@@ -0,0 +1,128 @@
+import warnings
+import sys
+import errno
+import functools
+import logging
+import socket
+
+from nose.plugins.skip import SkipTest
+
+from urllib3.exceptions import MaxRetryError, HTTPWarning
+from urllib3.packages import six
+
+# We need a host that will not immediately close the connection with a TCP
+# reset. Stack Overflow suggests this address.
+TARPIT_HOST = '10.255.255.1'
+
+# (Source address tuple to bind to, is it an IPv6 address?)
+VALID_SOURCE_ADDRESSES = [(('::1', 0), True), (('127.0.0.1', 0), False)]
+# RFC 5737: 192.0.2.0/24 is for testing only.
+# RFC 3849: 2001:db8::/32 is for documentation only.
+INVALID_SOURCE_ADDRESSES = [('192.0.2.255', 0), ('2001:db8::1', 0)]
+
+
+def clear_warnings(cls=HTTPWarning):
+ new_filters = []
+ for f in warnings.filters:
+ if issubclass(f[2], cls):
+ continue
+ new_filters.append(f)
+ warnings.filters[:] = new_filters
+
+def setUp():
+ clear_warnings()
+ warnings.simplefilter('ignore', HTTPWarning)
+
+
+def onlyPy26OrOlder(test):
+ """Skips this test unless you are on Python2.6.x or earlier."""
+
+ @functools.wraps(test)
+ def wrapper(*args, **kwargs):
+ msg = "{name} only runs on Python2.6.x or older".format(name=test.__name__)
+ if sys.version_info >= (2, 7):
+ raise SkipTest(msg)
+ return test(*args, **kwargs)
+ return wrapper
+
+def onlyPy27OrNewer(test):
+ """Skips this test unless you are on Python 2.7.x or later."""
+
+ @functools.wraps(test)
+ def wrapper(*args, **kwargs):
+ msg = "{name} requires Python 2.7.x+ to run".format(name=test.__name__)
+ if sys.version_info < (2, 7):
+ raise SkipTest(msg)
+ return test(*args, **kwargs)
+ return wrapper
+
+def onlyPy3(test):
+ """Skips this test unless you are on Python3.x"""
+
+ @functools.wraps(test)
+ def wrapper(*args, **kwargs):
+ msg = "{name} requires Python3.x to run".format(name=test.__name__)
+ if not six.PY3:
+ raise SkipTest(msg)
+ return test(*args, **kwargs)
+ return wrapper
+
+def requires_network(test):
+ """Helps you skip tests that require the network"""
+
+ def _is_unreachable_err(err):
+ return getattr(err, 'errno', None) in (errno.ENETUNREACH,
+ errno.EHOSTUNREACH) # For OSX
+
+ @functools.wraps(test)
+ def wrapper(*args, **kwargs):
+ msg = "Can't run {name} because the network is unreachable".format(
+ name=test.__name__)
+ try:
+ return test(*args, **kwargs)
+ except socket.error as e:
+ # This test needs an initial network connection to attempt the
+ # connection to the TARPIT_HOST. This fails if you are in a place
+ # without an Internet connection, so we skip the test in that case.
+ if _is_unreachable_err(e):
+ raise SkipTest(msg)
+ raise
+ except MaxRetryError as e:
+ if _is_unreachable_err(e.reason):
+ raise SkipTest(msg)
+ raise
+ return wrapper
+
+
+class _ListHandler(logging.Handler):
+ def __init__(self):
+ super(_ListHandler, self).__init__()
+ self.records = []
+
+ def emit(self, record):
+ self.records.append(record)
+
+
+class LogRecorder(object):
+ def __init__(self, target=logging.root):
+ super(LogRecorder, self).__init__()
+ self._target = target
+ self._handler = _ListHandler()
+
+ @property
+ def records(self):
+ return self._handler.records
+
+ def install(self):
+ self._target.addHandler(self._handler)
+
+ def uninstall(self):
+ self._target.removeHandler(self._handler)
+
+ def __enter__(self):
+ self.install()
+ return self.records
+
+ def __exit__(self, exc_type, exc_value, traceback):
+ self.uninstall()
+ return False
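
A minimal sketch of how the helpers above compose in a test module, assuming
the test package is importable; the test class and log message here are
hypothetical:

    import logging
    import unittest

    from test import requires_network, LogRecorder

    class ExampleTest(unittest.TestCase):
        @requires_network
        def test_logs_are_captured(self):
            # LogRecorder installs a list-backed handler on the root logger
            # and yields the list of captured records.
            with LogRecorder() as records:
                logging.warning("hello")
            self.assertTrue(any(r.getMessage() == "hello" for r in records))
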
diff --git a/test/appengine/__init__.py b/test/appengine/__init__.py
new file mode 100644
index 0000000..917544d
--- /dev/null
+++ b/test/appengine/__init__.py
@@ -0,0 +1,71 @@
+import os
+import sys
+import unittest
+from nose.plugins.skip import SkipTest
+
+
+def activate_sandbox():
+ """
+ Enables parts of the GAE sandbox that are relevant.
+
+ Inserts the stub module import hook which causes the usage of appengine-specific
+ httplib, httplib2, socket, etc.
+ """
+ from google.appengine.tools.devappserver2.python import sandbox
+
+ for name in list(sys.modules):
+ if name in sandbox.dist27.MODULE_OVERRIDES:
+ del sys.modules[name]
+ sys.meta_path.insert(0, sandbox.StubModuleImportHook())
+ sys.path_importer_cache = {}
+
+
+def deactivate_sandbox():
+ from google.appengine.tools.devappserver2.python import sandbox
+
+ sys.meta_path = [
+ x for x in sys.meta_path if not isinstance(x, sandbox.StubModuleImportHook)]
+ sys.path_importer_cache = {}
+
+ # Delete any instances of sandboxed modules.
+ for name in list(sys.modules):
+ if name in sandbox.dist27.MODULE_OVERRIDES:
+ del sys.modules[name]
+
+
+class AppEngineSandboxTest(unittest.TestCase):
+
+ @classmethod
+ def setUpClass(cls):
+
+ if sys.version_info[:2] != (2, 7):
+ raise SkipTest("App Engine only tests on py2.7")
+
+ if 'APPLICATION_ID' not in os.environ:
+ raise SkipTest("NoseGAE plugin not used.")
+
+ try:
+ activate_sandbox()
+ except ImportError:
+ raise SkipTest("App Engine SDK not available.")
+
+ @classmethod
+ def tearDownClass(cls):
+ try:
+ deactivate_sandbox()
+ except ImportError:
+ pass
+
+
+class MockResponse(object):
+ def __init__(self, content, status_code, content_was_truncated, final_url, headers):
+ import httplib
+ from StringIO import StringIO
+
+ self.content = content
+ self.status_code = status_code
+ self.content_was_truncated = content_was_truncated
+ self.final_url = final_url
+ self.header_msg = httplib.HTTPMessage(StringIO(''.join(
+ ["%s: %s\n" % (k, v) for k, v in headers.iteritems()] + ["\n"])))
+ self.headers = self.header_msg.items()
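
For illustration, a sketch of the intended pairing of activate_sandbox() and
deactivate_sandbox(); it assumes the App Engine SDK is installed, and any code
between the two calls sees the App Engine module overrides:

    activate_sandbox()
    try:
        import httplib  # resolves to the App Engine stub while active
    finally:
        deactivate_sandbox()
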
diff --git a/test/appengine/app.yaml b/test/appengine/app.yaml
new file mode 100644
index 0000000..907c57f
--- /dev/null
+++ b/test/appengine/app.yaml
@@ -0,0 +1,11 @@
+# dummy app.yaml for nosegae
+
+api_version: 1
+runtime: python27
+threadsafe: true
+
+handlers:
+- url: /
+ static_files: README.md
+ upload: README.md
+ mime_type: text/plain
diff --git a/test/appengine/nose.cfg b/test/appengine/nose.cfg
new file mode 100644
index 0000000..8d8b3f1
--- /dev/null
+++ b/test/appengine/nose.cfg
@@ -0,0 +1,4 @@
+[nosetests]
+cover-min-percentage=0
+with-gae=1
+gae-application=test/appengine/app.yaml
diff --git a/test/appengine/requirements.txt b/test/appengine/requirements.txt
new file mode 100644
index 0000000..b6d79e0
--- /dev/null
+++ b/test/appengine/requirements.txt
@@ -0,0 +1 @@
+NoseGAE==0.5.7
diff --git a/test/appengine/test_urlfetch.py b/test/appengine/test_urlfetch.py
new file mode 100644
index 0000000..3f72023
--- /dev/null
+++ b/test/appengine/test_urlfetch.py
@@ -0,0 +1,49 @@
+from . import AppEngineSandboxTest, MockResponse
+
+from mock import patch
+from nose.plugins.skip import SkipTest
+from ..test_no_ssl import TestWithoutSSL
+
+
+class TestHTTP(AppEngineSandboxTest, TestWithoutSSL):
+ nosegae_urlfetch = True
+
+ def test_urlfetch_called_with_http(self):
+ """
+ Check that URLFetch is used to fetch non-https resources
+ """
+ resp = MockResponse(
+ 'OK',
+ 200,
+ False,
+ 'http://www.google.com',
+ {'content-type': 'text/plain'})
+ with patch('google.appengine.api.urlfetch.fetch', return_value=resp) as fetchmock:
+ import urllib3
+ pool = urllib3.HTTPConnectionPool('www.google.com', '80')
+ r = pool.request('GET', '/')
+ self.assertEqual(r.status, 200, r.data)
+ self.assertEqual(fetchmock.call_count, 1)
+
+
+class TestHTTPS(AppEngineSandboxTest):
+ nosegae_urlfetch = True
+
+ def test_urlfetch_called_with_https(self):
+ """
+ Check that URLFetch is used when fetching https resources
+ """
+ raise SkipTest() # Skipped for now because it fails.
+ resp = MockResponse(
+ 'OK',
+ 200,
+ False,
+ 'https://www.google.com',
+ {'content-type': 'text/plain'})
+ with patch('google.appengine.api.urlfetch.fetch', return_value=resp) as fetchmock:
+ import urllib3
+ pool = urllib3.HTTPSConnectionPool('www.google.com', '443')
+ pool.ConnectionCls = urllib3.connection.UnverifiedHTTPSConnection
+ r = pool.request('GET', '/')
+ self.assertEqual(r.status, 200, r.data)
+ self.assertEqual(fetchmock.call_count, 1)
diff --git a/test/benchmark.py b/test/benchmark.py
new file mode 100644
index 0000000..242e72f
--- /dev/null
+++ b/test/benchmark.py
@@ -0,0 +1,77 @@
+#!/usr/bin/env python
+
+"""
+Simple, rudimentary benchmark comparing ConnectionPool to the standard
+urllib, demonstrating the usefulness of connection re-use.
+"""
+from __future__ import print_function
+
+import sys
+import time
+import urllib
+
+sys.path.append('../')
+import urllib3
+
+
+# URLs to download. Doesn't matter as long as they're from the same host, so we
+# can take advantage of connection re-use.
+TO_DOWNLOAD = [
+ 'http://code.google.com/apis/apps/',
+ 'http://code.google.com/apis/base/',
+ 'http://code.google.com/apis/blogger/',
+ 'http://code.google.com/apis/calendar/',
+ 'http://code.google.com/apis/codesearch/',
+ 'http://code.google.com/apis/contact/',
+ 'http://code.google.com/apis/books/',
+ 'http://code.google.com/apis/documents/',
+ 'http://code.google.com/apis/finance/',
+ 'http://code.google.com/apis/health/',
+ 'http://code.google.com/apis/notebook/',
+ 'http://code.google.com/apis/picasaweb/',
+ 'http://code.google.com/apis/spreadsheets/',
+ 'http://code.google.com/apis/webmastertools/',
+ 'http://code.google.com/apis/youtube/',
+]
+
+
+def urllib_get(url_list):
+ assert url_list
+ for url in url_list:
+ now = time.time()
+ r = urllib.urlopen(url)
+ elapsed = time.time() - now
+ print("Got in %0.3f: %s" % (elapsed, url))
+
+
+def pool_get(url_list):
+ assert url_list
+ pool = urllib3.PoolManager()
+ for url in url_list:
+ now = time.time()
+ r = pool.request('GET', url, assert_same_host=False)
+ elapsed = time.time() - now
+ print("Got in %0.3fs: %s" % (elapsed, url))
+
+
+if __name__ == '__main__':
+ print("Running pool_get ...")
+ now = time.time()
+ pool_get(TO_DOWNLOAD)
+ pool_elapsed = time.time() - now
+
+ print("Running urllib_get ...")
+ now = time.time()
+ urllib_get(TO_DOWNLOAD)
+ urllib_elapsed = time.time() - now
+
+ print("Completed pool_get in %0.3fs" % pool_elapsed)
+ print("Completed urllib_get in %0.3fs" % urllib_elapsed)
+
+
+"""
+Example results:
+
+Completed pool_get in 1.163s
+Completed urllib_get in 2.318s
+"""
diff --git a/test/contrib/__init__.py b/test/contrib/__init__.py
new file mode 100644
index 0000000..e69de29
--- /dev/null
+++ b/test/contrib/__init__.py
diff --git a/test/contrib/test_gae_manager.py b/test/contrib/test_gae_manager.py
new file mode 100644
index 0000000..aa909e9
--- /dev/null
+++ b/test/contrib/test_gae_manager.py
@@ -0,0 +1,185 @@
+import unittest
+
+from dummyserver.testcase import HTTPSDummyServerTestCase
+from nose.plugins.skip import SkipTest
+
+try:
+ from google.appengine.api import urlfetch
+ (urlfetch)  # Reference the import so it isn't flagged as unused.
+except ImportError:
+ raise SkipTest("App Engine SDK not available.")
+
+from urllib3.contrib.appengine import AppEngineManager, AppEnginePlatformError
+from urllib3.exceptions import (
+ TimeoutError,
+ ProtocolError,
+ SSLError)
+from urllib3.util.url import Url
+from urllib3.util.retry import Retry
+
+from test.with_dummyserver.test_connectionpool import (
+ TestConnectionPool, TestRetry)
+
+
+# Prevent nose from running these tests.
+TestConnectionPool.__test__ = False
+TestRetry.__test__ = False
+
+
+# This class is used so we can re-use the tests from the connection pool.
+# It proxies all requests to the manager.
+class MockPool(object):
+ def __init__(self, host, port, manager, scheme='http'):
+ self.host = host
+ self.port = port
+ self.manager = manager
+ self.scheme = scheme
+
+ def request(self, method, url, *args, **kwargs):
+ url = self._absolute_url(url)
+ return self.manager.request(method, url, *args, **kwargs)
+
+ def urlopen(self, method, url, *args, **kwargs):
+ url = self._absolute_url(url)
+ return self.manager.urlopen(method, url, *args, **kwargs)
+
+ def _absolute_url(self, path):
+ return Url(
+ scheme=self.scheme,
+ host=self.host,
+ port=self.port,
+ path=path).url
+
+
+# Note that this doesn't run in the sandbox; it only runs with the URLFetch
+# API stub enabled. There's no need to enable the sandbox, as we know for a
+# fact that URLFetch is used by the connection manager.
+class TestGAEConnectionManager(TestConnectionPool):
+ __test__ = True
+
+ # Magic class variable that tells NoseGAE to enable the URLFetch stub.
+ nosegae_urlfetch = True
+
+ def setUp(self):
+ self.manager = AppEngineManager()
+ self.pool = MockPool(self.host, self.port, self.manager)
+
+ # Tests specific to AppEngineManager
+
+ def test_exceptions(self):
+ # DeadlineExceededError -> TimeoutError
+ self.assertRaises(
+ TimeoutError,
+ self.pool.request,
+ 'GET',
+ '/sleep?seconds=0.005',
+ timeout=0.001)
+
+ # InvalidURLError -> ProtocolError
+ self.assertRaises(
+ ProtocolError,
+ self.manager.request,
+ 'GET',
+ 'ftp://invalid/url')
+
+ # DownloadError -> ProtocolError
+ self.assertRaises(
+ ProtocolError,
+ self.manager.request,
+ 'GET',
+ 'http://0.0.0.0')
+
+ # ResponseTooLargeError -> AppEnginePlatformError
+ self.assertRaises(
+ AppEnginePlatformError,
+ self.pool.request,
+ 'GET',
+ '/nbytes?length=33554433') # One byte over 32 megabytes.
+
+ # URLFetch reports the request-too-large error as an InvalidURLError,
+ # which maps to an AppEnginePlatformError.
+ body = b'1' * 10485761 # One byte over 10 megabytes.
+ self.assertRaises(
+ AppEnginePlatformError,
+ self.manager.request,
+ 'POST',
+ '/',
+ body=body)
+
+ # Re-used tests below this line.
+ # Subsumed tests
+ test_timeout_float = None # Covered by test_exceptions.
+
+ # Non-applicable tests
+ test_conn_closed = None
+ test_nagle = None
+ test_socket_options = None
+ test_disable_default_socket_options = None
+ test_defaults_are_applied = None
+ test_tunnel = None
+ test_keepalive = None
+ test_keepalive_close = None
+ test_connection_count = None
+ test_connection_count_bigpool = None
+ test_for_double_release = None
+ test_release_conn_parameter = None
+ test_stream_keepalive = None
+ test_cleanup_on_connection_error = None
+
+ # Tests that should likely be modified for App Engine-specific behavior
+ test_timeout = None
+ test_connect_timeout = None
+ test_connection_error_retries = None
+ test_total_timeout = None
+ test_none_total_applies_connect = None
+ test_timeout_success = None
+ test_source_address_error = None
+ test_bad_connect = None
+ test_partial_response = None
+ test_dns_error = None
+
+
+class TestGAEConnectionManagerWithSSL(HTTPSDummyServerTestCase):
+ nosegae_urlfetch = True
+
+ def setUp(self):
+ self.manager = AppEngineManager()
+ self.pool = MockPool(self.host, self.port, self.manager, 'https')
+
+ def test_exceptions(self):
+ # SSLCertificateError -> SSLError
+ # SSLError is raised with dummyserver because URLFetch doesn't allow
+ # self-signed certs.
+ self.assertRaises(
+ SSLError,
+ self.pool.request,
+ 'GET',
+ '/')
+
+
+class TestGAERetry(TestRetry):
+ __test__ = True
+
+ # Magic class variable that tells NoseGAE to enable the URLFetch stub.
+ nosegae_urlfetch = True
+
+ def setUp(self):
+ self.manager = AppEngineManager()
+ self.pool = MockPool(self.host, self.port, self.manager)
+
+ def test_default_method_whitelist_retried(self):
+ """ urllib3 should retry methods in the default method whitelist """
+ retry = Retry(total=1, status_forcelist=[418])
+ # Use HEAD instead of OPTIONS, as URLFetch doesn't support OPTIONS
+ resp = self.pool.request(
+ 'HEAD', '/successful_retry',
+ headers={'test-name': 'test_default_whitelist'},
+ retries=retry)
+ self.assertEqual(resp.status, 200)
+
+ #test_max_retry = None
+ #test_disabled_retry = None
+
+
+if __name__ == '__main__':
+ unittest.main()
diff --git a/test/contrib/test_pyopenssl.py b/test/contrib/test_pyopenssl.py
new file mode 100644
index 0000000..5d57527
--- /dev/null
+++ b/test/contrib/test_pyopenssl.py
@@ -0,0 +1,23 @@
+from nose.plugins.skip import SkipTest
+from urllib3.packages import six
+
+if six.PY3:
+ raise SkipTest('Testing of PyOpenSSL disabled on PY3')
+
+try:
+ from urllib3.contrib.pyopenssl import (inject_into_urllib3,
+ extract_from_urllib3)
+except ImportError as e:
+ raise SkipTest('Could not import PyOpenSSL: %r' % e)
+
+
+from ..with_dummyserver.test_https import TestHTTPS, TestHTTPS_TLSv1
+from ..with_dummyserver.test_socketlevel import TestSNI, TestSocketClosing
+
+
+def setup_module():
+ inject_into_urllib3()
+
+
+def teardown_module():
+ extract_from_urllib3()
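
The inject/extract pair monkey-patches urllib3's TLS machinery and restores it
afterwards; a minimal standalone sketch of the same flow (assuming PyOpenSSL is
installed):

    from urllib3.contrib.pyopenssl import (inject_into_urllib3,
                                           extract_from_urllib3)

    inject_into_urllib3()       # route urllib3's TLS through PyOpenSSL
    try:
        import urllib3
        http = urllib3.PoolManager()
        # HTTPS requests made here use PyOpenSSL under the hood.
    finally:
        extract_from_urllib3()  # restore the stdlib ssl implementation
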
diff --git a/test/port_helpers.py b/test/port_helpers.py
new file mode 100644
index 0000000..e818a9b
--- /dev/null
+++ b/test/port_helpers.py
@@ -0,0 +1,100 @@
+# These helpers are copied from test_support.py in the Python 2.7 standard
+# library test suite.
+
+import socket
+
+
+# Don't use "localhost", since resolving it uses the DNS under recent
+# Windows versions (see issue #18792).
+HOST = "127.0.0.1"
+HOSTv6 = "::1"
+
+def find_unused_port(family=socket.AF_INET, socktype=socket.SOCK_STREAM):
+ """Returns an unused port that should be suitable for binding. This is
+ achieved by creating a temporary socket with the given family and type
+ (default is AF_INET, SOCK_STREAM), binding it to the local host address
+ with the port set to 0,
+ eliciting an unused ephemeral port from the OS. The temporary socket is
+ then closed and deleted, and the ephemeral port is returned.
+
+ Either this method or bind_port() should be used for any tests where a
+ server socket needs to be bound to a particular port for the duration of
+ the test. Which one to use depends on whether the calling code is creating
+ a python socket, or if an unused port needs to be provided in a constructor
+ or passed to an external program (i.e. the -accept argument to openssl's
+ s_server mode). Always prefer bind_port() over find_unused_port() where
+ possible. Hard coded ports should *NEVER* be used. As soon as a server
+ socket is bound to a hard coded port, the ability to run multiple instances
+ of the test simultaneously on the same host is compromised, which makes the
+ test a ticking time bomb in a buildbot environment. On Unix buildbots, this
+ may simply manifest as a failed test, which can be recovered from without
+ intervention in most cases, but on Windows, the entire python process can
+ completely and utterly wedge, requiring someone to log in to the buildbot
+ and manually kill the affected process.
+
+ (This is easy to reproduce on Windows, unfortunately, and can be traced to
+ the SO_REUSEADDR socket option having different semantics on Windows versus
+ Unix/Linux. On Unix, you can't have two AF_INET SOCK_STREAM sockets bind,
+ listen and then accept connections on identical host/ports. An EADDRINUSE
+ socket.error will be raised at some point (depending on the platform and
+ the order bind and listen were called on each socket).
+
+ However, on Windows, if SO_REUSEADDR is set on the sockets, no EADDRINUSE
+ will ever be raised when attempting to bind two identical host/ports. When
+ accept() is called on each socket, the second caller's process will steal
+ the port from the first caller, leaving them both in an awkwardly wedged
+ state where they'll no longer respond to any signals or graceful kills, and
+ must be forcibly killed via OpenProcess()/TerminateProcess().
+
+ The solution on Windows is to use the SO_EXCLUSIVEADDRUSE socket option
+ instead of SO_REUSEADDR, which effectively affords the same semantics as
+ SO_REUSEADDR on Unix. Given the propensity of Unix developers in the Open
+ Source world compared to Windows ones, this is a common mistake. A quick
+ look over OpenSSL's 0.9.8g source shows that they use SO_REUSEADDR when
+ openssl.exe is called with the 's_server' option, for example. See
+ http://bugs.python.org/issue2550 for more info. The following site also
+ has a very thorough description about the implications of both REUSEADDR
+ and EXCLUSIVEADDRUSE on Windows:
+ http://msdn2.microsoft.com/en-us/library/ms740621(VS.85).aspx)
+
+ XXX: although this approach is a vast improvement on previous attempts to
+ elicit unused ports, it rests heavily on the assumption that the ephemeral
+ port returned to us by the OS won't immediately be dished back out to some
+ other process when we close and delete our temporary socket but before our
+ calling code has a chance to bind the returned port. We can deal with this
+ issue if/when we come across it."""
+ tempsock = socket.socket(family, socktype)
+ port = bind_port(tempsock)
+ tempsock.close()
+ del tempsock
+ return port
+
+def bind_port(sock, host=HOST):
+ """Bind the socket to a free port and return the port number. Relies on
+ ephemeral ports in order to ensure we are using an unbound port. This is
+ important as many tests may be running simultaneously, especially in a
+ buildbot environment. This method raises an exception if the sock.family
+ is AF_INET and sock.type is SOCK_STREAM, *and* the socket has SO_REUSEADDR
+ or SO_REUSEPORT set on it. Tests should *never* set these socket options
+ for TCP/IP sockets. The only case for setting these options is testing
+ multicasting via multiple UDP sockets.
+
+ Additionally, if the SO_EXCLUSIVEADDRUSE socket option is available (i.e.
+ on Windows), it will be set on the socket. This will prevent anyone else
+ from bind()'ing to our host/port for the duration of the test.
+ """
+ if sock.family == socket.AF_INET and sock.type == socket.SOCK_STREAM:
+ if hasattr(socket, 'SO_REUSEADDR'):
+ if sock.getsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR) == 1:
+ raise ValueError("tests should never set the SO_REUSEADDR " \
+ "socket option on TCP/IP sockets!")
+ if hasattr(socket, 'SO_REUSEPORT'):
+ if sock.getsockopt(socket.SOL_SOCKET, socket.SO_REUSEPORT) == 1:
+ raise ValueError("tests should never set the SO_REUSEPORT " \
+ "socket option on TCP/IP sockets!")
+ if hasattr(socket, 'SO_EXCLUSIVEADDRUSE'):
+ sock.setsockopt(socket.SOL_SOCKET, socket.SO_EXCLUSIVEADDRUSE, 1)
+
+ sock.bind((host, 0))
+ port = sock.getsockname()[1]
+ return port
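
In short: prefer bind_port(), and fall back to find_unused_port() only when the
port number must be handed to something that creates its own socket. A usage
sketch, assuming the test package is importable:

    import socket
    from test.port_helpers import bind_port, find_unused_port

    # Preferred: bind a socket we own, then use the resulting port.
    sock = socket.socket()
    port = bind_port(sock)    # now bound to (HOST, <ephemeral port>)
    sock.listen(1)

    # Race-prone fallback for external consumers (see the docstring above).
    port = find_unused_port()
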
diff --git a/test/test_collections.py b/test/test_collections.py
new file mode 100644
index 0000000..9d72939
--- /dev/null
+++ b/test/test_collections.py
@@ -0,0 +1,343 @@
+import unittest
+
+from urllib3._collections import (
+ HTTPHeaderDict,
+ RecentlyUsedContainer as Container
+)
+from urllib3.packages import six
+xrange = six.moves.xrange
+
+from nose.plugins.skip import SkipTest
+
+
+class TestLRUContainer(unittest.TestCase):
+ def test_maxsize(self):
+ d = Container(5)
+
+ for i in xrange(5):
+ d[i] = str(i)
+
+ self.assertEqual(len(d), 5)
+
+ for i in xrange(5):
+ self.assertEqual(d[i], str(i))
+
+ d[i+1] = str(i+1)
+
+ self.assertEqual(len(d), 5)
+ self.assertFalse(0 in d)
+ self.assertTrue(i+1 in d)
+
+ def test_expire(self):
+ d = Container(5)
+
+ for i in xrange(5):
+ d[i] = str(i)
+
+ for i in xrange(5):
+ d.get(0)
+
+ # Add one more entry
+ d[5] = '5'
+
+ # Check state
+ self.assertEqual(list(d.keys()), [2, 3, 4, 0, 5])
+
+ def test_same_key(self):
+ d = Container(5)
+
+ for i in xrange(10):
+ d['foo'] = i
+
+ self.assertEqual(list(d.keys()), ['foo'])
+ self.assertEqual(len(d), 1)
+
+ def test_access_ordering(self):
+ d = Container(5)
+
+ for i in xrange(10):
+ d[i] = True
+
+ # Keys should be ordered by access time
+ self.assertEqual(list(d.keys()), [5, 6, 7, 8, 9])
+
+ new_order = [7, 8, 6, 9, 5]
+ for k in new_order:
+ d[k]  # Access the key to refresh its position in the LRU order.
+
+ self.assertEqual(list(d.keys()), new_order)
+
+ def test_delete(self):
+ d = Container(5)
+
+ for i in xrange(5):
+ d[i] = True
+
+ del d[0]
+ self.assertFalse(0 in d)
+
+ d.pop(1)
+ self.assertFalse(1 in d)
+
+ d.pop(1, None)
+
+ def test_get(self):
+ d = Container(5)
+
+ for i in xrange(5):
+ d[i] = True
+
+ r = d.get(4)
+ self.assertEqual(r, True)
+
+ r = d.get(5)
+ self.assertEqual(r, None)
+
+ r = d.get(5, 42)
+ self.assertEqual(r, 42)
+
+ self.assertRaises(KeyError, lambda: d[5])
+
+ def test_disposal(self):
+ evicted_items = []
+
+ def dispose_func(arg):
+ # Save the evicted datum for inspection
+ evicted_items.append(arg)
+
+ d = Container(5, dispose_func=dispose_func)
+ for i in xrange(5):
+ d[i] = i
+ self.assertEqual(list(d.keys()), list(xrange(5)))
+ self.assertEqual(evicted_items, []) # Nothing disposed
+
+ d[5] = 5
+ self.assertEqual(list(d.keys()), list(xrange(1, 6)))
+ self.assertEqual(evicted_items, [0])
+
+ del d[1]
+ self.assertEqual(evicted_items, [0, 1])
+
+ d.clear()
+ self.assertEqual(evicted_items, [0, 1, 2, 3, 4, 5])
+
+ def test_iter(self):
+ d = Container()
+
+ self.assertRaises(NotImplementedError, d.__iter__)
+
+
+class NonMappingHeaderContainer(object):
+ def __init__(self, **kwargs):
+ self._data = {}
+ self._data.update(kwargs)
+
+ def keys(self):
+ return self._data.keys()
+
+ def __getitem__(self, key):
+ return self._data[key]
+
+
+class TestHTTPHeaderDict(unittest.TestCase):
+ def setUp(self):
+ self.d = HTTPHeaderDict(Cookie='foo')
+ self.d.add('cookie', 'bar')
+
+ def test_create_from_kwargs(self):
+ h = HTTPHeaderDict(ab=1, cd=2, ef=3, gh=4)
+ self.assertEqual(len(h), 4)
+ self.assertTrue('ab' in h)
+
+ def test_create_from_dict(self):
+ h = HTTPHeaderDict(dict(ab=1, cd=2, ef=3, gh=4))
+ self.assertEqual(len(h), 4)
+ self.assertTrue('ab' in h)
+
+ def test_create_from_iterator(self):
+ teststr = 'urllib3ontherocks'
+ h = HTTPHeaderDict((c, c*5) for c in teststr)
+ self.assertEqual(len(h), len(set(teststr)))
+
+ def test_create_from_list(self):
+ h = HTTPHeaderDict([('ab', 'A'), ('cd', 'B'), ('cookie', 'C'), ('cookie', 'D'), ('cookie', 'E')])
+ self.assertEqual(len(h), 3)
+ self.assertTrue('ab' in h)
+ clist = h.getlist('cookie')
+ self.assertEqual(len(clist), 3)
+ self.assertEqual(clist[0], 'C')
+ self.assertEqual(clist[-1], 'E')
+
+ def test_create_from_headerdict(self):
+ org = HTTPHeaderDict([('ab', 'A'), ('cd', 'B'), ('cookie', 'C'), ('cookie', 'D'), ('cookie', 'E')])
+ h = HTTPHeaderDict(org)
+ self.assertEqual(len(h), 3)
+ self.assertTrue('ab' in h)
+ clist = h.getlist('cookie')
+ self.assertEqual(len(clist), 3)
+ self.assertEqual(clist[0], 'C')
+ self.assertEqual(clist[-1], 'E')
+ self.assertFalse(h is org)
+ self.assertEqual(h, org)
+
+ def test_setitem(self):
+ self.d['Cookie'] = 'foo'
+ self.assertEqual(self.d['cookie'], 'foo')
+ self.d['cookie'] = 'with, comma'
+ self.assertEqual(self.d.getlist('cookie'), ['with, comma'])
+
+ def test_update(self):
+ self.d.update(dict(Cookie='foo'))
+ self.assertEqual(self.d['cookie'], 'foo')
+ self.d.update(dict(cookie='with, comma'))
+ self.assertEqual(self.d.getlist('cookie'), ['with, comma'])
+
+ def test_delitem(self):
+ del self.d['cookie']
+ self.assertFalse('cookie' in self.d)
+ self.assertFalse('COOKIE' in self.d)
+
+ def test_add_well_known_multiheader(self):
+ self.d.add('COOKIE', 'asdf')
+ self.assertEqual(self.d.getlist('cookie'), ['foo', 'bar', 'asdf'])
+ self.assertEqual(self.d['cookie'], 'foo, bar, asdf')
+
+ def test_add_comma_separated_multiheader(self):
+ self.d.add('bar', 'foo')
+ self.d.add('BAR', 'bar')
+ self.d.add('Bar', 'asdf')
+ self.assertEqual(self.d.getlist('bar'), ['foo', 'bar', 'asdf'])
+ self.assertEqual(self.d['bar'], 'foo, bar, asdf')
+
+ def test_extend_from_list(self):
+ self.d.extend([('set-cookie', '100'), ('set-cookie', '200'), ('set-cookie', '300')])
+ self.assertEqual(self.d['set-cookie'], '100, 200, 300')
+
+ def test_extend_from_dict(self):
+ self.d.extend(dict(cookie='asdf'), b='100')
+ self.assertEqual(self.d['cookie'], 'foo, bar, asdf')
+ self.assertEqual(self.d['b'], '100')
+ self.d.add('cookie', 'with, comma')
+ self.assertEqual(self.d.getlist('cookie'), ['foo', 'bar', 'asdf', 'with, comma'])
+
+ def test_extend_from_container(self):
+ h = NonMappingHeaderContainer(Cookie='foo', e='foofoo')
+ self.d.extend(h)
+ self.assertEqual(self.d['cookie'], 'foo, bar, foo')
+ self.assertEqual(self.d['e'], 'foofoo')
+ self.assertEqual(len(self.d), 2)
+
+ def test_extend_from_headerdict(self):
+ h = HTTPHeaderDict(Cookie='foo', e='foofoo')
+ self.d.extend(h)
+ self.assertEqual(self.d['cookie'], 'foo, bar, foo')
+ self.assertEqual(self.d['e'], 'foofoo')
+ self.assertEqual(len(self.d), 2)
+
+ def test_copy(self):
+ h = self.d.copy()
+ self.assertTrue(self.d is not h)
+ self.assertEqual(self.d, h)
+
+ def test_getlist(self):
+ self.assertEqual(self.d.getlist('cookie'), ['foo', 'bar'])
+ self.assertEqual(self.d.getlist('Cookie'), ['foo', 'bar'])
+ self.assertEqual(self.d.getlist('b'), [])
+ self.d.add('b', 'asdf')
+ self.assertEqual(self.d.getlist('b'), ['asdf'])
+
+ def test_getlist_after_copy(self):
+ self.assertEqual(self.d.getlist('cookie'), HTTPHeaderDict(self.d).getlist('cookie'))
+
+ def test_equal(self):
+ b = HTTPHeaderDict(cookie='foo, bar')
+ c = NonMappingHeaderContainer(cookie='foo, bar')
+ self.assertEqual(self.d, b)
+ self.assertEqual(self.d, c)
+ self.assertNotEqual(self.d, 2)
+
+ def test_not_equal(self):
+ b = HTTPHeaderDict(cookie='foo, bar')
+ c = NonMappingHeaderContainer(cookie='foo, bar')
+ self.assertFalse(self.d != b)
+ self.assertFalse(self.d != c)
+ self.assertNotEqual(self.d, 2)
+
+ def test_pop(self):
+ key = 'Cookie'
+ a = self.d[key]
+ b = self.d.pop(key)
+ self.assertEqual(a, b)
+ self.assertFalse(key in self.d)
+ self.assertRaises(KeyError, self.d.pop, key)
+ dummy = object()
+ self.assertTrue(dummy is self.d.pop(key, dummy))
+
+ def test_discard(self):
+ self.d.discard('cookie')
+ self.assertFalse('cookie' in self.d)
+ self.d.discard('cookie')
+
+ def test_len(self):
+ self.assertEqual(len(self.d), 1)
+ self.d.add('cookie', 'bla')
+ self.d.add('asdf', 'foo')
+ # len determined by unique fieldnames
+ self.assertEqual(len(self.d), 2)
+
+ def test_repr(self):
+ rep = "HTTPHeaderDict({'Cookie': 'foo, bar'})"
+ self.assertEqual(repr(self.d), rep)
+
+ def test_items(self):
+ items = self.d.items()
+ self.assertEqual(len(items), 2)
+ self.assertEqual(items[0][0], 'Cookie')
+ self.assertEqual(items[0][1], 'foo')
+ self.assertEqual(items[1][0], 'Cookie')
+ self.assertEqual(items[1][1], 'bar')
+
+ def test_dict_conversion(self):
+ # Also tested in connectionpool; needs to preserve case
+ hdict = {'Content-Length': '0', 'Content-type': 'text/plain', 'Server': 'TornadoServer/1.2.3'}
+ h = dict(HTTPHeaderDict(hdict).items())
+ self.assertEqual(hdict, h)
+ self.assertEqual(hdict, dict(HTTPHeaderDict(hdict)))
+
+ def test_string_enforcement(self):
+ # This currently raises AttributeError on key.lower(); it should probably raise something nicer
+ self.assertRaises(Exception, self.d.__setitem__, 3, 5)
+ self.assertRaises(Exception, self.d.add, 3, 4)
+ self.assertRaises(Exception, self.d.__delitem__, 3)
+ self.assertRaises(Exception, HTTPHeaderDict, {3: 3})
+
+ def test_from_httplib_py2(self):
+ if six.PY3:
+ raise SkipTest("python3 has a different internal header implementation")
+ msg = """
+Server: nginx
+Content-Type: text/html; charset=windows-1251
+Connection: keep-alive
+X-Some-Multiline: asdf
+ asdf
+ asdf
+Set-Cookie: bb_lastvisit=1348253375; expires=Sat, 21-Sep-2013 18:49:35 GMT; path=/
+Set-Cookie: bb_lastactivity=0; expires=Sat, 21-Sep-2013 18:49:35 GMT; path=/
+www-authenticate: asdf
+www-authenticate: bla
+
+"""
+ buffer = six.moves.StringIO(msg.lstrip().replace('\n', '\r\n'))
+ msg = six.moves.http_client.HTTPMessage(buffer)
+ d = HTTPHeaderDict.from_httplib(msg)
+ self.assertEqual(d['server'], 'nginx')
+ cookies = d.getlist('set-cookie')
+ self.assertEqual(len(cookies), 2)
+ self.assertTrue(cookies[0].startswith("bb_lastvisit"))
+ self.assertTrue(cookies[1].startswith("bb_lastactivity"))
+ self.assertEqual(d['x-some-multiline'].split(), ['asdf', 'asdf', 'asdf'])
+ self.assertEqual(d['www-authenticate'], 'asdf, bla')
+ self.assertEqual(d.getlist('www-authenticate'), ['asdf', 'bla'])
+
+if __name__ == '__main__':
+ unittest.main()
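
The HTTPHeaderDict behavior these tests pin down, condensed into a sketch:

    from urllib3._collections import HTTPHeaderDict

    h = HTTPHeaderDict()
    h['Set-Cookie'] = 'a=1'
    h.add('set-cookie', 'b=2')   # field names are case-insensitive
    h['set-cookie']              # 'a=1, b=2' -- values joined with ', '
    h.getlist('Set-Cookie')      # ['a=1', 'b=2'] -- individual values
    len(h)                       # 1 -- counts unique field names
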
diff --git a/test/test_compatibility.py b/test/test_compatibility.py
new file mode 100644
index 0000000..05ee4de
--- /dev/null
+++ b/test/test_compatibility.py
@@ -0,0 +1,23 @@
+import unittest
+import warnings
+
+from urllib3.connection import HTTPConnection
+
+
+class TestVersionCompatibility(unittest.TestCase):
+ def test_connection_strict(self):
+ with warnings.catch_warnings(record=True) as w:
+ warnings.simplefilter("always")
+
+ # strict=True is deprecated in Py33+
+ conn = HTTPConnection('localhost', 12345, strict=True)
+
+ if w:
+ self.fail('HTTPConnection raised warning on strict=True: %r' % w[0].message)
+
+ def test_connection_source_address(self):
+ try:
+ # source_address does not exist in Py26-
+ conn = HTTPConnection('localhost', 12345, source_address='127.0.0.1')
+ except TypeError as e:
+ self.fail('HTTPConnection raised TypeError on source_address: %r' % e)
diff --git a/test/test_connectionpool.py b/test/test_connectionpool.py
new file mode 100644
index 0000000..ee37913
--- /dev/null
+++ b/test/test_connectionpool.py
@@ -0,0 +1,239 @@
+import unittest
+
+from urllib3.connectionpool import (
+ connection_from_url,
+ HTTPConnection,
+ HTTPConnectionPool,
+)
+from urllib3.util.timeout import Timeout
+from urllib3.packages.ssl_match_hostname import CertificateError
+from urllib3.exceptions import (
+ ClosedPoolError,
+ EmptyPoolError,
+ HostChangedError,
+ LocationValueError,
+ MaxRetryError,
+ ProtocolError,
+ SSLError,
+)
+
+from socket import error as SocketError
+from ssl import SSLError as BaseSSLError
+
+try: # Python 3
+ from queue import Empty
+ from http.client import HTTPException
+except ImportError:
+ from Queue import Empty
+ from httplib import HTTPException
+
+
+class TestConnectionPool(unittest.TestCase):
+ """
+ Tests in this suite should exercise the ConnectionPool functionality
+ without actually making any network requests or connections.
+ """
+ def test_same_host(self):
+ same_host = [
+ ('http://google.com/', '/'),
+ ('http://google.com/', 'http://google.com/'),
+ ('http://google.com/', 'http://google.com'),
+ ('http://google.com/', 'http://google.com/abra/cadabra'),
+ ('http://google.com:42/', 'http://google.com:42/abracadabra'),
+ # Test comparison using default ports
+ ('http://google.com:80/', 'http://google.com/abracadabra'),
+ ('http://google.com/', 'http://google.com:80/abracadabra'),
+ ('https://google.com:443/', 'https://google.com/abracadabra'),
+ ('https://google.com/', 'https://google.com:443/abracadabra'),
+ ]
+
+ for a, b in same_host:
+ c = connection_from_url(a)
+ self.assertTrue(c.is_same_host(b), "%s =? %s" % (a, b))
+
+ not_same_host = [
+ ('https://google.com/', 'http://google.com/'),
+ ('http://google.com/', 'https://google.com/'),
+ ('http://yahoo.com/', 'http://google.com/'),
+ ('http://google.com:42', 'https://google.com/abracadabra'),
+ ('http://google.com', 'https://google.net/'),
+ # Test comparison with default ports
+ ('http://google.com:42', 'http://google.com'),
+ ('https://google.com:42', 'https://google.com'),
+ ('http://google.com:443', 'http://google.com'),
+ ('https://google.com:80', 'https://google.com'),
+ ('http://google.com:443', 'https://google.com'),
+ ('https://google.com:80', 'http://google.com'),
+ ('https://google.com:443', 'http://google.com'),
+ ('http://google.com:80', 'https://google.com'),
+ ]
+
+ for a, b in not_same_host:
+ c = connection_from_url(a)
+ self.assertFalse(c.is_same_host(b), "%s =? %s" % (a, b))
+ c = connection_from_url(b)
+ self.assertFalse(c.is_same_host(a), "%s =? %s" % (b, a))
+
+
+ def test_max_connections(self):
+ pool = HTTPConnectionPool(host='localhost', maxsize=1, block=True)
+
+ pool._get_conn(timeout=0.01)
+
+ try:
+ pool._get_conn(timeout=0.01)
+ self.fail("Managed to get a connection without EmptyPoolError")
+ except EmptyPoolError:
+ pass
+
+ try:
+ pool.request('GET', '/', pool_timeout=0.01)
+ self.fail("Managed to get a connection without EmptyPoolError")
+ except EmptyPoolError:
+ pass
+
+ self.assertEqual(pool.num_connections, 1)
+
+ def test_pool_edgecases(self):
+ pool = HTTPConnectionPool(host='localhost', maxsize=1, block=False)
+
+ conn1 = pool._get_conn()
+ conn2 = pool._get_conn() # New because block=False
+
+ pool._put_conn(conn1)
+ pool._put_conn(conn2) # Should be discarded
+
+ self.assertEqual(conn1, pool._get_conn())
+ self.assertNotEqual(conn2, pool._get_conn())
+
+ self.assertEqual(pool.num_connections, 3)
+
+ def test_exception_str(self):
+ self.assertEqual(
+ str(EmptyPoolError(HTTPConnectionPool(host='localhost'), "Test.")),
+ "HTTPConnectionPool(host='localhost', port=None): Test.")
+
+ def test_retry_exception_str(self):
+ self.assertEqual(
+ str(MaxRetryError(
+ HTTPConnectionPool(host='localhost'), "Test.", None)),
+ "HTTPConnectionPool(host='localhost', port=None): "
+ "Max retries exceeded with url: Test. (Caused by None)")
+
+ err = SocketError("Test")
+
+ # using err.__class__ here, as socket.error is an alias for OSError
+ # since Py3.3 and gets printed as this
+ self.assertEqual(
+ str(MaxRetryError(
+ HTTPConnectionPool(host='localhost'), "Test.", err)),
+ "HTTPConnectionPool(host='localhost', port=None): "
+ "Max retries exceeded with url: Test. "
+ "(Caused by %r)" % err)
+
+
+ def test_pool_size(self):
+ POOL_SIZE = 1
+ pool = HTTPConnectionPool(host='localhost', maxsize=POOL_SIZE, block=True)
+
+ def _raise(ex):
+ raise ex()
+
+ def _test(exception, expect):
+ pool._make_request = lambda *args, **kwargs: _raise(exception)
+ self.assertRaises(expect, pool.request, 'GET', '/')
+
+ self.assertEqual(pool.pool.qsize(), POOL_SIZE)
+
+ # Make sure that all of the exceptions return the connection to the pool
+ _test(Empty, EmptyPoolError)
+ _test(BaseSSLError, SSLError)
+ _test(CertificateError, SSLError)
+
+ # The pool should never be empty; when HTTPException is raised, a retry
+ # is triggered, and when the retry fails too, MaxRetryError is raised,
+ # not EmptyPoolError.
+ # See: https://github.com/shazow/urllib3/issues/76
+ pool._make_request = lambda *args, **kwargs: _raise(HTTPException)
+ self.assertRaises(MaxRetryError, pool.request,
+ 'GET', '/', retries=1, pool_timeout=0.01)
+ self.assertEqual(pool.pool.qsize(), POOL_SIZE)
+
+ def test_assert_same_host(self):
+ c = connection_from_url('http://google.com:80')
+
+ self.assertRaises(HostChangedError, c.request,
+ 'GET', 'http://yahoo.com:80', assert_same_host=True)
+
+ def test_pool_close(self):
+ pool = connection_from_url('http://google.com:80')
+
+ # Populate with some connections
+ conn1 = pool._get_conn()
+ conn2 = pool._get_conn()
+ conn3 = pool._get_conn()
+ pool._put_conn(conn1)
+ pool._put_conn(conn2)
+
+ old_pool_queue = pool.pool
+
+ pool.close()
+ self.assertEqual(pool.pool, None)
+
+ self.assertRaises(ClosedPoolError, pool._get_conn)
+
+ pool._put_conn(conn3)
+
+ self.assertRaises(ClosedPoolError, pool._get_conn)
+
+ self.assertRaises(Empty, old_pool_queue.get, block=False)
+
+ def test_pool_timeouts(self):
+ pool = HTTPConnectionPool(host='localhost')
+ conn = pool._new_conn()
+ self.assertEqual(conn.__class__, HTTPConnection)
+ self.assertEqual(pool.timeout.__class__, Timeout)
+ self.assertEqual(pool.timeout._read, Timeout.DEFAULT_TIMEOUT)
+ self.assertEqual(pool.timeout._connect, Timeout.DEFAULT_TIMEOUT)
+ self.assertEqual(pool.timeout.total, None)
+
+ pool = HTTPConnectionPool(host='localhost', timeout=3)
+ self.assertEqual(pool.timeout._read, 3)
+ self.assertEqual(pool.timeout._connect, 3)
+ self.assertEqual(pool.timeout.total, None)
+
+ def test_no_host(self):
+ self.assertRaises(LocationValueError, HTTPConnectionPool, None)
+
+ def test_contextmanager(self):
+ with connection_from_url('http://google.com:80') as pool:
+ # Populate with some connections
+ conn1 = pool._get_conn()
+ conn2 = pool._get_conn()
+ conn3 = pool._get_conn()
+ pool._put_conn(conn1)
+ pool._put_conn(conn2)
+
+ old_pool_queue = pool.pool
+
+ self.assertEqual(pool.pool, None)
+ self.assertRaises(ClosedPoolError, pool._get_conn)
+
+ pool._put_conn(conn3)
+ self.assertRaises(ClosedPoolError, pool._get_conn)
+ self.assertRaises(Empty, old_pool_queue.get, block=False)
+
+ def test_absolute_url(self):
+ c = connection_from_url('http://google.com:80')
+ self.assertEqual(
+ 'http://google.com:80/path?query=foo',
+ c._absolute_url('path?query=foo'))
+
+ def test_ca_certs_default_cert_required(self):
+ with connection_from_url('https://google.com:80', ca_certs='/etc/ssl/certs/custom.pem') as pool:
+ conn = pool._get_conn()
+ self.assertEqual(conn.cert_reqs, 'CERT_REQUIRED')
+
+
+if __name__ == '__main__':
+ unittest.main()
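
The timeout defaults asserted in test_pool_timeouts come from urllib3's Timeout
class; a sketch of the two ways a pool's timeout can be configured:

    from urllib3.connectionpool import HTTPConnectionPool
    from urllib3.util.timeout import Timeout

    # A bare number applies to both connect and read, as the tests assert.
    pool = HTTPConnectionPool('localhost', timeout=3)

    # A Timeout object allows distinct connect and read values.
    pool = HTTPConnectionPool('localhost',
                              timeout=Timeout(connect=1.0, read=5.0))
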
diff --git a/test/test_exceptions.py b/test/test_exceptions.py
new file mode 100644
index 0000000..b5bb93e
--- /dev/null
+++ b/test/test_exceptions.py
@@ -0,0 +1,54 @@
+import unittest
+import pickle
+
+from urllib3.exceptions import (HTTPError, MaxRetryError, LocationParseError,
+ ClosedPoolError, EmptyPoolError,
+ HostChangedError, ReadTimeoutError,
+ ConnectTimeoutError, HeaderParsingError)
+from urllib3.connectionpool import HTTPConnectionPool
+
+
+
+class TestPickle(unittest.TestCase):
+
+ def verify_pickling(self, item):
+ return pickle.loads(pickle.dumps(item))
+
+ def test_exceptions(self):
+ assert self.verify_pickling(HTTPError(None))
+ assert self.verify_pickling(MaxRetryError(None, None, None))
+ assert self.verify_pickling(LocationParseError(None))
+ assert self.verify_pickling(ConnectTimeoutError(None))
+
+ def test_exceptions_with_objects(self):
+ assert self.verify_pickling(
+ HTTPError('foo'))
+
+ assert self.verify_pickling(
+ HTTPError('foo', IOError('foo')))
+
+ assert self.verify_pickling(
+ MaxRetryError(HTTPConnectionPool('localhost'), '/', None))
+
+ assert self.verify_pickling(
+ LocationParseError('fake location'))
+
+ assert self.verify_pickling(
+ ClosedPoolError(HTTPConnectionPool('localhost'), None))
+
+ assert self.verify_pickling(
+ EmptyPoolError(HTTPConnectionPool('localhost'), None))
+
+ assert self.verify_pickling(
+ HostChangedError(HTTPConnectionPool('localhost'), '/', None))
+
+ assert self.verify_pickling(
+ ReadTimeoutError(HTTPConnectionPool('localhost'), '/', None))
+
+
+class TestFormat(unittest.TestCase):
+ def test_header_parsing_errors(self):
+ hpe = HeaderParsingError('defects', 'unparsed_data')
+
+ self.assertTrue('defects' in str(hpe))
+ self.assertTrue('unparsed_data' in str(hpe))
diff --git a/test/test_fields.py b/test/test_fields.py
new file mode 100644
index 0000000..cdec68b
--- /dev/null
+++ b/test/test_fields.py
@@ -0,0 +1,49 @@
+import unittest
+
+from urllib3.fields import guess_content_type, RequestField
+from urllib3.packages.six import u
+
+
+class TestRequestField(unittest.TestCase):
+
+ def test_guess_content_type(self):
+ self.assertTrue(guess_content_type('image.jpg') in
+ ['image/jpeg', 'image/pjpeg'])
+ self.assertEqual(guess_content_type('notsure'),
+ 'application/octet-stream')
+ self.assertEqual(guess_content_type(None), 'application/octet-stream')
+
+ def test_create(self):
+ simple_field = RequestField('somename', 'data')
+ self.assertEqual(simple_field.render_headers(), '\r\n')
+ filename_field = RequestField('somename', 'data',
+ filename='somefile.txt')
+ self.assertEqual(filename_field.render_headers(), '\r\n')
+ headers_field = RequestField('somename', 'data',
+ headers={'Content-Length': 4})
+ self.assertEqual(
+ headers_field.render_headers(), 'Content-Length: 4\r\n\r\n')
+
+ def test_make_multipart(self):
+ field = RequestField('somename', 'data')
+ field.make_multipart(content_type='image/jpg',
+ content_location='/test')
+ self.assertEqual(
+ field.render_headers(),
+ 'Content-Disposition: form-data; name="somename"\r\n'
+ 'Content-Type: image/jpg\r\n'
+ 'Content-Location: /test\r\n'
+ '\r\n')
+
+ def test_render_parts(self):
+ field = RequestField('somename', 'data')
+ parts = field._render_parts({'name': 'value', 'filename': 'value'})
+ self.assertTrue('name="value"' in parts)
+ self.assertTrue('filename="value"' in parts)
+ parts = field._render_parts([('name', 'value'), ('filename', 'value')])
+ self.assertEqual(parts, 'name="value"; filename="value"')
+
+ def test_render_part(self):
+ field = RequestField('somename', 'data')
+ param = field._render_part('filename', u('n\u00e4me'))
+ self.assertEqual(param, "filename*=utf-8''n%C3%A4me")
diff --git a/test/test_filepost.py b/test/test_filepost.py
new file mode 100644
index 0000000..390dbb3
--- /dev/null
+++ b/test/test_filepost.py
@@ -0,0 +1,133 @@
+import unittest
+
+from urllib3.filepost import encode_multipart_formdata, iter_fields
+from urllib3.fields import RequestField
+from urllib3.packages.six import b, u
+
+
+BOUNDARY = '!! test boundary !!'
+
+
+class TestIterfields(unittest.TestCase):
+
+ def test_dict(self):
+ for fieldname, value in iter_fields(dict(a='b')):
+ self.assertEqual((fieldname, value), ('a', 'b'))
+
+ self.assertEqual(
+ list(sorted(iter_fields(dict(a='b', c='d')))),
+ [('a', 'b'), ('c', 'd')])
+
+ def test_tuple_list(self):
+ for fieldname, value in iter_fields([('a', 'b')]):
+ self.assertEqual((fieldname, value), ('a', 'b'))
+
+ self.assertEqual(
+ list(iter_fields([('a', 'b'), ('c', 'd')])),
+ [('a', 'b'), ('c', 'd')])
+
+
+class TestMultipartEncoding(unittest.TestCase):
+
+ def test_input_datastructures(self):
+ fieldsets = [
+ dict(k='v', k2='v2'),
+ [('k', 'v'), ('k2', 'v2')],
+ ]
+
+ for fields in fieldsets:
+ encoded, _ = encode_multipart_formdata(fields, boundary=BOUNDARY)
+ self.assertEqual(encoded.count(b(BOUNDARY)), 3)
+
+
+ def test_field_encoding(self):
+ fieldsets = [
+ [('k', 'v'), ('k2', 'v2')],
+ [('k', b'v'), (u('k2'), b'v2')],
+ [('k', b'v'), (u('k2'), 'v2')],
+ ]
+
+ for fields in fieldsets:
+ encoded, content_type = encode_multipart_formdata(fields, boundary=BOUNDARY)
+
+ self.assertEqual(encoded,
+ b'--' + b(BOUNDARY) + b'\r\n'
+ b'Content-Disposition: form-data; name="k"\r\n'
+ b'\r\n'
+ b'v\r\n'
+ b'--' + b(BOUNDARY) + b'\r\n'
+ b'Content-Disposition: form-data; name="k2"\r\n'
+ b'\r\n'
+ b'v2\r\n'
+ b'--' + b(BOUNDARY) + b'--\r\n'
+ , fields)
+
+ self.assertEqual(content_type,
+ 'multipart/form-data; boundary=' + str(BOUNDARY))
+
+
+ def test_filename(self):
+ fields = [('k', ('somename', b'v'))]
+
+ encoded, content_type = encode_multipart_formdata(fields, boundary=BOUNDARY)
+
+ self.assertEqual(encoded,
+ b'--' + b(BOUNDARY) + b'\r\n'
+ b'Content-Disposition: form-data; name="k"; filename="somename"\r\n'
+ b'Content-Type: application/octet-stream\r\n'
+ b'\r\n'
+ b'v\r\n'
+ b'--' + b(BOUNDARY) + b'--\r\n'
+ )
+
+ self.assertEqual(content_type,
+ 'multipart/form-data; boundary=' + str(BOUNDARY))
+
+
+ def test_textplain(self):
+ fields = [('k', ('somefile.txt', b'v'))]
+
+ encoded, content_type = encode_multipart_formdata(fields, boundary=BOUNDARY)
+
+ self.assertEqual(encoded,
+ b'--' + b(BOUNDARY) + b'\r\n'
+ b'Content-Disposition: form-data; name="k"; filename="somefile.txt"\r\n'
+ b'Content-Type: text/plain\r\n'
+ b'\r\n'
+ b'v\r\n'
+ b'--' + b(BOUNDARY) + b'--\r\n'
+ )
+
+ self.assertEqual(content_type,
+ 'multipart/form-data; boundary=' + str(BOUNDARY))
+
+
+ def test_explicit(self):
+ fields = [('k', ('somefile.txt', b'v', 'image/jpeg'))]
+
+ encoded, content_type = encode_multipart_formdata(fields, boundary=BOUNDARY)
+
+ self.assertEqual(encoded,
+ b'--' + b(BOUNDARY) + b'\r\n'
+ b'Content-Disposition: form-data; name="k"; filename="somefile.txt"\r\n'
+ b'Content-Type: image/jpeg\r\n'
+ b'\r\n'
+ b'v\r\n'
+ b'--' + b(BOUNDARY) + b'--\r\n'
+ )
+
+ self.assertEqual(content_type,
+ 'multipart/form-data; boundary=' + str(BOUNDARY))
+
+ def test_request_fields(self):
+ fields = [RequestField('k', b'v', filename='somefile.txt', headers={'Content-Type': 'image/jpeg'})]
+
+ encoded, content_type = encode_multipart_formdata(fields, boundary=BOUNDARY)
+
+ self.assertEqual(encoded,
+ b'--' + b(BOUNDARY) + b'\r\n'
+ b'Content-Type: image/jpeg\r\n'
+ b'\r\n'
+ b'v\r\n'
+ b'--' + b(BOUNDARY) + b'--\r\n'
+ )
diff --git a/test/test_no_ssl.py b/test/test_no_ssl.py
new file mode 100644
index 0000000..79058f7
--- /dev/null
+++ b/test/test_no_ssl.py
@@ -0,0 +1,89 @@
+"""
+Test what happens if Python was built without SSL
+
+* Everything that does not involve HTTPS should still work
+* HTTPS requests must fail with an error that points at the ssl module
+"""
+
+import sys
+import unittest
+
+
+class ImportBlocker(object):
+ """
+ Block Imports
+
+ To be placed on ``sys.meta_path``. This ensures that the modules
+ specified cannot be imported, even if they are a builtin.
+ """
+ def __init__(self, *namestoblock):
+ self.namestoblock = namestoblock
+
+ def find_module(self, fullname, path=None):
+ if fullname in self.namestoblock:
+ return self
+ return None
+
+ def load_module(self, fullname):
+ raise ImportError('import of {0} is blocked'.format(fullname))
+
+
+class ModuleStash(object):
+ """
+ Stashes away previously imported modules
+
+ If we re-import a module, the coverage data for it is lost, so we re-use
+ the old modules.
+ """
+
+ def __init__(self, namespace, modules=sys.modules):
+ self.namespace = namespace
+ self.modules = modules
+ self._data = {}
+
+ def stash(self):
+ self._data[self.namespace] = self.modules.pop(self.namespace, None)
+
+ for module in list(self.modules.keys()):
+ if module.startswith(self.namespace + '.'):
+ self._data[module] = self.modules.pop(module)
+
+ def pop(self):
+ self.modules.pop(self.namespace, None)
+
+ for module in list(self.modules.keys()):
+ if module.startswith(self.namespace + '.'):
+ self.modules.pop(module)
+
+ self.modules.update(self._data)
+
+
+ssl_blocker = ImportBlocker('ssl', '_ssl')
+module_stash = ModuleStash('urllib3')
+
+
+class TestWithoutSSL(unittest.TestCase):
+ def setUp(self):
+ sys.modules.pop('ssl', None)
+ sys.modules.pop('_ssl', None)
+
+ module_stash.stash()
+ sys.meta_path.insert(0, ssl_blocker)
+
+ def tearDown(self):
+ sys.meta_path.remove(ssl_blocker)
+ module_stash.pop()
+
+
+class TestImportWithoutSSL(TestWithoutSSL):
+ def test_cannot_import_ssl(self):
+ # Python 2.6 has neither context managers for assertRaises nor
+ # importlib, and 'import' inside a 'lambda' is invalid syntax, so a
+ # helper function is used instead.
+ def import_ssl():
+ import ssl
+
+ self.assertRaises(ImportError, import_ssl)
+
+ def test_import_urllib3(self):
+ import urllib3
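
A standalone sketch of the blocking mechanism used by TestWithoutSSL. Cached
modules must be dropped first, or the import would be served from sys.modules
without consulting the blocker:

    import sys
    from test.test_no_ssl import ImportBlocker

    sys.modules.pop('ssl', None)
    sys.modules.pop('_ssl', None)
    blocker = ImportBlocker('ssl', '_ssl')
    sys.meta_path.insert(0, blocker)
    try:
        import ssl          # raises ImportError while the blocker is active
    except ImportError:
        pass
    finally:
        sys.meta_path.remove(blocker)
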
diff --git a/test/test_poolmanager.py b/test/test_poolmanager.py
new file mode 100644
index 0000000..6195d51
--- /dev/null
+++ b/test/test_poolmanager.py
@@ -0,0 +1,92 @@
+import unittest
+
+from urllib3.poolmanager import PoolManager
+from urllib3 import connection_from_url
+from urllib3.exceptions import (
+ ClosedPoolError,
+ LocationValueError,
+)
+
+
+class TestPoolManager(unittest.TestCase):
+ def test_same_url(self):
+ # Convince ourselves that normally we don't get the same object
+ conn1 = connection_from_url('http://localhost:8081/foo')
+ conn2 = connection_from_url('http://localhost:8081/bar')
+
+ self.assertNotEqual(conn1, conn2)
+
+ # Now try again using the PoolManager
+ p = PoolManager(1)
+
+ conn1 = p.connection_from_url('http://localhost:8081/foo')
+ conn2 = p.connection_from_url('http://localhost:8081/bar')
+
+ self.assertEqual(conn1, conn2)
+
+ def test_many_urls(self):
+ urls = [
+ "http://localhost:8081/foo",
+ "http://www.google.com/mail",
+ "http://localhost:8081/bar",
+ "https://www.google.com/",
+ "https://www.google.com/mail",
+ "http://yahoo.com",
+ "http://bing.com",
+ "http://yahoo.com/",
+ ]
+
+ connections = set()
+
+ p = PoolManager(10)
+
+ for url in urls:
+ conn = p.connection_from_url(url)
+ connections.add(conn)
+
+ self.assertEqual(len(connections), 5)
+
+ def test_manager_clear(self):
+ p = PoolManager(5)
+
+ conn_pool = p.connection_from_url('http://google.com')
+ self.assertEqual(len(p.pools), 1)
+
+ conn = conn_pool._get_conn()
+
+ p.clear()
+ self.assertEqual(len(p.pools), 0)
+
+ self.assertRaises(ClosedPoolError, conn_pool._get_conn)
+
+ conn_pool._put_conn(conn)
+
+ self.assertRaises(ClosedPoolError, conn_pool._get_conn)
+
+ self.assertEqual(len(p.pools), 0)
+
+
+ def test_nohost(self):
+ p = PoolManager(5)
+ self.assertRaises(LocationValueError, p.connection_from_url, 'http://@')
+ self.assertRaises(LocationValueError, p.connection_from_url, None)
+
+ def test_contextmanager(self):
+ with PoolManager(1) as p:
+ conn_pool = p.connection_from_url('http://google.com')
+ self.assertEqual(len(p.pools), 1)
+ conn = conn_pool._get_conn()
+
+ self.assertEqual(len(p.pools), 0)
+
+ self.assertRaises(ClosedPoolError, conn_pool._get_conn)
+
+ conn_pool._put_conn(conn)
+
+ self.assertRaises(ClosedPoolError, conn_pool._get_conn)
+
+ self.assertEqual(len(p.pools), 0)
+
+
+if __name__ == '__main__':
+ unittest.main()
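
The pooling behavior under test, condensed: pools are keyed by scheme, host,
and port, never by path. A sketch:

    from urllib3.poolmanager import PoolManager

    p = PoolManager(10)
    a = p.connection_from_url('http://example.com/foo')
    b = p.connection_from_url('http://example.com/bar')
    assert a is b       # same scheme/host/port -> same pool
    c = p.connection_from_url('https://example.com/')
    assert a is not c   # different scheme -> different pool
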
diff --git a/test/test_proxymanager.py b/test/test_proxymanager.py
new file mode 100644
index 0000000..7b19334
--- /dev/null
+++ b/test/test_proxymanager.py
@@ -0,0 +1,47 @@
+import unittest
+
+from urllib3.poolmanager import ProxyManager
+
+
+class TestProxyManager(unittest.TestCase):
+ def test_proxy_headers(self):
+ p = ProxyManager('http://something:1234')
+ url = 'http://pypi.python.org/test'
+
+ # Verify default headers
+ default_headers = {'Accept': '*/*',
+ 'Host': 'pypi.python.org'}
+ headers = p._set_proxy_headers(url)
+
+ self.assertEqual(headers, default_headers)
+
+ # Verify default headers don't overwrite provided headers
+ provided_headers = {'Accept': 'application/json',
+ 'custom': 'header',
+ 'Host': 'test.python.org'}
+ headers = p._set_proxy_headers(url, provided_headers)
+
+ self.assertEqual(headers, provided_headers)
+
+ # Verify proxy with nonstandard port
+ provided_headers = {'Accept': 'application/json'}
+ expected_headers = provided_headers.copy()
+ expected_headers.update({'Host': 'pypi.python.org:8080'})
+ url_with_port = 'http://pypi.python.org:8080/test'
+ headers = p._set_proxy_headers(url_with_port, provided_headers)
+
+ self.assertEqual(headers, expected_headers)
+
+ def test_default_port(self):
+ p = ProxyManager('http://something')
+ self.assertEqual(p.proxy.port, 80)
+ p = ProxyManager('https://something')
+ self.assertEqual(p.proxy.port, 443)
+
+ def test_invalid_scheme(self):
+ self.assertRaises(AssertionError, ProxyManager, 'invalid://host/p')
+ self.assertRaises(ValueError, ProxyManager, 'invalid://host/p')
+
+
+if __name__ == '__main__':
+ unittest.main()
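
For context, a sketch of ProxyManager's public use; the proxy address here is
hypothetical, and the request naturally requires a live proxy:

    from urllib3 import ProxyManager

    proxy = ProxyManager('http://localhost:3128/')
    # Absolute URLs are fetched through the proxy; the Host header is
    # derived from the target URL, as the tests above verify.
    r = proxy.request('GET', 'http://pypi.python.org/test')
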
diff --git a/test/test_response.py b/test/test_response.py
new file mode 100644
index 0000000..47d0521
--- /dev/null
+++ b/test/test_response.py
@@ -0,0 +1,632 @@
+import unittest
+
+from io import BytesIO, BufferedReader
+
+try:
+ import http.client as httplib
+except ImportError:
+ import httplib
+from urllib3.response import HTTPResponse
+from urllib3.exceptions import DecodeError, ResponseNotChunked, ProtocolError
+
+
+from base64 import b64decode
+
+# A known random (i.e., not-too-compressible) payload generated with:
+# "".join(random.choice(string.printable) for i in xrange(512))
+# .encode("zlib").encode("base64")
+# Randomness in tests == bad, and fixing a seed may not be sufficient.
+ZLIB_PAYLOAD = b64decode(b"""\
+eJwFweuaoQAAANDfineQhiKLUiaiCzvuTEmNNlJGiL5QhnGpZ99z8luQfe1AHoMioB+QSWHQu/L+
+lzd7W5CipqYmeVTBjdgSATdg4l4Z2zhikbuF+EKn69Q0DTpdmNJz8S33odfJoVEexw/l2SS9nFdi
+pis7KOwXzfSqarSo9uJYgbDGrs1VNnQpT9f8zAorhYCEZronZQF9DuDFfNK3Hecc+WHLnZLQptwk
+nufw8S9I43sEwxsT71BiqedHo0QeIrFE01F/4atVFXuJs2yxIOak3bvtXjUKAA6OKnQJ/nNvDGKZ
+Khe5TF36JbnKVjdcL1EUNpwrWVfQpFYJ/WWm2b74qNeSZeQv5/xBhRdOmKTJFYgO96PwrHBlsnLn
+a3l0LwJsloWpMbzByU5WLbRE6X5INFqjQOtIwYz5BAlhkn+kVqJvWM5vBlfrwP42ifonM5yF4ciJ
+auHVks62997mNGOsM7WXNG3P98dBHPo2NhbTvHleL0BI5dus2JY81MUOnK3SGWLH8HeWPa1t5KcW
+S5moAj5HexY/g/F8TctpxwsvyZp38dXeLDjSQvEQIkF7XR3YXbeZgKk3V34KGCPOAeeuQDIgyVhV
+nP4HF2uWHA==""")
+
+
+class TestLegacyResponse(unittest.TestCase):
+ def test_getheaders(self):
+ headers = {'host': 'example.com'}
+ r = HTTPResponse(headers=headers)
+ self.assertEqual(r.getheaders(), headers)
+
+ def test_getheader(self):
+ headers = {'host': 'example.com'}
+ r = HTTPResponse(headers=headers)
+ self.assertEqual(r.getheader('host'), 'example.com')
+
+
+class TestResponse(unittest.TestCase):
+ def test_cache_content(self):
+ r = HTTPResponse('foo')
+ self.assertEqual(r.data, 'foo')
+ self.assertEqual(r._body, 'foo')
+
+ def test_default(self):
+ r = HTTPResponse()
+ self.assertEqual(r.data, None)
+
+ def test_none(self):
+ r = HTTPResponse(None)
+ self.assertEqual(r.data, None)
+
+ def test_preload(self):
+ fp = BytesIO(b'foo')
+
+ r = HTTPResponse(fp, preload_content=True)
+
+ self.assertEqual(fp.tell(), len(b'foo'))
+ self.assertEqual(r.data, b'foo')
+
+ def test_no_preload(self):
+ fp = BytesIO(b'foo')
+
+ r = HTTPResponse(fp, preload_content=False)
+
+ self.assertEqual(fp.tell(), 0)
+ self.assertEqual(r.data, b'foo')
+ self.assertEqual(fp.tell(), len(b'foo'))
+
+ def test_decode_bad_data(self):
+ fp = BytesIO(b'\x00' * 10)
+ self.assertRaises(DecodeError, HTTPResponse, fp, headers={
+ 'content-encoding': 'deflate'
+ })
+
+ def test_reference_read(self):
+ fp = BytesIO(b'foo')
+ r = HTTPResponse(fp, preload_content=False)
+
+ self.assertEqual(r.read(1), b'f')
+ self.assertEqual(r.read(2), b'oo')
+ self.assertEqual(r.read(), b'')
+ self.assertEqual(r.read(), b'')
+
+ def test_decode_deflate(self):
+ import zlib
+ data = zlib.compress(b'foo')
+
+ fp = BytesIO(data)
+ r = HTTPResponse(fp, headers={'content-encoding': 'deflate'})
+
+ self.assertEqual(r.data, b'foo')
+
+ def test_decode_deflate_case_insensitive(self):
+ import zlib
+ data = zlib.compress(b'foo')
+
+ fp = BytesIO(data)
+ r = HTTPResponse(fp, headers={'content-encoding': 'DeFlAtE'})
+
+ self.assertEqual(r.data, b'foo')
+
+ def test_chunked_decoding_deflate(self):
+ import zlib
+ data = zlib.compress(b'foo')
+
+ fp = BytesIO(data)
+ r = HTTPResponse(fp, headers={'content-encoding': 'deflate'},
+ preload_content=False)
+
+ self.assertEqual(r.read(3), b'')
+ self.assertEqual(r.read(1), b'f')
+ self.assertEqual(r.read(2), b'oo')
+ self.assertEqual(r.read(), b'')
+ self.assertEqual(r.read(), b'')
+
+
+ def test_chunked_decoding_deflate2(self):
+ import zlib
+ compress = zlib.compressobj(6, zlib.DEFLATED, -zlib.MAX_WBITS)
+ data = compress.compress(b'foo')
+ data += compress.flush()
+
+ fp = BytesIO(data)
+ r = HTTPResponse(fp, headers={'content-encoding': 'deflate'},
+ preload_content=False)
+
+ self.assertEqual(r.read(1), b'')
+ self.assertEqual(r.read(1), b'f')
+ self.assertEqual(r.read(2), b'oo')
+ self.assertEqual(r.read(), b'')
+ self.assertEqual(r.read(), b'')
+
+
+ def test_chunked_decoding_gzip(self):
+ import zlib
+ compress = zlib.compressobj(6, zlib.DEFLATED, 16 + zlib.MAX_WBITS)
+ data = compress.compress(b'foo')
+ data += compress.flush()
+
+ fp = BytesIO(data)
+ r = HTTPResponse(fp, headers={'content-encoding': 'gzip'},
+ preload_content=False)
+
+ self.assertEqual(r.read(11), b'')
+ self.assertEqual(r.read(1), b'f')
+ self.assertEqual(r.read(2), b'oo')
+ self.assertEqual(r.read(), b'')
+ self.assertEqual(r.read(), b'')
+
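
The three chunked-decoding tests above lean on zlib's wbits convention:
zlib.compress emits a zlib-wrapped deflate stream, a negative wbits value
selects a raw deflate stream (no header or checksum), and adding 16 to
MAX_WBITS selects the gzip format. A self-contained round-trip sketch:

    import zlib

    # zlib-wrapped deflate (what zlib.compress produces)
    assert zlib.decompress(zlib.compress(b'foo')) == b'foo'

    # Raw deflate: negative wbits means no header or checksum
    raw = zlib.compressobj(6, zlib.DEFLATED, -zlib.MAX_WBITS)
    raw_data = raw.compress(b'foo') + raw.flush()
    assert zlib.decompress(raw_data, -zlib.MAX_WBITS) == b'foo'

    # Gzip: wbits offset by 16 adds the gzip header and trailer
    gz = zlib.compressobj(6, zlib.DEFLATED, 16 + zlib.MAX_WBITS)
    gz_data = gz.compress(b'foo') + gz.flush()
    assert zlib.decompress(gz_data, 16 + zlib.MAX_WBITS) == b'foo'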
+
+ def test_body_blob(self):
+ resp = HTTPResponse(b'foo')
+ self.assertEqual(resp.data, b'foo')
+ self.assertTrue(resp.closed)
+
+ def test_io(self):
+ import socket
+
+ fp = BytesIO(b'foo')
+ resp = HTTPResponse(fp, preload_content=False)
+
+ self.assertEqual(resp.closed, False)
+ self.assertEqual(resp.readable(), True)
+ self.assertEqual(resp.writable(), False)
+ self.assertRaises(IOError, resp.fileno)
+
+ resp.close()
+ self.assertEqual(resp.closed, True)
+
+ # Try closing with an `httplib.HTTPResponse`, because it has an
+ # `isclosed` method.
+ hlr = httplib.HTTPResponse(socket.socket())
+ resp2 = HTTPResponse(hlr, preload_content=False)
+ self.assertEqual(resp2.closed, False)
+ resp2.close()
+ self.assertEqual(resp2.closed, True)
+
+ # Also try when only data is present.
+ resp3 = HTTPResponse('foodata')
+ self.assertRaises(IOError, resp3.fileno)
+
+ resp3._fp = 2
+ # A corner case where _fp is present but doesn't have `closed`,
+ # `isclosed`, or `fileno`. Unlikely, but possible.
+ self.assertEqual(resp3.closed, True)
+ self.assertRaises(IOError, resp3.fileno)
+
+ def test_io_bufferedreader(self):
+ fp = BytesIO(b'foo')
+ resp = HTTPResponse(fp, preload_content=False)
+ br = BufferedReader(resp)
+
+ self.assertEqual(br.read(), b'foo')
+
+ br.close()
+ self.assertEqual(resp.closed, True)
+
+ b = b'fooandahalf'
+ fp = BytesIO(b)
+ resp = HTTPResponse(fp, preload_content=False)
+ br = BufferedReader(resp, 5)
+
+ br.read(1) # sets up the buffer, reading 5
+ self.assertEqual(len(fp.read()), len(b) - 5)
+
+ # This is necessary to make sure the "no bytes left" part of `readinto`
+ # gets tested.
+ while not br.closed:
+ br.read(5)
+
+ def test_io_readinto(self):
+ # This test is necessary because in py2.6, `readinto` doesn't get
+ # called in `test_io_bufferedreader` as it does on all other Python
+ # versions, probably because the `io` module in py2.6 is an older
+ # version with a different underlying implementation.
+
+ fp = BytesIO(b'foo')
+ resp = HTTPResponse(fp, preload_content=False)
+
+ barr = bytearray(3)
+ assert resp.readinto(barr) == 3
+ assert b'foo' == barr
+
+ # The reader should already be empty, so this should read nothing.
+ assert resp.readinto(barr) == 0
+ assert b'foo' == barr
+
+ def test_streaming(self):
+ fp = BytesIO(b'foo')
+ resp = HTTPResponse(fp, preload_content=False)
+ stream = resp.stream(2, decode_content=False)
+
+ self.assertEqual(next(stream), b'fo')
+ self.assertEqual(next(stream), b'o')
+ self.assertRaises(StopIteration, next, stream)
+
+ def test_streaming_tell(self):
+ fp = BytesIO(b'foo')
+ resp = HTTPResponse(fp, preload_content=False)
+ stream = resp.stream(2, decode_content=False)
+
+ position = 0
+
+ position += len(next(stream))
+ self.assertEqual(2, position)
+ self.assertEqual(position, resp.tell())
+
+ position += len(next(stream))
+ self.assertEqual(3, position)
+ self.assertEqual(position, resp.tell())
+
+ self.assertRaises(StopIteration, next, stream)
+
+ def test_gzipped_streaming(self):
+ import zlib
+ compress = zlib.compressobj(6, zlib.DEFLATED, 16 + zlib.MAX_WBITS)
+ data = compress.compress(b'foo')
+ data += compress.flush()
+
+ fp = BytesIO(data)
+ resp = HTTPResponse(fp, headers={'content-encoding': 'gzip'},
+ preload_content=False)
+ stream = resp.stream(2)
+
+ self.assertEqual(next(stream), b'f')
+ self.assertEqual(next(stream), b'oo')
+ self.assertRaises(StopIteration, next, stream)
+
+ def test_gzipped_streaming_tell(self):
+ import zlib
+ compress = zlib.compressobj(6, zlib.DEFLATED, 16 + zlib.MAX_WBITS)
+ uncompressed_data = b'foo'
+ data = compress.compress(uncompressed_data)
+ data += compress.flush()
+
+ fp = BytesIO(data)
+ resp = HTTPResponse(fp, headers={'content-encoding': 'gzip'},
+ preload_content=False)
+ stream = resp.stream()
+
+ # Read everything
+ payload = next(stream)
+ self.assertEqual(payload, uncompressed_data)
+
+ self.assertEqual(len(data), resp.tell())
+
+ self.assertRaises(StopIteration, next, stream)
+
+ def test_deflate_streaming_tell_intermediate_point(self):
+ # Ensure that ``tell()`` returns the correct number of bytes when
+ # part-way through streaming compressed content.
+ import zlib
+
+ NUMBER_OF_READS = 10
+
+ class MockCompressedDataReading(BytesIO):
+ """
+ A BytesIO-like reader returning ``payload`` in ``NUMBER_OF_READS``
+ calls to ``read``.
+ """
+
+ def __init__(self, payload, payload_part_size):
+ self.payloads = [
+ payload[i*payload_part_size:(i+1)*payload_part_size]
+ for i in range(NUMBER_OF_READS+1)]
+
+ assert b"".join(self.payloads) == payload
+
+ def read(self, _):
+ # Amount is unused.
+ if len(self.payloads) > 0:
+ return self.payloads.pop(0)
+ return b""
+
+ uncompressed_data = zlib.decompress(ZLIB_PAYLOAD)
+
+ payload_part_size = len(ZLIB_PAYLOAD) // NUMBER_OF_READS
+ fp = MockCompressedDataReading(ZLIB_PAYLOAD, payload_part_size)
+ resp = HTTPResponse(fp, headers={'content-encoding': 'deflate'},
+ preload_content=False)
+ stream = resp.stream()
+
+ parts_positions = [(part, resp.tell()) for part in stream]
+ end_of_stream = resp.tell()
+
+ self.assertRaises(StopIteration, next, stream)
+
+ parts, positions = zip(*parts_positions)
+
+ # Check that the payload is equal to the uncompressed data
+ payload = b"".join(parts)
+ self.assertEqual(uncompressed_data, payload)
+
+ # Check that the positions in the stream are correct
+ expected = [(i+1)*payload_part_size for i in range(NUMBER_OF_READS)]
+ self.assertEqual(expected, list(positions))
+
+ # Check that the end of the stream is in the correct place
+ self.assertEqual(len(ZLIB_PAYLOAD), end_of_stream)
+
+ def test_deflate_streaming(self):
+ import zlib
+ data = zlib.compress(b'foo')
+
+ fp = BytesIO(data)
+ resp = HTTPResponse(fp, headers={'content-encoding': 'deflate'},
+ preload_content=False)
+ stream = resp.stream(2)
+
+ self.assertEqual(next(stream), b'f')
+ self.assertEqual(next(stream), b'oo')
+ self.assertRaises(StopIteration, next, stream)
+
+ def test_deflate2_streaming(self):
+ import zlib
+ compress = zlib.compressobj(6, zlib.DEFLATED, -zlib.MAX_WBITS)
+ data = compress.compress(b'foo')
+ data += compress.flush()
+
+ fp = BytesIO(data)
+ resp = HTTPResponse(fp, headers={'content-encoding': 'deflate'},
+ preload_content=False)
+ stream = resp.stream(2)
+
+ self.assertEqual(next(stream), b'f')
+ self.assertEqual(next(stream), b'oo')
+ self.assertRaises(StopIteration, next, stream)
+
+ def test_empty_stream(self):
+ fp = BytesIO(b'')
+ resp = HTTPResponse(fp, preload_content=False)
+ stream = resp.stream(2, decode_content=False)
+
+ self.assertRaises(StopIteration, next, stream)
+
+ def test_mock_httpresponse_stream(self):
+ # Mock out an HTTP request that does just enough to make it through
+ # urllib3's read() and close() calls, and also exhausts the
+ # underlying file object.
+ class MockHTTPRequest(object):
+ fp = None
+
+ def read(self, amt):
+ data = self.fp.read(amt)
+ if not data:
+ self.fp = None
+
+ return data
+
+ def close(self):
+ self.fp = None
+
+ bio = BytesIO(b'foo')
+ fp = MockHTTPRequest()
+ fp.fp = bio
+ resp = HTTPResponse(fp, preload_content=False)
+ stream = resp.stream(2)
+
+ self.assertEqual(next(stream), b'fo')
+ self.assertEqual(next(stream), b'o')
+ self.assertRaises(StopIteration, next, stream)
+
+ def test_mock_transfer_encoding_chunked(self):
+ stream = [b"fo", b"o", b"bar"]
+ fp = MockChunkedEncodingResponse(stream)
+ r = httplib.HTTPResponse(MockSock)
+ r.fp = fp
+ resp = HTTPResponse(r, preload_content=False, headers={'transfer-encoding': 'chunked'})
+
+ i = 0
+ for c in resp.stream():
+ self.assertEqual(c, stream[i])
+ i += 1
+
+ def test_mock_gzipped_transfer_encoding_chunked_decoded(self):
+ """Show that we can decode the gizpped and chunked body."""
+ def stream():
+ # Set up a generator to chunk the gzipped body
+ import zlib
+ compress = zlib.compressobj(6, zlib.DEFLATED, 16 + zlib.MAX_WBITS)
+ data = compress.compress(b'foobar')
+ data += compress.flush()
+ for i in range(0, len(data), 2):
+ yield data[i:i+2]
+
+ fp = MockChunkedEncodingResponse(list(stream()))
+ r = httplib.HTTPResponse(MockSock)
+ r.fp = fp
+ headers = {'transfer-encoding': 'chunked', 'content-encoding': 'gzip'}
+ resp = HTTPResponse(r, preload_content=False, headers=headers)
+
+ data = b''
+ for c in resp.stream(decode_content=True):
+ data += c
+
+ self.assertEqual(b'foobar', data)
+
+ def test_mock_transfer_encoding_chunked_custom_read(self):
+ stream = [b"foooo", b"bbbbaaaaar"]
+ fp = MockChunkedEncodingResponse(stream)
+ r = httplib.HTTPResponse(MockSock)
+ r.fp = fp
+ r.chunked = True
+ r.chunk_left = None
+ resp = HTTPResponse(r, preload_content=False, headers={'transfer-encoding': 'chunked'})
+ expected_response = [b'fo', b'oo', b'o', b'bb', b'bb', b'aa', b'aa', b'ar']
+ response = list(resp.read_chunked(2))
+ if getattr(self, "assertListEqual", False):
+ self.assertListEqual(expected_response, response)
+ else:
+ for index, item in enumerate(response):
+ v = expected_response[index]
+ self.assertEqual(item, v)
+
+ def test_mock_transfer_encoding_chunked_unlimited_read(self):
+ stream = [b"foooo", b"bbbbaaaaar"]
+ fp = MockChunkedEncodingResponse(stream)
+ r = httplib.HTTPResponse(MockSock)
+ r.fp = fp
+ r.chunked = True
+ r.chunk_left = None
+ resp = HTTPResponse(r, preload_content=False, headers={'transfer-encoding': 'chunked'})
+ if getattr(self, "assertListEqual", False):
+ self.assertListEqual(stream, list(resp.read_chunked()))
+ else:
+ for index, item in enumerate(resp.read_chunked()):
+ v = stream[index]
+ self.assertEqual(item, v)
+
+ def test_read_not_chunked_response_as_chunks(self):
+ fp = BytesIO(b'foo')
+ resp = HTTPResponse(fp, preload_content=False)
+ r = resp.read_chunked()
+ self.assertRaises(ResponseNotChunked, next, r)
+
+ def test_invalid_chunks(self):
+ stream = [b"foooo", b"bbbbaaaaar"]
+ fp = MockChunkedInvalidEncoding(stream)
+ r = httplib.HTTPResponse(MockSock)
+ r.fp = fp
+ r.chunked = True
+ r.chunk_left = None
+ resp = HTTPResponse(r, preload_content=False, headers={'transfer-encoding': 'chunked'})
+ self.assertRaises(ProtocolError, next, resp.read_chunked())
+
+ def test_chunked_response_without_crlf_on_end(self):
+ stream = [b"foo", b"bar", b"baz"]
+ fp = MockChunkedEncodingWithoutCRLFOnEnd(stream)
+ r = httplib.HTTPResponse(MockSock)
+ r.fp = fp
+ r.chunked = True
+ r.chunk_left = None
+ resp = HTTPResponse(r, preload_content=False, headers={'transfer-encoding': 'chunked'})
+ if getattr(self, "assertListEqual", False):
+ self.assertListEqual(stream, list(resp.stream()))
+ else:
+ for index, item in enumerate(resp.stream()):
+ v = stream[index]
+ self.assertEqual(item, v)
+
+ def test_chunked_response_with_extensions(self):
+ stream = [b"foo", b"bar"]
+ fp = MockChunkedEncodingWithExtensions(stream)
+ r = httplib.HTTPResponse(MockSock)
+ r.fp = fp
+ r.chunked = True
+ r.chunk_left = None
+ resp = HTTPResponse(r, preload_content=False, headers={'transfer-encoding': 'chunked'})
+ if getattr(self, "assertListEqual", False):
+ self.assertListEqual(stream, list(resp.stream()))
+ else:
+ for index, item in enumerate(resp.stream()):
+ v = stream[index]
+ self.assertEqual(item, v)
+
+ def test_get_case_insensitive_headers(self):
+ headers = {'host': 'example.com'}
+ r = HTTPResponse(headers=headers)
+ self.assertEqual(r.headers.get('host'), 'example.com')
+ self.assertEqual(r.headers.get('Host'), 'example.com')
+
+
+class MockChunkedEncodingResponse(object):
+
+ def __init__(self, content):
+ """
+ content: collection of str, each str is a chunk in response
+ """
+ self.content = content
+ self.index = 0 # This class iterates over self.content.
+ self.closed = False
+ self.cur_chunk = b''
+ self.chunks_exhausted = False
+
+ @staticmethod
+ def _encode_chunk(chunk):
+ # In the general case, we can't decode the chunk to unicode
+ length = '%X\r\n' % len(chunk)
+ return length.encode() + chunk + b'\r\n'
+
+ def _pop_new_chunk(self):
+ if self.chunks_exhausted:
+ return b""
+ try:
+ chunk = self.content[self.index]
+ except IndexError:
+ chunk = b''
+ self.chunks_exhausted = True
+ else:
+ self.index += 1
+ chunk = self._encode_chunk(chunk)
+ if not isinstance(chunk, bytes):
+ chunk = chunk.encode()
+ return chunk
+
+ def pop_current_chunk(self, amt=-1, till_crlf=False):
+ if amt > 0 and till_crlf:
+ raise ValueError("Can't specify amt and till_crlf.")
+ if len(self.cur_chunk) <= 0:
+ self.cur_chunk = self._pop_new_chunk()
+ if till_crlf:
+ try:
+ i = self.cur_chunk.index(b"\r\n")
+ except ValueError:
+ # No CRLF in current chunk -- probably caused by encoder.
+ self.cur_chunk = b""
+ return b""
+ else:
+ chunk_part = self.cur_chunk[:i+2]
+ self.cur_chunk = self.cur_chunk[i+2:]
+ return chunk_part
+ elif amt <= -1:
+ chunk_part = self.cur_chunk
+ self.cur_chunk = b''
+ return chunk_part
+ else:
+ # Slicing a bytes object never raises IndexError; it simply
+ # truncates, so no try/except is needed here.
+ chunk_part = self.cur_chunk[:amt]
+ self.cur_chunk = self.cur_chunk[amt:]
+ return chunk_part
+
+ def readline(self):
+ return self.pop_current_chunk(till_crlf=True)
+
+ def read(self, amt=-1):
+ return self.pop_current_chunk(amt)
+
+ def flush(self):
+ # Python 3 wants this method.
+ pass
+
+ def close(self):
+ self.closed = True
+
+
+class MockChunkedInvalidEncoding(MockChunkedEncodingResponse):
+
+ def _encode_chunk(self, chunk):
+ return 'ZZZ\r\n%s\r\n' % chunk.decode()
+
+
+class MockChunkedEncodingWithoutCRLFOnEnd(MockChunkedEncodingResponse):
+
+ def _encode_chunk(self, chunk):
+ return '%X\r\n%s%s' % (len(chunk), chunk.decode(),
+ "\r\n" if len(chunk) > 0 else "")
+
+
+class MockChunkedEncodingWithExtensions(MockChunkedEncodingResponse):
+
+ def _encode_chunk(self, chunk):
+ return '%X;asd=qwe\r\n%s\r\n' % (len(chunk), chunk.decode())
+
+
+class MockSock(object):
+ @classmethod
+ def makefile(cls, *args, **kwargs):
+ return
+
+
+if __name__ == '__main__':
+ unittest.main()
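
For reference, the MockChunkedEncodingResponse defined above frames each
chunk exactly as HTTP/1.1 chunked transfer encoding does: a hex length line,
the chunk bytes, a trailing CRLF, and a zero-length chunk to terminate the
body. A standalone encoder sketch:

    def encode_chunked(chunks):
        """Frame an iterable of bytes chunks as an HTTP/1.1 chunked body."""
        out = b''
        for chunk in chunks:
            out += ('%X\r\n' % len(chunk)).encode() + chunk + b'\r\n'
        # A zero-length chunk marks the end of the body.
        return out + b'0\r\n\r\n'

    assert encode_chunked([b'fo', b'o']) == b'2\r\nfo\r\n1\r\no\r\n0\r\n\r\n'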
diff --git a/test/test_retry.py b/test/test_retry.py
new file mode 100644
index 0000000..421e508
--- /dev/null
+++ b/test/test_retry.py
@@ -0,0 +1,198 @@
+import unittest
+
+from urllib3.response import HTTPResponse
+from urllib3.packages.six.moves import xrange
+from urllib3.util.retry import Retry
+from urllib3.exceptions import (
+ ConnectTimeoutError,
+ MaxRetryError,
+ ReadTimeoutError,
+ ResponseError,
+)
+
+
+class RetryTest(unittest.TestCase):
+
+ def test_string(self):
+ """ Retry string representation looks the way we expect """
+ retry = Retry()
+ self.assertEqual(str(retry), 'Retry(total=10, connect=None, read=None, redirect=None)')
+ for _ in range(3):
+ retry = retry.increment()
+ self.assertEqual(str(retry), 'Retry(total=7, connect=None, read=None, redirect=None)')
+
+ def test_retry_both_specified(self):
+ """Total can win if it's lower than the connect value"""
+ error = ConnectTimeoutError()
+ retry = Retry(connect=3, total=2)
+ retry = retry.increment(error=error)
+ retry = retry.increment(error=error)
+ try:
+ retry.increment(error=error)
+ self.fail("Failed to raise error.")
+ except MaxRetryError as e:
+ self.assertEqual(e.reason, error)
+
+ def test_retry_higher_total_loses(self):
+ """ A lower connect timeout than the total is honored """
+ error = ConnectTimeoutError()
+ retry = Retry(connect=2, total=3)
+ retry = retry.increment(error=error)
+ retry = retry.increment(error=error)
+ self.assertRaises(MaxRetryError, retry.increment, error=error)
+
+ def test_retry_higher_total_loses_vs_read(self):
+ """ A lower read timeout than the total is honored """
+ error = ReadTimeoutError(None, "/", "read timed out")
+ retry = Retry(read=2, total=3)
+ retry = retry.increment(error=error)
+ retry = retry.increment(error=error)
+ self.assertRaises(MaxRetryError, retry.increment, error=error)
+
+ def test_retry_total_none(self):
+ """ if Total is none, connect error should take precedence """
+ error = ConnectTimeoutError()
+ retry = Retry(connect=2, total=None)
+ retry = retry.increment(error=error)
+ retry = retry.increment(error=error)
+ try:
+ retry.increment(error=error)
+ self.fail("Failed to raise error.")
+ except MaxRetryError as e:
+ self.assertEqual(e.reason, error)
+
+ error = ReadTimeoutError(None, "/", "read timed out")
+ retry = Retry(connect=2, total=None)
+ retry = retry.increment(error=error)
+ retry = retry.increment(error=error)
+ retry = retry.increment(error=error)
+ self.assertFalse(retry.is_exhausted())
+
+ def test_retry_default(self):
+ """ If no value is specified, should retry connects 3 times """
+ retry = Retry()
+ self.assertEqual(retry.total, 10)
+ self.assertEqual(retry.connect, None)
+ self.assertEqual(retry.read, None)
+ self.assertEqual(retry.redirect, None)
+
+ error = ConnectTimeoutError()
+ retry = Retry(connect=1)
+ retry = retry.increment(error=error)
+ self.assertRaises(MaxRetryError, retry.increment, error=error)
+
+ retry = Retry(connect=1)
+ retry = retry.increment(error=error)
+ self.assertFalse(retry.is_exhausted())
+
+ self.assertTrue(Retry(0).raise_on_redirect)
+ self.assertFalse(Retry(False).raise_on_redirect)
+
+ def test_retry_read_zero(self):
+ """ No second chances on read timeouts, by default """
+ error = ReadTimeoutError(None, "/", "read timed out")
+ retry = Retry(read=0)
+ try:
+ retry.increment(error=error)
+ self.fail("Failed to raise error.")
+ except MaxRetryError as e:
+ self.assertEqual(e.reason, error)
+
+ def test_backoff(self):
+ """ Backoff is computed correctly """
+ max_backoff = Retry.BACKOFF_MAX
+
+ retry = Retry(total=100, backoff_factor=0.2)
+ self.assertEqual(retry.get_backoff_time(), 0) # First request
+
+ retry = retry.increment()
+ self.assertEqual(retry.get_backoff_time(), 0) # First retry
+
+ retry = retry.increment()
+ self.assertEqual(retry.backoff_factor, 0.2)
+ self.assertEqual(retry.total, 98)
+ self.assertEqual(retry.get_backoff_time(), 0.4) # Start backoff
+
+ retry = retry.increment()
+ self.assertEqual(retry.get_backoff_time(), 0.8)
+
+ retry = retry.increment()
+ self.assertEqual(retry.get_backoff_time(), 1.6)
+
+ for i in xrange(10):
+ retry = retry.increment()
+
+ self.assertEqual(retry.get_backoff_time(), max_backoff)
+
+ def test_zero_backoff(self):
+ retry = Retry()
+ self.assertEqual(retry.get_backoff_time(), 0)
+ retry = retry.increment()
+ retry = retry.increment()
+ self.assertEqual(retry.get_backoff_time(), 0)
+
+ def test_sleep(self):
+ # sleep a very small amount of time so our code coverage is happy
+ retry = Retry(backoff_factor=0.0001)
+ retry = retry.increment()
+ retry = retry.increment()
+ retry.sleep()
+
+ def test_status_forcelist(self):
+ retry = Retry(status_forcelist=xrange(500,600))
+ self.assertFalse(retry.is_forced_retry('GET', status_code=200))
+ self.assertFalse(retry.is_forced_retry('GET', status_code=400))
+ self.assertTrue(retry.is_forced_retry('GET', status_code=500))
+
+ retry = Retry(total=1, status_forcelist=[418])
+ self.assertFalse(retry.is_forced_retry('GET', status_code=400))
+ self.assertTrue(retry.is_forced_retry('GET', status_code=418))
+
+ def test_exhausted(self):
+ self.assertFalse(Retry(0).is_exhausted())
+ self.assertTrue(Retry(-1).is_exhausted())
+ self.assertEqual(Retry(1).increment().total, 0)
+
+ def test_disabled(self):
+ self.assertRaises(MaxRetryError, Retry(-1).increment)
+ self.assertRaises(MaxRetryError, Retry(0).increment)
+
+ def test_error_message(self):
+ retry = Retry(total=0)
+ try:
+ retry = retry.increment(error=ReadTimeoutError(None, "/", "read timed out"))
+ raise AssertionError("Should have raised a MaxRetryError")
+ except MaxRetryError as e:
+ assert 'Caused by redirect' not in str(e)
+ self.assertEqual(str(e.reason), 'None: read timed out')
+
+ retry = Retry(total=1)
+ try:
+ retry = retry.increment('POST', '/')
+ retry = retry.increment('POST', '/')
+ raise AssertionError("Should have raised a MaxRetryError")
+ except MaxRetryError as e:
+ assert 'Caused by redirect' not in str(e)
+ self.assertTrue(isinstance(e.reason, ResponseError),
+ "%s should be a ResponseError" % e.reason)
+ self.assertEqual(str(e.reason), ResponseError.GENERIC_ERROR)
+
+ retry = Retry(total=1)
+ try:
+ response = HTTPResponse(status=500)
+ retry = retry.increment('POST', '/', response=response)
+ retry = retry.increment('POST', '/', response=response)
+ raise AssertionError("Should have raised a MaxRetryError")
+ except MaxRetryError as e:
+ assert 'Caused by redirect' not in str(e)
+ msg = ResponseError.SPECIFIC_ERROR.format(status_code=500)
+ self.assertEqual(str(e.reason), msg)
+
+ retry = Retry(connect=1)
+ try:
+ retry = retry.increment(error=ConnectTimeoutError('conntimeout'))
+ retry = retry.increment(error=ConnectTimeoutError('conntimeout'))
+ raise AssertionError("Should have raised a MaxRetryError")
+ except MaxRetryError as e:
+ assert 'Caused by redirect' not in str(e)
+ self.assertEqual(str(e.reason), 'conntimeout')
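
The values asserted in test_backoff above follow urllib3's documented
exponential-backoff rule: no delay until the second consecutive error, then
backoff_factor * 2 ** (n - 1) for the n-th error, capped at
Retry.BACKOFF_MAX (assumed to be 120 seconds here). A worked sketch:

    BACKOFF_MAX = 120  # assumed value of Retry.BACKOFF_MAX

    def backoff_time(backoff_factor, consecutive_errors):
        if consecutive_errors <= 1:
            return 0
        return min(BACKOFF_MAX,
                   backoff_factor * (2 ** (consecutive_errors - 1)))

    # Matches the assertions in test_backoff for backoff_factor=0.2:
    assert [backoff_time(0.2, n) for n in (1, 2, 3, 4)] == [0, 0.4, 0.8, 1.6]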
diff --git a/test/test_util.py b/test/test_util.py
new file mode 100644
index 0000000..19ba57e
--- /dev/null
+++ b/test/test_util.py
@@ -0,0 +1,406 @@
+import warnings
+import logging
+import unittest
+import ssl
+from itertools import chain
+
+from mock import patch, Mock
+
+from urllib3 import add_stderr_logger, disable_warnings
+from urllib3.util.request import make_headers
+from urllib3.util.timeout import Timeout
+from urllib3.util.url import (
+ get_host,
+ parse_url,
+ split_first,
+ Url,
+)
+from urllib3.util.ssl_ import (
+ resolve_cert_reqs,
+ ssl_wrap_socket,
+)
+from urllib3.exceptions import (
+ LocationParseError,
+ TimeoutStateError,
+ InsecureRequestWarning,
+ SSLError,
+)
+
+from urllib3.util import is_fp_closed, ssl_
+
+from . import clear_warnings
+
+# This number represents a time in seconds; it doesn't mean anything in
+# isolation. It is set to a high-ish value to avoid conflicts with the
+# smaller numbers used for timeouts.
+TIMEOUT_EPOCH = 1000
+
+class TestUtil(unittest.TestCase):
+ def test_get_host(self):
+ url_host_map = {
+ # Hosts
+ 'http://google.com/mail': ('http', 'google.com', None),
+ 'http://google.com/mail/': ('http', 'google.com', None),
+ 'google.com/mail': ('http', 'google.com', None),
+ 'http://google.com/': ('http', 'google.com', None),
+ 'http://google.com': ('http', 'google.com', None),
+ 'http://www.google.com': ('http', 'www.google.com', None),
+ 'http://mail.google.com': ('http', 'mail.google.com', None),
+ 'http://google.com:8000/mail/': ('http', 'google.com', 8000),
+ 'http://google.com:8000': ('http', 'google.com', 8000),
+ 'https://google.com': ('https', 'google.com', None),
+ 'https://google.com:8000': ('https', 'google.com', 8000),
+ 'http://user:password@127.0.0.1:1234': ('http', '127.0.0.1', 1234),
+ 'http://google.com/foo=http://bar:42/baz': ('http', 'google.com', None),
+ 'http://google.com?foo=http://bar:42/baz': ('http', 'google.com', None),
+ 'http://google.com#foo=http://bar:42/baz': ('http', 'google.com', None),
+
+ # IPv4
+ '173.194.35.7': ('http', '173.194.35.7', None),
+ 'http://173.194.35.7': ('http', '173.194.35.7', None),
+ 'http://173.194.35.7/test': ('http', '173.194.35.7', None),
+ 'http://173.194.35.7:80': ('http', '173.194.35.7', 80),
+ 'http://173.194.35.7:80/test': ('http', '173.194.35.7', 80),
+
+ # IPv6
+ '[2a00:1450:4001:c01::67]': ('http', '[2a00:1450:4001:c01::67]', None),
+ 'http://[2a00:1450:4001:c01::67]': ('http', '[2a00:1450:4001:c01::67]', None),
+ 'http://[2a00:1450:4001:c01::67]/test': ('http', '[2a00:1450:4001:c01::67]', None),
+ 'http://[2a00:1450:4001:c01::67]:80': ('http', '[2a00:1450:4001:c01::67]', 80),
+ 'http://[2a00:1450:4001:c01::67]:80/test': ('http', '[2a00:1450:4001:c01::67]', 80),
+
+ # More IPv6 from http://www.ietf.org/rfc/rfc2732.txt
+ 'http://[FEDC:BA98:7654:3210:FEDC:BA98:7654:3210]:8000/index.html': ('http', '[FEDC:BA98:7654:3210:FEDC:BA98:7654:3210]', 8000),
+ 'http://[1080:0:0:0:8:800:200C:417A]/index.html': ('http', '[1080:0:0:0:8:800:200C:417A]', None),
+ 'http://[3ffe:2a00:100:7031::1]': ('http', '[3ffe:2a00:100:7031::1]', None),
+ 'http://[1080::8:800:200C:417A]/foo': ('http', '[1080::8:800:200C:417A]', None),
+ 'http://[::192.9.5.5]/ipng': ('http', '[::192.9.5.5]', None),
+ 'http://[::FFFF:129.144.52.38]:42/index.html': ('http', '[::FFFF:129.144.52.38]', 42),
+ 'http://[2010:836B:4179::836B:4179]': ('http', '[2010:836B:4179::836B:4179]', None),
+ }
+ for url, expected_host in url_host_map.items():
+ returned_host = get_host(url)
+ self.assertEqual(returned_host, expected_host)
+
+ def test_invalid_host(self):
+ # TODO: Add more tests
+ invalid_host = [
+ 'http://google.com:foo',
+ 'http://::1/',
+ 'http://::1:80/',
+ ]
+
+ for location in invalid_host:
+ self.assertRaises(LocationParseError, get_host, location)
+
+
+ parse_url_host_map = {
+ 'http://google.com/mail': Url('http', host='google.com', path='/mail'),
+ 'http://google.com/mail/': Url('http', host='google.com', path='/mail/'),
+ 'google.com/mail': Url(host='google.com', path='/mail'),
+ 'http://google.com/': Url('http', host='google.com', path='/'),
+ 'http://google.com': Url('http', host='google.com'),
+ 'http://google.com?foo': Url('http', host='google.com', path='', query='foo'),
+
+ # Path/query/fragment
+ '': Url(),
+ '/': Url(path='/'),
+ '#?/!google.com/?foo#bar': Url(path='', fragment='?/!google.com/?foo#bar'),
+ '/foo': Url(path='/foo'),
+ '/foo?bar=baz': Url(path='/foo', query='bar=baz'),
+ '/foo?bar=baz#banana?apple/orange': Url(path='/foo', query='bar=baz', fragment='banana?apple/orange'),
+
+ # Port
+ 'http://google.com/': Url('http', host='google.com', path='/'),
+ 'http://google.com:80/': Url('http', host='google.com', port=80, path='/'),
+ 'http://google.com:80': Url('http', host='google.com', port=80),
+
+ # Auth
+ 'http://foo:bar@localhost/': Url('http', auth='foo:bar', host='localhost', path='/'),
+ 'http://foo@localhost/': Url('http', auth='foo', host='localhost', path='/'),
+ 'http://foo:bar@baz@localhost/': Url('http', auth='foo:bar@baz', host='localhost', path='/'),
+ 'http://@': Url('http', host=None, auth='')
+ }
+
+ non_round_tripping_parse_url_host_map = {
+ # Path/query/fragment
+ '?': Url(path='', query=''),
+ '#': Url(path='', fragment=''),
+
+ # Empty Port
+ 'http://google.com:': Url('http', host='google.com'),
+ 'http://google.com:/': Url('http', host='google.com', path='/'),
+
+ }
+
+ def test_parse_url(self):
+ for url, expected_Url in chain(self.parse_url_host_map.items(), self.non_round_tripping_parse_url_host_map.items()):
+ returned_Url = parse_url(url)
+ self.assertEqual(returned_Url, expected_Url)
+
+ def test_unparse_url(self):
+ for url, expected_Url in self.parse_url_host_map.items():
+ self.assertEqual(url, expected_Url.url)
+
+ def test_parse_url_invalid_IPv6(self):
+ self.assertRaises(ValueError, parse_url, '[::1')
+
+ def test_Url_str(self):
+ U = Url('http', host='google.com')
+ self.assertEqual(str(U), U.url)
+
+ def test_request_uri(self):
+ url_host_map = {
+ 'http://google.com/mail': '/mail',
+ 'http://google.com/mail/': '/mail/',
+ 'http://google.com/': '/',
+ 'http://google.com': '/',
+ '': '/',
+ '/': '/',
+ '?': '/?',
+ '#': '/',
+ '/foo?bar=baz': '/foo?bar=baz',
+ }
+ for url, expected_request_uri in url_host_map.items():
+ returned_url = parse_url(url)
+ self.assertEqual(returned_url.request_uri, expected_request_uri)
+
+ def test_netloc(self):
+ url_netloc_map = {
+ 'http://google.com/mail': 'google.com',
+ 'http://google.com:80/mail': 'google.com:80',
+ 'google.com/foobar': 'google.com',
+ 'google.com:12345': 'google.com:12345',
+ }
+
+ for url, expected_netloc in url_netloc_map.items():
+ self.assertEqual(parse_url(url).netloc, expected_netloc)
+
+ def test_make_headers(self):
+ self.assertEqual(
+ make_headers(accept_encoding=True),
+ {'accept-encoding': 'gzip,deflate'})
+
+ self.assertEqual(
+ make_headers(accept_encoding='foo,bar'),
+ {'accept-encoding': 'foo,bar'})
+
+ self.assertEqual(
+ make_headers(accept_encoding=['foo', 'bar']),
+ {'accept-encoding': 'foo,bar'})
+
+ self.assertEqual(
+ make_headers(accept_encoding=True, user_agent='banana'),
+ {'accept-encoding': 'gzip,deflate', 'user-agent': 'banana'})
+
+ self.assertEqual(
+ make_headers(user_agent='banana'),
+ {'user-agent': 'banana'})
+
+ self.assertEqual(
+ make_headers(keep_alive=True),
+ {'connection': 'keep-alive'})
+
+ self.assertEqual(
+ make_headers(basic_auth='foo:bar'),
+ {'authorization': 'Basic Zm9vOmJhcg=='})
+
+ self.assertEqual(
+ make_headers(proxy_basic_auth='foo:bar'),
+ {'proxy-authorization': 'Basic Zm9vOmJhcg=='})
+
+ self.assertEqual(
+ make_headers(disable_cache=True),
+ {'cache-control': 'no-cache'})
+
+ def test_split_first(self):
+ test_cases = {
+ ('abcd', 'b'): ('a', 'cd', 'b'),
+ ('abcd', 'cb'): ('a', 'cd', 'b'),
+ ('abcd', ''): ('abcd', '', None),
+ ('abcd', 'a'): ('', 'bcd', 'a'),
+ ('abcd', 'ab'): ('', 'bcd', 'a'),
+ }
+ for input, expected in test_cases.items():
+ output = split_first(*input)
+ self.assertEqual(output, expected)
+
+ def test_add_stderr_logger(self):
+ handler = add_stderr_logger(level=logging.INFO) # Don't actually print debug
+ logger = logging.getLogger('urllib3')
+ self.assertTrue(handler in logger.handlers)
+
+ logger.debug('Testing add_stderr_logger')
+ logger.removeHandler(handler)
+
+ def test_disable_warnings(self):
+ with warnings.catch_warnings(record=True) as w:
+ clear_warnings()
+ warnings.warn('This is a test.', InsecureRequestWarning)
+ self.assertEqual(len(w), 1)
+ disable_warnings()
+ warnings.warn('This is a test.', InsecureRequestWarning)
+ self.assertEqual(len(w), 1)
+
+ def _make_time_pass(self, seconds, timeout, time_mock):
+ """ Make some time pass for the timeout object """
+ time_mock.return_value = TIMEOUT_EPOCH
+ timeout.start_connect()
+ time_mock.return_value = TIMEOUT_EPOCH + seconds
+ return timeout
+
+ def test_invalid_timeouts(self):
+ try:
+ Timeout(total=-1)
+ self.fail("negative value should throw exception")
+ except ValueError as e:
+ self.assertTrue('less than' in str(e))
+ try:
+ Timeout(connect=2, total=-1)
+ self.fail("negative value should throw exception")
+ except ValueError as e:
+ self.assertTrue('less than' in str(e))
+
+ try:
+ Timeout(read=-1)
+ self.fail("negative value should throw exception")
+ except ValueError as e:
+ self.assertTrue('less than' in str(e))
+
+ # Booleans are allowed also by socket.settimeout and converted to the
+ # equivalent float (1.0 for True, 0.0 for False)
+ Timeout(connect=False, read=True)
+
+ try:
+ Timeout(read="foo")
+ self.fail("string value should not be allowed")
+ except ValueError as e:
+ self.assertTrue('int or float' in str(e))
+
+
+ @patch('urllib3.util.timeout.current_time')
+ def test_timeout(self, current_time):
+ timeout = Timeout(total=3)
+
+ # make 'no time' elapse
+ timeout = self._make_time_pass(seconds=0, timeout=timeout,
+ time_mock=current_time)
+ self.assertEqual(timeout.read_timeout, 3)
+ self.assertEqual(timeout.connect_timeout, 3)
+
+ timeout = Timeout(total=3, connect=2)
+ self.assertEqual(timeout.connect_timeout, 2)
+
+ timeout = Timeout()
+ self.assertEqual(timeout.connect_timeout, Timeout.DEFAULT_TIMEOUT)
+
+ # Connect takes 5 seconds, leaving 5 seconds for read
+ timeout = Timeout(total=10, read=7)
+ timeout = self._make_time_pass(seconds=5, timeout=timeout,
+ time_mock=current_time)
+ self.assertEqual(timeout.read_timeout, 5)
+
+ # Connect takes 2 seconds, read timeout still 7 seconds
+ timeout = Timeout(total=10, read=7)
+ timeout = self._make_time_pass(seconds=2, timeout=timeout,
+ time_mock=current_time)
+ self.assertEqual(timeout.read_timeout, 7)
+
+ timeout = Timeout(total=10, read=7)
+ self.assertEqual(timeout.read_timeout, 7)
+
+ timeout = Timeout(total=None, read=None, connect=None)
+ self.assertEqual(timeout.connect_timeout, None)
+ self.assertEqual(timeout.read_timeout, None)
+ self.assertEqual(timeout.total, None)
+
+ timeout = Timeout(5)
+ self.assertEqual(timeout.total, 5)
+
+
+ def test_timeout_str(self):
+ timeout = Timeout(connect=1, read=2, total=3)
+ self.assertEqual(str(timeout), "Timeout(connect=1, read=2, total=3)")
+ timeout = Timeout(connect=1, read=None, total=3)
+ self.assertEqual(str(timeout), "Timeout(connect=1, read=None, total=3)")
+
+
+ @patch('urllib3.util.timeout.current_time')
+ def test_timeout_elapsed(self, current_time):
+ current_time.return_value = TIMEOUT_EPOCH
+ timeout = Timeout(total=3)
+ self.assertRaises(TimeoutStateError, timeout.get_connect_duration)
+
+ timeout.start_connect()
+ self.assertRaises(TimeoutStateError, timeout.start_connect)
+
+ current_time.return_value = TIMEOUT_EPOCH + 2
+ self.assertEqual(timeout.get_connect_duration(), 2)
+ current_time.return_value = TIMEOUT_EPOCH + 37
+ self.assertEqual(timeout.get_connect_duration(), 37)
+
+ def test_resolve_cert_reqs(self):
+ self.assertEqual(resolve_cert_reqs(None), ssl.CERT_NONE)
+ self.assertEqual(resolve_cert_reqs(ssl.CERT_NONE), ssl.CERT_NONE)
+
+ self.assertEqual(resolve_cert_reqs(ssl.CERT_REQUIRED), ssl.CERT_REQUIRED)
+ self.assertEqual(resolve_cert_reqs('REQUIRED'), ssl.CERT_REQUIRED)
+ self.assertEqual(resolve_cert_reqs('CERT_REQUIRED'), ssl.CERT_REQUIRED)
+
+ def test_is_fp_closed_object_supports_closed(self):
+ class ClosedFile(object):
+ @property
+ def closed(self):
+ return True
+
+ self.assertTrue(is_fp_closed(ClosedFile()))
+
+ def test_is_fp_closed_object_has_none_fp(self):
+ class NoneFpFile(object):
+ @property
+ def fp(self):
+ return None
+
+ self.assertTrue(is_fp_closed(NoneFpFile()))
+
+ def test_is_fp_closed_object_has_fp(self):
+ class FpFile(object):
+ @property
+ def fp(self):
+ return True
+
+ self.assertTrue(not is_fp_closed(FpFile()))
+
+ def test_is_fp_closed_object_has_neither_fp_nor_closed(self):
+ class NotReallyAFile(object):
+ pass
+
+ self.assertRaises(ValueError, is_fp_closed, NotReallyAFile())
+
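
The four is_fp_closed tests above pin down a duck-typing contract: prefer an
explicit `closed` attribute, fall back to checking whether `fp` has been
nulled out, and raise ValueError when neither is present. A plausible
implementation consistent with these tests (urllib3's actual code may differ
in detail):

    def is_fp_closed_sketch(obj):
        # Prefer an explicit `closed` flag when the object provides one.
        try:
            return obj.closed
        except AttributeError:
            pass
        # httplib-style objects set `fp` to None once the body is consumed.
        try:
            return obj.fp is None
        except AttributeError:
            pass
        raise ValueError("Unable to determine whether fp is closed.")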
+ def test_ssl_wrap_socket_loads_the_cert_chain(self):
+ socket = object()
+ mock_context = Mock()
+ ssl_wrap_socket(ssl_context=mock_context, sock=socket,
+ certfile='/path/to/certfile')
+
+ mock_context.load_cert_chain.assert_called_once_with(
+ '/path/to/certfile', None)
+
+ def test_ssl_wrap_socket_loads_verify_locations(self):
+ socket = object()
+ mock_context = Mock()
+ ssl_wrap_socket(ssl_context=mock_context, ca_certs='/path/to/pem',
+ sock=socket)
+ mock_context.load_verify_locations.assert_called_once_with(
+ '/path/to/pem')
+
+ def test_ssl_wrap_socket_with_no_sni(self):
+ socket = object()
+ mock_context = Mock()
+ # Temporarily disable SNI, making sure the original value is
+ # restored even if the assertions fail.
+ HAS_SNI = ssl_.HAS_SNI
+ ssl_.HAS_SNI = False
+ try:
+ ssl_wrap_socket(ssl_context=mock_context, sock=socket)
+ mock_context.wrap_socket.assert_called_once_with(socket)
+ finally:
+ ssl_.HAS_SNI = HAS_SNI
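
The Timeout assertions above reduce to one rule: the effective read timeout
is the smaller of `read` and whatever remains of `total` after the connect
phase. A sketch of that arithmetic (function name chosen here for
illustration):

    def effective_read_timeout(total, read, connect_elapsed):
        if total is None:
            return read
        remaining = total - connect_elapsed
        if read is None:
            return remaining
        return min(read, remaining)

    # total=10, read=7: a 5 s connect leaves 5 s; a 2 s connect leaves 7 s.
    assert effective_read_timeout(10, 7, 5) == 5
    assert effective_read_timeout(10, 7, 2) == 7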
diff --git a/test/with_dummyserver/__init__.py b/test/with_dummyserver/__init__.py
new file mode 100644
index 0000000..e69de29
--- /dev/null
+++ b/test/with_dummyserver/__init__.py
diff --git a/test/with_dummyserver/test_connectionpool.py b/test/with_dummyserver/test_connectionpool.py
new file mode 100644
index 0000000..741ae7b
--- /dev/null
+++ b/test/with_dummyserver/test_connectionpool.py
@@ -0,0 +1,760 @@
+import errno
+import logging
+import socket
+import sys
+import unittest
+import time
+import warnings
+
+import mock
+
+try:
+ from urllib.parse import urlencode
+except ImportError:
+ from urllib import urlencode
+
+from .. import (
+ requires_network, onlyPy3, onlyPy26OrOlder,
+ TARPIT_HOST, VALID_SOURCE_ADDRESSES, INVALID_SOURCE_ADDRESSES,
+)
+from ..port_helpers import find_unused_port
+from urllib3 import (
+ encode_multipart_formdata,
+ HTTPConnectionPool,
+)
+from urllib3.exceptions import (
+ ConnectTimeoutError,
+ EmptyPoolError,
+ DecodeError,
+ MaxRetryError,
+ ReadTimeoutError,
+ ProtocolError,
+)
+from urllib3.packages.six import b, u
+from urllib3.util.retry import Retry
+from urllib3.util.timeout import Timeout
+
+import tornado
+from dummyserver.testcase import HTTPDummyServerTestCase
+from dummyserver.server import NoIPv6Warning, HAS_IPV6_AND_DNS
+
+from nose.tools import timed
+
+log = logging.getLogger('urllib3.connectionpool')
+log.setLevel(logging.NOTSET)
+log.addHandler(logging.StreamHandler(sys.stdout))
+
+
+class TestConnectionPool(HTTPDummyServerTestCase):
+
+ def setUp(self):
+ self.pool = HTTPConnectionPool(self.host, self.port)
+
+ def test_get(self):
+ r = self.pool.request('GET', '/specific_method',
+ fields={'method': 'GET'})
+ self.assertEqual(r.status, 200, r.data)
+
+ def test_post_url(self):
+ r = self.pool.request('POST', '/specific_method',
+ fields={'method': 'POST'})
+ self.assertEqual(r.status, 200, r.data)
+
+ def test_urlopen_put(self):
+ r = self.pool.urlopen('PUT', '/specific_method?method=PUT')
+ self.assertEqual(r.status, 200, r.data)
+
+ def test_wrong_specific_method(self):
+ # To make sure the dummy server is actually returning failed responses
+ r = self.pool.request('GET', '/specific_method',
+ fields={'method': 'POST'})
+ self.assertEqual(r.status, 400, r.data)
+
+ r = self.pool.request('POST', '/specific_method',
+ fields={'method': 'GET'})
+ self.assertEqual(r.status, 400, r.data)
+
+ def test_upload(self):
+ data = "I'm in ur multipart form-data, hazing a cheezburgr"
+ fields = {
+ 'upload_param': 'filefield',
+ 'upload_filename': 'lolcat.txt',
+ 'upload_size': len(data),
+ 'filefield': ('lolcat.txt', data),
+ }
+
+ r = self.pool.request('POST', '/upload', fields=fields)
+ self.assertEqual(r.status, 200, r.data)
+
+ def test_one_name_multiple_values(self):
+ fields = [
+ ('foo', 'a'),
+ ('foo', 'b'),
+ ]
+
+ # urlencode
+ r = self.pool.request('GET', '/echo', fields=fields)
+ self.assertEqual(r.data, b'foo=a&foo=b')
+
+ # multipart
+ r = self.pool.request('POST', '/echo', fields=fields)
+ self.assertEqual(r.data.count(b'name="foo"'), 2)
+
+ def test_request_method_body(self):
+ body = b'hi'
+ r = self.pool.request('POST', '/echo', body=body)
+ self.assertEqual(r.data, body)
+
+ fields = [('hi', 'hello')]
+ self.assertRaises(TypeError, self.pool.request, 'POST', '/echo', body=body, fields=fields)
+
+ def test_unicode_upload(self):
+ fieldname = u('myfile')
+ filename = u('\xe2\x99\xa5.txt')
+ data = u('\xe2\x99\xa5').encode('utf8')
+ size = len(data)
+
+ fields = {
+ u('upload_param'): fieldname,
+ u('upload_filename'): filename,
+ u('upload_size'): size,
+ fieldname: (filename, data),
+ }
+
+ r = self.pool.request('POST', '/upload', fields=fields)
+ self.assertEqual(r.status, 200, r.data)
+
+ def test_timeout_float(self):
+ url = '/sleep?seconds=0.005'
+ # Pool-global timeout
+ pool = HTTPConnectionPool(self.host, self.port, timeout=0.001, retries=False)
+ self.assertRaises(ReadTimeoutError, pool.request, 'GET', url)
+
+ def test_conn_closed(self):
+ pool = HTTPConnectionPool(self.host, self.port, timeout=0.001, retries=False)
+ conn = pool._get_conn()
+ pool._put_conn(conn)
+ try:
+ url = '/sleep?seconds=0.005'
+ pool.urlopen('GET', url)
+ self.fail("The request should fail with a timeout error.")
+ except ReadTimeoutError:
+ if conn.sock:
+ self.assertRaises(socket.error, conn.sock.recv, 1024)
+ finally:
+ pool._put_conn(conn)
+
+ def test_nagle(self):
+ """ Test that connections have TCP_NODELAY turned on """
+ # This test needs to be here in order to be run. socket.create_connection actually tries to
+ # connect to the host provided so we need a dummyserver to be running.
+ pool = HTTPConnectionPool(self.host, self.port)
+ conn = pool._get_conn()
+ pool._make_request(conn, 'GET', '/')
+ tcp_nodelay_setting = conn.sock.getsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY)
+ assert tcp_nodelay_setting > 0, ("Expected TCP_NODELAY to be set on the "
+ "socket (with value greater than 0) "
+ "but instead was %s" %
+ tcp_nodelay_setting)
+
+ def test_socket_options(self):
+ """Test that connections accept socket options."""
+ # This test needs to be here in order to be run. socket.create_connection actually tries to
+ # connect to the host provided so we need a dummyserver to be running.
+ pool = HTTPConnectionPool(self.host, self.port, socket_options=[
+ (socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1)
+ ])
+ s = pool._new_conn()._new_conn() # Get the socket
+ using_keepalive = s.getsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE) > 0
+ self.assertTrue(using_keepalive)
+ s.close()
+
+ def test_disable_default_socket_options(self):
+ """Test that passing None disables all socket options."""
+ # This test needs to be here in order to be run. socket.create_connection actually tries to
+ # connect to the host provided so we need a dummyserver to be running.
+ pool = HTTPConnectionPool(self.host, self.port, socket_options=None)
+ s = pool._new_conn()._new_conn()
+ using_nagle = s.getsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY) == 0
+ self.assertTrue(using_nagle)
+ s.close()
+
+ def test_defaults_are_applied(self):
+ """Test that modifying the default socket options works."""
+ # This test needs to be here in order to be run. socket.create_connection actually tries to
+ # connect to the host provided so we need a dummyserver to be running.
+ pool = HTTPConnectionPool(self.host, self.port)
+ # Get the HTTPConnection instance
+ conn = pool._new_conn()
+ # Update the default socket options
+ conn.default_socket_options += [(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1)]
+ s = conn._new_conn()
+ nagle_disabled = s.getsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY) > 0
+ using_keepalive = s.getsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE) > 0
+ self.assertTrue(nagle_disabled)
+ self.assertTrue(using_keepalive)
+
+ @timed(0.5)
+ def test_timeout(self):
+ """ Requests should time out when expected """
+ url = '/sleep?seconds=0.003'
+ timeout = Timeout(read=0.001)
+
+ # Pool-global timeout
+ pool = HTTPConnectionPool(self.host, self.port, timeout=timeout, retries=False)
+
+ conn = pool._get_conn()
+ self.assertRaises(ReadTimeoutError, pool._make_request,
+ conn, 'GET', url)
+ pool._put_conn(conn)
+
+ time.sleep(0.02) # Wait for server to start receiving again. :(
+
+ self.assertRaises(ReadTimeoutError, pool.request, 'GET', url)
+
+ # Request-specific timeouts should raise errors
+ pool = HTTPConnectionPool(self.host, self.port, timeout=0.1, retries=False)
+
+ conn = pool._get_conn()
+ self.assertRaises(ReadTimeoutError, pool._make_request,
+ conn, 'GET', url, timeout=timeout)
+ pool._put_conn(conn)
+
+ time.sleep(0.02) # Wait for server to start receiving again. :(
+
+ self.assertRaises(ReadTimeoutError, pool.request,
+ 'GET', url, timeout=timeout)
+
+ # Timeout int/float passed directly to request and _make_request should
+ # raise a request timeout
+ self.assertRaises(ReadTimeoutError, pool.request,
+ 'GET', url, timeout=0.001)
+ conn = pool._new_conn()
+ self.assertRaises(ReadTimeoutError, pool._make_request, conn,
+ 'GET', url, timeout=0.001)
+ pool._put_conn(conn)
+
+ # Timeout int/float passed directly to _make_request should not raise a
+ # request timeout if it's a high value
+ pool.request('GET', url, timeout=1)
+
+ @requires_network
+ @timed(0.5)
+ def test_connect_timeout(self):
+ url = '/sleep?seconds=0.005'
+ timeout = Timeout(connect=0.001)
+
+ # Pool-global timeout
+ pool = HTTPConnectionPool(TARPIT_HOST, self.port, timeout=timeout)
+ conn = pool._get_conn()
+ self.assertRaises(ConnectTimeoutError, pool._make_request, conn, 'GET', url)
+
+ # Retries
+ retries = Retry(connect=0)
+ self.assertRaises(MaxRetryError, pool.request, 'GET', url,
+ retries=retries)
+
+ # Request-specific connection timeouts
+ big_timeout = Timeout(read=0.2, connect=0.2)
+ pool = HTTPConnectionPool(TARPIT_HOST, self.port,
+ timeout=big_timeout, retries=False)
+ conn = pool._get_conn()
+ self.assertRaises(ConnectTimeoutError, pool._make_request, conn, 'GET',
+ url, timeout=timeout)
+
+ pool._put_conn(conn)
+ self.assertRaises(ConnectTimeoutError, pool.request, 'GET', url,
+ timeout=timeout)
+
+
+ def test_connection_error_retries(self):
+ """ ECONNREFUSED error should raise a connection error, with retries """
+ port = find_unused_port()
+ pool = HTTPConnectionPool(self.host, port)
+ try:
+ pool.request('GET', '/', retries=Retry(connect=3))
+ self.fail("Should have failed with a connection error.")
+ except MaxRetryError as e:
+ self.assertTrue(isinstance(e.reason, ProtocolError))
+ self.assertEqual(e.reason.args[1].errno, errno.ECONNREFUSED)
+
+ def test_timeout_reset(self):
+ """ If the read timeout isn't set, socket timeout should reset """
+ url = '/sleep?seconds=0.005'
+ timeout = Timeout(connect=0.001)
+ pool = HTTPConnectionPool(self.host, self.port, timeout=timeout)
+ conn = pool._get_conn()
+ try:
+ pool._make_request(conn, 'GET', url)
+ except ReadTimeoutError:
+ self.fail("This request shouldn't trigger a read timeout.")
+
+ @requires_network
+ @timed(5.0)
+ def test_total_timeout(self):
+ url = '/sleep?seconds=0.005'
+
+ timeout = Timeout(connect=3, read=5, total=0.001)
+ pool = HTTPConnectionPool(TARPIT_HOST, self.port, timeout=timeout)
+ conn = pool._get_conn()
+ self.assertRaises(ConnectTimeoutError, pool._make_request, conn, 'GET', url)
+
+ # This will get the socket to raise an EAGAIN on the read
+ timeout = Timeout(connect=3, read=0)
+ pool = HTTPConnectionPool(self.host, self.port, timeout=timeout)
+ conn = pool._get_conn()
+ self.assertRaises(ReadTimeoutError, pool._make_request, conn, 'GET', url)
+
+ # The connect should succeed and this should hit the read timeout
+ timeout = Timeout(connect=3, read=5, total=0.002)
+ pool = HTTPConnectionPool(self.host, self.port, timeout=timeout)
+ conn = pool._get_conn()
+ self.assertRaises(ReadTimeoutError, pool._make_request, conn, 'GET', url)
+
+ @requires_network
+ def test_none_total_applies_connect(self):
+ url = '/sleep?seconds=0.005'
+ timeout = Timeout(total=None, connect=0.001)
+ pool = HTTPConnectionPool(TARPIT_HOST, self.port, timeout=timeout)
+ conn = pool._get_conn()
+ self.assertRaises(ConnectTimeoutError, pool._make_request, conn, 'GET',
+ url)
+
+ def test_timeout_success(self):
+ timeout = Timeout(connect=3, read=5, total=None)
+ pool = HTTPConnectionPool(self.host, self.port, timeout=timeout)
+ pool.request('GET', '/')
+ # This should not raise a "Timeout already started" error
+ pool.request('GET', '/')
+
+ pool = HTTPConnectionPool(self.host, self.port, timeout=timeout)
+ # This should also not raise a "Timeout already started" error
+ pool.request('GET', '/')
+
+ timeout = Timeout(total=None)
+ pool = HTTPConnectionPool(self.host, self.port, timeout=timeout)
+ pool.request('GET', '/')
+
+ def test_tunnel(self):
+ # Note: the actual httplib.py has no tests for this functionality.
+ timeout = Timeout(total=None)
+ pool = HTTPConnectionPool(self.host, self.port, timeout=timeout)
+ conn = pool._get_conn()
+ try:
+ conn.set_tunnel(self.host, self.port)
+ except AttributeError: # python 2.6
+ conn._set_tunnel(self.host, self.port)
+
+ conn._tunnel = mock.Mock(return_value=None)
+ pool._make_request(conn, 'GET', '/')
+ conn._tunnel.assert_called_once_with()
+
+ # test that it's not called when tunnel is not set
+ timeout = Timeout(total=None)
+ pool = HTTPConnectionPool(self.host, self.port, timeout=timeout)
+ conn = pool._get_conn()
+
+ conn._tunnel = mock.Mock(return_value=None)
+ pool._make_request(conn, 'GET', '/')
+ self.assertEqual(conn._tunnel.called, False)
+
+ def test_redirect(self):
+ r = self.pool.request('GET', '/redirect', fields={'target': '/'}, redirect=False)
+ self.assertEqual(r.status, 303)
+
+ r = self.pool.request('GET', '/redirect', fields={'target': '/'})
+ self.assertEqual(r.status, 200)
+ self.assertEqual(r.data, b'Dummy server!')
+
+ def test_bad_connect(self):
+ pool = HTTPConnectionPool('badhost.invalid', self.port)
+ try:
+ pool.request('GET', '/', retries=5)
+ self.fail("should raise timeout exception here")
+ except MaxRetryError as e:
+ self.assertTrue(isinstance(e.reason, ProtocolError), e.reason)
+
+ def test_keepalive(self):
+ pool = HTTPConnectionPool(self.host, self.port, block=True, maxsize=1)
+
+ r = pool.request('GET', '/keepalive?close=0')
+ r = pool.request('GET', '/keepalive?close=0')
+
+ self.assertEqual(r.status, 200)
+ self.assertEqual(pool.num_connections, 1)
+ self.assertEqual(pool.num_requests, 2)
+
+ def test_keepalive_close(self):
+ pool = HTTPConnectionPool(self.host, self.port,
+ block=True, maxsize=1, timeout=2)
+
+ r = pool.request('GET', '/keepalive?close=1', retries=0,
+ headers={
+ "Connection": "close",
+ })
+
+ self.assertEqual(pool.num_connections, 1)
+
+ # The dummyserver will have responded with Connection:close,
+ # and httplib will properly cleanup the socket.
+
+ # We grab the HTTPConnection object straight from the Queue,
+ # because _get_conn() is where the check & reset occurs
+ # pylint: disable-msg=W0212
+ conn = pool.pool.get()
+ self.assertEqual(conn.sock, None)
+ pool._put_conn(conn)
+
+ # Now with keep-alive
+ r = pool.request('GET', '/keepalive?close=0', retries=0,
+ headers={
+ "Connection": "keep-alive",
+ })
+
+ # The dummyserver responded with Connection:keep-alive, the connection
+ # persists.
+ conn = pool.pool.get()
+ self.assertNotEqual(conn.sock, None)
+ pool._put_conn(conn)
+
+ # Another request asking the server to close the connection. This one
+ # should get cleaned up for the next request.
+ r = pool.request('GET', '/keepalive?close=1', retries=0,
+ headers={
+ "Connection": "close",
+ })
+
+ self.assertEqual(r.status, 200)
+
+ conn = pool.pool.get()
+ self.assertEqual(conn.sock, None)
+ pool._put_conn(conn)
+
+ # Next request
+ r = pool.request('GET', '/keepalive?close=0')
+
+ def test_post_with_urlencode(self):
+ data = {'banana': 'hammock', 'lol': 'cat'}
+ r = self.pool.request('POST', '/echo', fields=data, encode_multipart=False)
+ self.assertEqual(r.data.decode('utf-8'), urlencode(data))
+
+ def test_post_with_multipart(self):
+ data = {'banana': 'hammock', 'lol': 'cat'}
+ r = self.pool.request('POST', '/echo',
+ fields=data,
+ encode_multipart=True)
+ body = r.data.split(b'\r\n')
+
+ encoded_data = encode_multipart_formdata(data)[0]
+ expected_body = encoded_data.split(b'\r\n')
+
+ # TODO: Get rid of the extra parsing stuff when a custom boundary
+ # can be passed to encode_multipart_formdata.
+ #
+ # We need to loop over the returned lines because a timestamp is
+ # attached from within encode_multipart_formdata. When the server
+ # echoes back the data, it carries the timestamp from when the data
+ # was encoded, which is not equivalent to when we run
+ # encode_multipart_formdata on the data again.
+ for i, line in enumerate(body):
+ if line.startswith(b'--'):
+ continue
+
+ self.assertEqual(body[i], expected_body[i])
+
+ def test_check_gzip(self):
+ r = self.pool.request('GET', '/encodingrequest',
+ headers={'accept-encoding': 'gzip'})
+ self.assertEqual(r.headers.get('content-encoding'), 'gzip')
+ self.assertEqual(r.data, b'hello, world!')
+
+ def test_check_deflate(self):
+ r = self.pool.request('GET', '/encodingrequest',
+ headers={'accept-encoding': 'deflate'})
+ self.assertEqual(r.headers.get('content-encoding'), 'deflate')
+ self.assertEqual(r.data, b'hello, world!')
+
+ def test_bad_decode(self):
+ self.assertRaises(DecodeError, self.pool.request,
+ 'GET', '/encodingrequest',
+ headers={'accept-encoding': 'garbage-deflate'})
+
+ self.assertRaises(DecodeError, self.pool.request,
+ 'GET', '/encodingrequest',
+ headers={'accept-encoding': 'garbage-gzip'})
+
+ def test_connection_count(self):
+ pool = HTTPConnectionPool(self.host, self.port, maxsize=1)
+
+ pool.request('GET', '/')
+ pool.request('GET', '/')
+ pool.request('GET', '/')
+
+ self.assertEqual(pool.num_connections, 1)
+ self.assertEqual(pool.num_requests, 3)
+
+ def test_connection_count_bigpool(self):
+ http_pool = HTTPConnectionPool(self.host, self.port, maxsize=16)
+
+ http_pool.request('GET', '/')
+ http_pool.request('GET', '/')
+ http_pool.request('GET', '/')
+
+ self.assertEqual(http_pool.num_connections, 1)
+ self.assertEqual(http_pool.num_requests, 3)
+
+ def test_partial_response(self):
+ pool = HTTPConnectionPool(self.host, self.port, maxsize=1)
+
+ req_data = {'lol': 'cat'}
+ resp_data = urlencode(req_data).encode('utf-8')
+
+ r = pool.request('GET', '/echo', fields=req_data, preload_content=False)
+
+ self.assertEqual(r.read(5), resp_data[:5])
+ self.assertEqual(r.read(), resp_data[5:])
+
+ def test_lazy_load_twice(self):
+ # This test is sad and confusing. Need to figure out what's
+ # going on with partial reads and socket reuse.
+
+ pool = HTTPConnectionPool(self.host, self.port, block=True, maxsize=1, timeout=2)
+
+ payload_size = 1024 * 2
+ first_chunk = 512
+
+ boundary = 'foo'
+
+ req_data = {'count': 'a' * payload_size}
+ resp_data = encode_multipart_formdata(req_data, boundary=boundary)[0]
+
+ req2_data = {'count': 'b' * payload_size}
+ resp2_data = encode_multipart_formdata(req2_data, boundary=boundary)[0]
+
+ r1 = pool.request('POST', '/echo', fields=req_data, multipart_boundary=boundary, preload_content=False)
+
+ self.assertEqual(r1.read(first_chunk), resp_data[:first_chunk])
+
+ try:
+ r2 = pool.request('POST', '/echo', fields=req2_data, multipart_boundary=boundary,
+ preload_content=False, pool_timeout=0.001)
+
+ # This branch should generally bail here, but maybe someday it will
+ # work? Perhaps by some sort of magic. Consider it a TODO.
+
+ self.assertEqual(r2.read(first_chunk), resp2_data[:first_chunk])
+
+ self.assertEqual(r1.read(), resp_data[first_chunk:])
+ self.assertEqual(r2.read(), resp2_data[first_chunk:])
+ self.assertEqual(pool.num_requests, 2)
+
+ except EmptyPoolError:
+ self.assertEqual(r1.read(), resp_data[first_chunk:])
+ self.assertEqual(pool.num_requests, 1)
+
+ self.assertEqual(pool.num_connections, 1)
+
+ def test_for_double_release(self):
+ MAXSIZE = 5
+
+ # Check default state
+ pool = HTTPConnectionPool(self.host, self.port, maxsize=MAXSIZE)
+ self.assertEqual(pool.num_connections, 0)
+ self.assertEqual(pool.pool.qsize(), MAXSIZE)
+
+ # Make an empty slot for testing
+ pool.pool.get()
+ self.assertEqual(pool.pool.qsize(), MAXSIZE-1)
+
+ # Check state after simple request
+ pool.urlopen('GET', '/')
+ self.assertEqual(pool.pool.qsize(), MAXSIZE-1)
+
+ # Check state without release
+ pool.urlopen('GET', '/', preload_content=False)
+ self.assertEqual(pool.pool.qsize(), MAXSIZE-2)
+
+ pool.urlopen('GET', '/')
+ self.assertEqual(pool.pool.qsize(), MAXSIZE-2)
+
+ # Check state after read
+ pool.urlopen('GET', '/').data
+ self.assertEqual(pool.pool.qsize(), MAXSIZE-2)
+
+ pool.urlopen('GET', '/')
+ self.assertEqual(pool.pool.qsize(), MAXSIZE-2)
+
+ def test_release_conn_parameter(self):
+        MAXSIZE = 5
+ pool = HTTPConnectionPool(self.host, self.port, maxsize=MAXSIZE)
+ self.assertEqual(pool.pool.qsize(), MAXSIZE)
+
+ # Make request without releasing connection
+ pool.request('GET', '/', release_conn=False, preload_content=False)
+ self.assertEqual(pool.pool.qsize(), MAXSIZE-1)
+
+ def test_dns_error(self):
+ pool = HTTPConnectionPool('thishostdoesnotexist.invalid', self.port, timeout=0.001)
+ self.assertRaises(MaxRetryError, pool.request, 'GET', '/test', retries=2)
+
+ def test_source_address(self):
+ for addr, is_ipv6 in VALID_SOURCE_ADDRESSES:
+ if is_ipv6 and not HAS_IPV6_AND_DNS:
+ warnings.warn("No IPv6 support: skipping.",
+ NoIPv6Warning)
+ continue
+ pool = HTTPConnectionPool(self.host, self.port,
+ source_address=addr, retries=False)
+ r = pool.request('GET', '/source_address')
+ assert r.data == b(addr[0]), (
+ "expected the response to contain the source address {addr}, "
+ "but was {data}".format(data=r.data, addr=b(addr[0])))
+
+ def test_source_address_error(self):
+ for addr in INVALID_SOURCE_ADDRESSES:
+ pool = HTTPConnectionPool(self.host, self.port,
+ source_address=addr, retries=False)
+ self.assertRaises(ProtocolError,
+ pool.request, 'GET', '/source_address')
+
+ def test_stream_keepalive(self):
+ x = 2
+
+ for _ in range(x):
+ response = self.pool.request(
+ 'GET',
+ '/chunked',
+ headers={
+ 'Connection': 'keep-alive',
+ },
+ preload_content=False,
+ retries=False,
+ )
+ for chunk in response.stream():
+ self.assertEqual(chunk, b'123')
+
+ self.assertEqual(self.pool.num_connections, 1)
+ self.assertEqual(self.pool.num_requests, x)
+
+ def test_chunked_gzip(self):
+ response = self.pool.request(
+ 'GET',
+ '/chunked_gzip',
+ preload_content=False,
+ decode_content=True,
+ )
+
+ self.assertEqual(b'123' * 4, response.read())
+
+ def test_cleanup_on_connection_error(self):
+ '''
+ Test that connections are recycled to the pool on
+        connection errors where no HTTP response is received.
+ '''
+ poolsize = 3
+ with HTTPConnectionPool(self.host, self.port, maxsize=poolsize, block=True) as http:
+ self.assertEqual(http.pool.qsize(), poolsize)
+
+            # Exhaust the retry budget (retries=0 against a redirecting
+            # URL) so no final response is returned and, because
+            # release_conn=False, the conn isn't implicitly returned
+            # to the pool.
+ self.assertRaises(MaxRetryError,
+ http.request, 'GET', '/redirect', fields={'target': '/'}, release_conn=False, retries=0)
+
+ r = http.request('GET', '/redirect', fields={'target': '/'}, release_conn=False, retries=1)
+ r.release_conn()
+
+ # the pool should still contain poolsize elements
+ self.assertEqual(http.pool.qsize(), http.pool.maxsize)
+
+
+class TestRetry(HTTPDummyServerTestCase):
+ def setUp(self):
+ self.pool = HTTPConnectionPool(self.host, self.port)
+
+ def test_max_retry(self):
+ try:
+ r = self.pool.request('GET', '/redirect',
+ fields={'target': '/'},
+ retries=0)
+ self.fail("Failed to raise MaxRetryError exception, returned %r" % r.status)
+ except MaxRetryError:
+ pass
+
+ def test_disabled_retry(self):
+ """ Disabled retries should disable redirect handling. """
+ r = self.pool.request('GET', '/redirect',
+ fields={'target': '/'},
+ retries=False)
+ self.assertEqual(r.status, 303)
+
+ r = self.pool.request('GET', '/redirect',
+ fields={'target': '/'},
+ retries=Retry(redirect=False))
+ self.assertEqual(r.status, 303)
+
+ pool = HTTPConnectionPool('thishostdoesnotexist.invalid', self.port, timeout=0.001)
+ self.assertRaises(ProtocolError, pool.request, 'GET', '/test', retries=False)
+
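+    def test_disabled_retry_redirect_flag_sketch(self):
+        # Illustrative sketch (added here; not part of the original
+        # suite): Retry(redirect=False) leaves the redirect budget falsy,
+        # which is why the 303 responses above are handed straight back.
+        self.assertFalse(Retry(redirect=False).redirect)
+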
+ def test_read_retries(self):
+ """ Should retry for status codes in the whitelist """
+ retry = Retry(read=1, status_forcelist=[418])
+ resp = self.pool.request('GET', '/successful_retry',
+ headers={'test-name': 'test_read_retries'},
+ retries=retry)
+ self.assertEqual(resp.status, 200)
+
+ def test_read_total_retries(self):
+ """ HTTP response w/ status code in the whitelist should be retried """
+ headers = {'test-name': 'test_read_total_retries'}
+ retry = Retry(total=1, status_forcelist=[418])
+ resp = self.pool.request('GET', '/successful_retry',
+ headers=headers, retries=retry)
+ self.assertEqual(resp.status, 200)
+
+ def test_retries_wrong_whitelist(self):
+ """HTTP response w/ status code not in whitelist shouldn't be retried"""
+ retry = Retry(total=1, status_forcelist=[202])
+ resp = self.pool.request('GET', '/successful_retry',
+ headers={'test-name': 'test_wrong_whitelist'},
+ retries=retry)
+ self.assertEqual(resp.status, 418)
+
+ def test_default_method_whitelist_retried(self):
+ """ urllib3 should retry methods in the default method whitelist """
+ retry = Retry(total=1, status_forcelist=[418])
+ resp = self.pool.request('OPTIONS', '/successful_retry',
+ headers={'test-name': 'test_default_whitelist'},
+ retries=retry)
+ self.assertEqual(resp.status, 200)
+
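+    def test_method_whitelist_membership_sketch(self):
+        # Illustrative sketch (added; not in the original suite): OPTIONS
+        # is retried above because it is one of the idempotent methods in
+        # Retry.DEFAULT_METHOD_WHITELIST, while POST is not.
+        self.assertTrue('OPTIONS' in Retry.DEFAULT_METHOD_WHITELIST)
+        self.assertFalse('POST' in Retry.DEFAULT_METHOD_WHITELIST)
+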
+ def test_retries_wrong_method_list(self):
+ """Method not in our whitelist should not be retried, even if code matches"""
+ headers = {'test-name': 'test_wrong_method_whitelist'}
+ retry = Retry(total=1, status_forcelist=[418],
+ method_whitelist=['POST'])
+ resp = self.pool.request('GET', '/successful_retry',
+ headers=headers, retries=retry)
+ self.assertEqual(resp.status, 418)
+
+ def test_read_retries_unsuccessful(self):
+ headers = {'test-name': 'test_read_retries_unsuccessful'}
+ resp = self.pool.request('GET', '/successful_retry',
+ headers=headers, retries=1)
+ self.assertEqual(resp.status, 418)
+
+ def test_retry_reuse_safe(self):
+ """ It should be possible to reuse a Retry object across requests """
+ headers = {'test-name': 'test_retry_safe'}
+ retry = Retry(total=1, status_forcelist=[418])
+ resp = self.pool.request('GET', '/successful_retry',
+ headers=headers, retries=retry)
+ self.assertEqual(resp.status, 200)
+ resp = self.pool.request('GET', '/successful_retry',
+ headers=headers, retries=retry)
+ self.assertEqual(resp.status, 200)
+
+
+if __name__ == '__main__':
+ unittest.main()
diff --git a/test/with_dummyserver/test_https.py b/test/with_dummyserver/test_https.py
new file mode 100644
index 0000000..63aea66
--- /dev/null
+++ b/test/with_dummyserver/test_https.py
@@ -0,0 +1,444 @@
+import datetime
+import logging
+import ssl
+import sys
+import unittest
+import warnings
+
+import mock
+from nose.plugins.skip import SkipTest
+
+from dummyserver.testcase import HTTPSDummyServerTestCase
+from dummyserver.server import (DEFAULT_CA, DEFAULT_CA_BAD, DEFAULT_CERTS,
+ NO_SAN_CERTS, NO_SAN_CA)
+
+from test import (
+ onlyPy26OrOlder,
+ requires_network,
+ TARPIT_HOST,
+ clear_warnings,
+)
+from urllib3 import HTTPSConnectionPool
+from urllib3.connection import (
+ VerifiedHTTPSConnection,
+ UnverifiedHTTPSConnection,
+ RECENT_DATE,
+)
+from urllib3.exceptions import (
+ SSLError,
+ ReadTimeoutError,
+ ConnectTimeoutError,
+ InsecureRequestWarning,
+ SystemTimeWarning,
+ InsecurePlatformWarning,
+)
+from urllib3.packages import six
+from urllib3.util.timeout import Timeout
+
+
+ResourceWarning = getattr(
+ six.moves.builtins,
+ 'ResourceWarning', type('ResourceWarning', (), {}))
+
+
+log = logging.getLogger('urllib3.connectionpool')
+log.setLevel(logging.NOTSET)
+log.addHandler(logging.StreamHandler(sys.stdout))
+
+
+class TestHTTPS(HTTPSDummyServerTestCase):
+ def setUp(self):
+ self._pool = HTTPSConnectionPool(self.host, self.port)
+
+ def test_simple(self):
+ r = self._pool.request('GET', '/')
+ self.assertEqual(r.status, 200, r.data)
+
+ def test_set_ssl_version_to_tlsv1(self):
+ self._pool.ssl_version = ssl.PROTOCOL_TLSv1
+ r = self._pool.request('GET', '/')
+ self.assertEqual(r.status, 200, r.data)
+
+ def test_verified(self):
+ https_pool = HTTPSConnectionPool(self.host, self.port,
+ cert_reqs='CERT_REQUIRED',
+ ca_certs=DEFAULT_CA)
+
+ conn = https_pool._new_conn()
+ self.assertEqual(conn.__class__, VerifiedHTTPSConnection)
+
+ with mock.patch('warnings.warn') as warn:
+ r = https_pool.request('GET', '/')
+ self.assertEqual(r.status, 200)
+
+ if sys.version_info >= (2, 7, 9):
+ self.assertFalse(warn.called, warn.call_args_list)
+ else:
+ self.assertTrue(warn.called)
+ call, = warn.call_args_list
+ error = call[0][1]
+ self.assertEqual(error, InsecurePlatformWarning)
+
+ def test_invalid_common_name(self):
+ https_pool = HTTPSConnectionPool('127.0.0.1', self.port,
+ cert_reqs='CERT_REQUIRED',
+ ca_certs=DEFAULT_CA)
+ try:
+ https_pool.request('GET', '/')
+ self.fail("Didn't raise SSL invalid common name")
+ except SSLError as e:
+ self.assertTrue("doesn't match" in str(e))
+
+ def test_verified_with_bad_ca_certs(self):
+ https_pool = HTTPSConnectionPool(self.host, self.port,
+ cert_reqs='CERT_REQUIRED',
+ ca_certs=DEFAULT_CA_BAD)
+
+ try:
+ https_pool.request('GET', '/')
+ self.fail("Didn't raise SSL error with bad CA certs")
+ except SSLError as e:
+ self.assertTrue('certificate verify failed' in str(e),
+ "Expected 'certificate verify failed',"
+ "instead got: %r" % e)
+
+ def test_verified_without_ca_certs(self):
+ # default is cert_reqs=None which is ssl.CERT_NONE
+ https_pool = HTTPSConnectionPool(self.host, self.port,
+ cert_reqs='CERT_REQUIRED')
+
+ try:
+ https_pool.request('GET', '/')
+ self.fail("Didn't raise SSL error with no CA certs when"
+ "CERT_REQUIRED is set")
+ except SSLError as e:
+ # there is a different error message depending on whether or
+ # not pyopenssl is injected
+ self.assertTrue('No root certificates specified' in str(e) or
+ 'certificate verify failed' in str(e),
+ "Expected 'No root certificates specified' or "
+ "'certificate verify failed', "
+ "instead got: %r" % e)
+
+ def test_no_ssl(self):
+ pool = HTTPSConnectionPool(self.host, self.port)
+ pool.ConnectionCls = None
+ self.assertRaises(SSLError, pool._new_conn)
+ self.assertRaises(SSLError, pool.request, 'GET', '/')
+
+ def test_unverified_ssl(self):
+ """ Test that bare HTTPSConnection can connect, make requests """
+ pool = HTTPSConnectionPool(self.host, self.port)
+ pool.ConnectionCls = UnverifiedHTTPSConnection
+
+ with mock.patch('warnings.warn') as warn:
+ r = pool.request('GET', '/')
+ self.assertEqual(r.status, 200)
+ self.assertTrue(warn.called)
+
+ call, = warn.call_args_list
+ category = call[0][1]
+ self.assertEqual(category, InsecureRequestWarning)
+
+ def test_ssl_unverified_with_ca_certs(self):
+ pool = HTTPSConnectionPool(self.host, self.port,
+ cert_reqs='CERT_NONE',
+ ca_certs=DEFAULT_CA_BAD)
+
+ with mock.patch('warnings.warn') as warn:
+ r = pool.request('GET', '/')
+ self.assertEqual(r.status, 200)
+ self.assertTrue(warn.called)
+
+ calls = warn.call_args_list
+ if sys.version_info >= (2, 7, 9):
+ category = calls[0][0][1]
+ else:
+ category = calls[1][0][1]
+ self.assertEqual(category, InsecureRequestWarning)
+
+ @requires_network
+ def test_ssl_verified_with_platform_ca_certs(self):
+ """
+ We should rely on the platform CA file to validate authenticity of SSL
+ certificates. Since this file is used by many components of the OS,
+ such as curl, apt-get, etc., we decided to not touch it, in order to
+ not compromise the security of the OS running the test suite (typically
+ urllib3 developer's OS).
+
+ This test assumes that httpbin.org uses a certificate signed by a well
+ known Certificate Authority.
+ """
+ try:
+ import urllib3.contrib.pyopenssl
+ except ImportError:
+ raise SkipTest('Test requires PyOpenSSL')
+ if (urllib3.connection.ssl_wrap_socket is
+ urllib3.contrib.pyopenssl.orig_connection_ssl_wrap_socket):
+ # Not patched
+ raise SkipTest('Test should only be run after PyOpenSSL '
+ 'monkey patching')
+
+ https_pool = HTTPSConnectionPool('httpbin.org', 443,
+ cert_reqs=ssl.CERT_REQUIRED)
+
+ https_pool.request('HEAD', '/')
+
+ def test_assert_hostname_false(self):
+ https_pool = HTTPSConnectionPool('localhost', self.port,
+ cert_reqs='CERT_REQUIRED',
+ ca_certs=DEFAULT_CA)
+
+ https_pool.assert_hostname = False
+ https_pool.request('GET', '/')
+
+ def test_assert_specific_hostname(self):
+ https_pool = HTTPSConnectionPool('localhost', self.port,
+ cert_reqs='CERT_REQUIRED',
+ ca_certs=DEFAULT_CA)
+
+ https_pool.assert_hostname = 'localhost'
+ https_pool.request('GET', '/')
+
+ def test_assert_fingerprint_md5(self):
+ https_pool = HTTPSConnectionPool('localhost', self.port,
+ cert_reqs='CERT_REQUIRED',
+ ca_certs=DEFAULT_CA)
+
+ https_pool.assert_fingerprint = 'CA:84:E1:AD0E5a:ef:2f:C3:09' \
+ ':E7:30:F8:CD:C8:5B'
+ https_pool.request('GET', '/')
+
+ def test_assert_fingerprint_sha1(self):
+ https_pool = HTTPSConnectionPool('localhost', self.port,
+ cert_reqs='CERT_REQUIRED',
+ ca_certs=DEFAULT_CA)
+
+ https_pool.assert_fingerprint = 'CC:45:6A:90:82:F7FF:C0:8218:8e:' \
+ '7A:F2:8A:D7:1E:07:33:67:DE'
+ https_pool.request('GET', '/')
+
+ def test_assert_fingerprint_sha256(self):
+ https_pool = HTTPSConnectionPool('localhost', self.port,
+ cert_reqs='CERT_REQUIRED',
+ ca_certs=DEFAULT_CA)
+
+ https_pool.assert_fingerprint = ('9A:29:9D:4F:47:85:1C:51:23:F5:9A:A3:'
+ '0F:5A:EF:96:F9:2E:3C:22:2E:FC:E8:BC:'
+ '0E:73:90:37:ED:3B:AA:AB')
+ https_pool.request('GET', '/')
+
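+    def test_fingerprint_normalization_sketch(self):
+        # Illustrative sketch (added; not part of the original suite):
+        # assert_fingerprint values are hex digests of the DER-encoded
+        # certificate, and colons and case are stripped before comparison,
+        # which is why the unevenly-colonized spellings above still match.
+        import hashlib
+        digest = hashlib.sha256(b'example DER bytes').hexdigest()
+        self.assertEqual(len(digest), 64)  # 32 bytes -> 64 hex characters
+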
+ def test_assert_invalid_fingerprint(self):
+ https_pool = HTTPSConnectionPool('127.0.0.1', self.port,
+ cert_reqs='CERT_REQUIRED',
+ ca_certs=DEFAULT_CA)
+
+ https_pool.assert_fingerprint = 'AA:AA:AA:AA:AA:AAAA:AA:AAAA:AA:' \
+ 'AA:AA:AA:AA:AA:AA:AA:AA:AA'
+
+ self.assertRaises(SSLError, https_pool.request, 'GET', '/')
+ https_pool._get_conn()
+
+ # Uneven length
+ https_pool.assert_fingerprint = 'AA:A'
+ self.assertRaises(SSLError, https_pool.request, 'GET', '/')
+ https_pool._get_conn()
+
+ # Invalid length
+ https_pool.assert_fingerprint = 'AA'
+ self.assertRaises(SSLError, https_pool.request, 'GET', '/')
+
+ def test_verify_none_and_bad_fingerprint(self):
+ https_pool = HTTPSConnectionPool('127.0.0.1', self.port,
+ cert_reqs='CERT_NONE',
+ ca_certs=DEFAULT_CA_BAD)
+
+ https_pool.assert_fingerprint = 'AA:AA:AA:AA:AA:AAAA:AA:AAAA:AA:' \
+ 'AA:AA:AA:AA:AA:AA:AA:AA:AA'
+ self.assertRaises(SSLError, https_pool.request, 'GET', '/')
+
+ def test_verify_none_and_good_fingerprint(self):
+ https_pool = HTTPSConnectionPool('127.0.0.1', self.port,
+ cert_reqs='CERT_NONE',
+ ca_certs=DEFAULT_CA_BAD)
+
+ https_pool.assert_fingerprint = 'CC:45:6A:90:82:F7FF:C0:8218:8e:' \
+ '7A:F2:8A:D7:1E:07:33:67:DE'
+ https_pool.request('GET', '/')
+
+ def test_good_fingerprint_and_hostname_mismatch(self):
+ https_pool = HTTPSConnectionPool('127.0.0.1', self.port,
+ cert_reqs='CERT_REQUIRED',
+ ca_certs=DEFAULT_CA)
+
+ https_pool.assert_fingerprint = 'CC:45:6A:90:82:F7FF:C0:8218:8e:' \
+ '7A:F2:8A:D7:1E:07:33:67:DE'
+ https_pool.request('GET', '/')
+
+ @requires_network
+ def test_https_timeout(self):
+ timeout = Timeout(connect=0.001)
+ https_pool = HTTPSConnectionPool(TARPIT_HOST, self.port,
+ timeout=timeout, retries=False,
+ cert_reqs='CERT_REQUIRED')
+
+ timeout = Timeout(total=None, connect=0.001)
+ https_pool = HTTPSConnectionPool(TARPIT_HOST, self.port,
+ timeout=timeout, retries=False,
+ cert_reqs='CERT_REQUIRED')
+ self.assertRaises(ConnectTimeoutError, https_pool.request, 'GET', '/')
+
+ timeout = Timeout(read=0.001)
+ https_pool = HTTPSConnectionPool(self.host, self.port,
+ timeout=timeout, retries=False,
+ cert_reqs='CERT_REQUIRED')
+ https_pool.ca_certs = DEFAULT_CA
+ https_pool.assert_fingerprint = 'CC:45:6A:90:82:F7FF:C0:8218:8e:' \
+ '7A:F2:8A:D7:1E:07:33:67:DE'
+ url = '/sleep?seconds=0.005'
+ self.assertRaises(ReadTimeoutError, https_pool.request, 'GET', url)
+
+ timeout = Timeout(total=None)
+ https_pool = HTTPSConnectionPool(self.host, self.port, timeout=timeout,
+ cert_reqs='CERT_NONE')
+ https_pool.request('GET', '/')
+
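+    def test_timeout_budget_sketch(self):
+        # Illustrative sketch (added; not in the original suite): Timeout
+        # carries independent connect and read budgets, and total=None
+        # leaves the overall budget unbounded, as exercised above.
+        t = Timeout(total=None, connect=0.001, read=2)
+        self.assertEqual(t.connect_timeout, 0.001)
+        self.assertEqual(t.read_timeout, 2)
+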
+ def test_tunnel(self):
+ """ test the _tunnel behavior """
+ timeout = Timeout(total=None)
+ https_pool = HTTPSConnectionPool(self.host, self.port, timeout=timeout,
+ cert_reqs='CERT_NONE')
+ conn = https_pool._new_conn()
+ try:
+ conn.set_tunnel(self.host, self.port)
+ except AttributeError: # python 2.6
+ conn._set_tunnel(self.host, self.port)
+ conn._tunnel = mock.Mock()
+ https_pool._make_request(conn, 'GET', '/')
+ conn._tunnel.assert_called_once_with()
+
+ @onlyPy26OrOlder
+ def test_tunnel_old_python(self):
+ """HTTPSConnection can still make connections if _tunnel_host isn't set
+
+ The _tunnel_host attribute was added in 2.6.3 - because our test runners
+ generally use the latest Python 2.6, we simulate the old version by
+ deleting the attribute from the HTTPSConnection.
+ """
+ conn = self._pool._new_conn()
+ del conn._tunnel_host
+ self._pool._make_request(conn, 'GET', '/')
+
+ @requires_network
+ def test_enhanced_timeout(self):
+ def new_pool(timeout, cert_reqs='CERT_REQUIRED'):
+ https_pool = HTTPSConnectionPool(TARPIT_HOST, self.port,
+ timeout=timeout,
+ retries=False,
+ cert_reqs=cert_reqs)
+ return https_pool
+
+ https_pool = new_pool(Timeout(connect=0.001))
+ conn = https_pool._new_conn()
+ self.assertRaises(ConnectTimeoutError, https_pool.request, 'GET', '/')
+ self.assertRaises(ConnectTimeoutError, https_pool._make_request, conn,
+ 'GET', '/')
+
+ https_pool = new_pool(Timeout(connect=5))
+ self.assertRaises(ConnectTimeoutError, https_pool.request, 'GET', '/',
+ timeout=Timeout(connect=0.001))
+
+ t = Timeout(total=None)
+ https_pool = new_pool(t)
+ conn = https_pool._new_conn()
+ self.assertRaises(ConnectTimeoutError, https_pool.request, 'GET', '/',
+ timeout=Timeout(total=None, connect=0.001))
+
+ def test_enhanced_ssl_connection(self):
+ fingerprint = 'CC:45:6A:90:82:F7FF:C0:8218:8e:7A:F2:8A:D7:1E:07:33:67:DE'
+
+ conn = VerifiedHTTPSConnection(self.host, self.port)
+ https_pool = HTTPSConnectionPool(self.host, self.port,
+ cert_reqs='CERT_REQUIRED', ca_certs=DEFAULT_CA,
+ assert_fingerprint=fingerprint)
+
+ https_pool._make_request(conn, 'GET', '/')
+
+ def test_ssl_correct_system_time(self):
+ self._pool.cert_reqs = 'CERT_REQUIRED'
+ self._pool.ca_certs = DEFAULT_CA
+
+ w = self._request_without_resource_warnings('GET', '/')
+ self.assertEqual([], w)
+
+ def test_ssl_wrong_system_time(self):
+ self._pool.cert_reqs = 'CERT_REQUIRED'
+ self._pool.ca_certs = DEFAULT_CA
+ with mock.patch('urllib3.connection.datetime') as mock_date:
+ mock_date.date.today.return_value = datetime.date(1970, 1, 1)
+
+ w = self._request_without_resource_warnings('GET', '/')
+
+ self.assertEqual(len(w), 1)
+ warning = w[0]
+
+ self.assertEqual(SystemTimeWarning, warning.category)
+ self.assertTrue(str(RECENT_DATE) in warning.message.args[0])
+
+ def _request_without_resource_warnings(self, method, url):
+ with warnings.catch_warnings(record=True) as w:
+ warnings.simplefilter('always')
+ self._pool.request(method, url)
+
+ return [x for x in w if not isinstance(x.message, ResourceWarning)]
+
+
+class TestHTTPS_TLSv1(HTTPSDummyServerTestCase):
+ certs = DEFAULT_CERTS.copy()
+ certs['ssl_version'] = ssl.PROTOCOL_TLSv1
+
+ def setUp(self):
+ self._pool = HTTPSConnectionPool(self.host, self.port)
+
+ def test_set_ssl_version_to_sslv3(self):
+ self._pool.ssl_version = ssl.PROTOCOL_SSLv3
+ self.assertRaises(SSLError, self._pool.request, 'GET', '/')
+
+ def test_ssl_version_as_string(self):
+ self._pool.ssl_version = 'PROTOCOL_SSLv3'
+ self.assertRaises(SSLError, self._pool.request, 'GET', '/')
+
+ def test_ssl_version_as_short_string(self):
+ self._pool.ssl_version = 'SSLv3'
+ self.assertRaises(SSLError, self._pool.request, 'GET', '/')
+
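+    def test_resolve_ssl_version_sketch(self):
+        # Illustrative sketch (added; not in the original suite): string
+        # ssl_version values are resolved against the ssl module, with a
+        # 'PROTOCOL_' prefix tried for short names like 'SSLv3' above.
+        from urllib3.util.ssl_ import resolve_ssl_version
+        self.assertEqual(resolve_ssl_version('PROTOCOL_TLSv1'),
+                         ssl.PROTOCOL_TLSv1)
+        self.assertEqual(resolve_ssl_version('TLSv1'), ssl.PROTOCOL_TLSv1)
+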
+ def test_discards_connection_on_sslerror(self):
+ self._pool.cert_reqs = 'CERT_REQUIRED'
+ self.assertRaises(SSLError, self._pool.request, 'GET', '/')
+ self._pool.ca_certs = DEFAULT_CA
+ self._pool.request('GET', '/')
+
+ def test_set_cert_default_cert_required(self):
+ conn = VerifiedHTTPSConnection(self.host, self.port)
+ conn.set_cert(ca_certs='/etc/ssl/certs/custom.pem')
+ self.assertEqual(conn.cert_reqs, 'CERT_REQUIRED')
+
+
+class TestHTTPS_NoSAN(HTTPSDummyServerTestCase):
+ certs = NO_SAN_CERTS
+
+ def test_warning_for_certs_without_a_san(self):
+ """Ensure that a warning is raised when the cert from the server has
+ no Subject Alternative Name."""
+ with mock.patch('warnings.warn') as warn:
+ https_pool = HTTPSConnectionPool(self.host, self.port,
+ cert_reqs='CERT_REQUIRED',
+ ca_certs=NO_SAN_CA)
+ r = https_pool.request('GET', '/')
+ self.assertEqual(r.status, 200)
+ self.assertTrue(warn.called)
+
+
+if __name__ == '__main__':
+ unittest.main()
diff --git a/test/with_dummyserver/test_no_ssl.py b/test/with_dummyserver/test_no_ssl.py
new file mode 100644
index 0000000..f266d49
--- /dev/null
+++ b/test/with_dummyserver/test_no_ssl.py
@@ -0,0 +1,29 @@
+"""
+Test connections without the builtin ssl module
+
+Note: Import urllib3 inside the test functions to get the importblocker to work
+"""
+from ..test_no_ssl import TestWithoutSSL
+
+from dummyserver.testcase import (
+ HTTPDummyServerTestCase, HTTPSDummyServerTestCase)
+
+
+class TestHTTPWithoutSSL(HTTPDummyServerTestCase, TestWithoutSSL):
+ def test_simple(self):
+ import urllib3
+
+ pool = urllib3.HTTPConnectionPool(self.host, self.port)
+ r = pool.request('GET', '/')
+ self.assertEqual(r.status, 200, r.data)
+
+
+class TestHTTPSWithoutSSL(HTTPSDummyServerTestCase, TestWithoutSSL):
+ def test_simple(self):
+ import urllib3
+
+ pool = urllib3.HTTPSConnectionPool(self.host, self.port)
+ try:
+ pool.request('GET', '/')
+ except urllib3.exceptions.SSLError as e:
+ self.assertTrue('SSL module is not available' in str(e))
diff --git a/test/with_dummyserver/test_poolmanager.py b/test/with_dummyserver/test_poolmanager.py
new file mode 100644
index 0000000..099ac52
--- /dev/null
+++ b/test/with_dummyserver/test_poolmanager.py
@@ -0,0 +1,178 @@
+import unittest
+import json
+
+from nose.plugins.skip import SkipTest
+from dummyserver.server import HAS_IPV6
+from dummyserver.testcase import (HTTPDummyServerTestCase,
+ IPv6HTTPDummyServerTestCase)
+from urllib3.poolmanager import PoolManager
+from urllib3.connectionpool import port_by_scheme
+from urllib3.exceptions import MaxRetryError, SSLError
+from urllib3.util.retry import Retry
+
+
+class TestPoolManager(HTTPDummyServerTestCase):
+
+ def setUp(self):
+ self.base_url = 'http://%s:%d' % (self.host, self.port)
+ self.base_url_alt = 'http://%s:%d' % (self.host_alt, self.port)
+
+ def test_redirect(self):
+ http = PoolManager()
+
+ r = http.request('GET', '%s/redirect' % self.base_url,
+ fields={'target': '%s/' % self.base_url},
+ redirect=False)
+
+ self.assertEqual(r.status, 303)
+
+ r = http.request('GET', '%s/redirect' % self.base_url,
+ fields={'target': '%s/' % self.base_url})
+
+ self.assertEqual(r.status, 200)
+ self.assertEqual(r.data, b'Dummy server!')
+
+ def test_redirect_twice(self):
+ http = PoolManager()
+
+ r = http.request('GET', '%s/redirect' % self.base_url,
+ fields={'target': '%s/redirect' % self.base_url},
+ redirect=False)
+
+ self.assertEqual(r.status, 303)
+
+ r = http.request('GET', '%s/redirect' % self.base_url,
+ fields={'target': '%s/redirect?target=%s/' % (self.base_url, self.base_url)})
+
+ self.assertEqual(r.status, 200)
+ self.assertEqual(r.data, b'Dummy server!')
+
+ def test_redirect_to_relative_url(self):
+ http = PoolManager()
+
+ r = http.request('GET', '%s/redirect' % self.base_url,
+                         fields={'target': '/redirect'},
+                         redirect=False)
+
+ self.assertEqual(r.status, 303)
+
+ r = http.request('GET', '%s/redirect' % self.base_url,
+                         fields={'target': '/redirect'})
+
+ self.assertEqual(r.status, 200)
+ self.assertEqual(r.data, b'Dummy server!')
+
+ def test_cross_host_redirect(self):
+ http = PoolManager()
+
+ cross_host_location = '%s/echo?a=b' % self.base_url_alt
+ try:
+ http.request('GET', '%s/redirect' % self.base_url,
+ fields={'target': cross_host_location},
+ timeout=0.01, retries=0)
+ self.fail("Request succeeded instead of raising an exception like it should.")
+
+ except MaxRetryError:
+ pass
+
+ r = http.request('GET', '%s/redirect' % self.base_url,
+ fields={'target': '%s/echo?a=b' % self.base_url_alt},
+ timeout=0.01, retries=1)
+
+ self.assertEqual(r._pool.host, self.host_alt)
+
+ def test_too_many_redirects(self):
+ http = PoolManager()
+
+ try:
+ r = http.request('GET', '%s/redirect' % self.base_url,
+ fields={'target': '%s/redirect?target=%s/' % (self.base_url, self.base_url)},
+ retries=1)
+ self.fail("Failed to raise MaxRetryError exception, returned %r" % r.status)
+ except MaxRetryError:
+ pass
+
+ try:
+ r = http.request('GET', '%s/redirect' % self.base_url,
+ fields={'target': '%s/redirect?target=%s/' % (self.base_url, self.base_url)},
+ retries=Retry(total=None, redirect=1))
+ self.fail("Failed to raise MaxRetryError exception, returned %r" % r.status)
+ except MaxRetryError:
+ pass
+
+ def test_raise_on_redirect(self):
+ http = PoolManager()
+
+ r = http.request('GET', '%s/redirect' % self.base_url,
+ fields={'target': '%s/redirect?target=%s/' % (self.base_url, self.base_url)},
+ retries=Retry(total=None, redirect=1, raise_on_redirect=False))
+
+ self.assertEqual(r.status, 303)
+
+ def test_missing_port(self):
+ # Can a URL that lacks an explicit port like ':80' succeed, or
+ # will all such URLs fail with an error?
+
+ http = PoolManager()
+
+ # By globally adjusting `port_by_scheme` we pretend for a moment
+ # that HTTP's default port is not 80, but is the port at which
+ # our test server happens to be listening.
+ port_by_scheme['http'] = self.port
+ try:
+ r = http.request('GET', 'http://%s/' % self.host, retries=0)
+ finally:
+ port_by_scheme['http'] = 80
+
+ self.assertEqual(r.status, 200)
+ self.assertEqual(r.data, b'Dummy server!')
+
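+    def test_missing_port_parses_as_none_sketch(self):
+        # Illustrative sketch (added; not in the original suite): a URL
+        # without an explicit port parses with port=None, which is why the
+        # lookup above has to fall back to port_by_scheme.
+        from urllib3.util.url import parse_url
+        self.assertEqual(parse_url('http://%s/' % self.host).port, None)
+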
+ def test_headers(self):
+ http = PoolManager(headers={'Foo': 'bar'})
+
+ r = http.request('GET', '%s/headers' % self.base_url)
+ returned_headers = json.loads(r.data.decode())
+ self.assertEqual(returned_headers.get('Foo'), 'bar')
+
+ r = http.request('POST', '%s/headers' % self.base_url)
+ returned_headers = json.loads(r.data.decode())
+ self.assertEqual(returned_headers.get('Foo'), 'bar')
+
+ r = http.request_encode_url('GET', '%s/headers' % self.base_url)
+ returned_headers = json.loads(r.data.decode())
+ self.assertEqual(returned_headers.get('Foo'), 'bar')
+
+ r = http.request_encode_body('POST', '%s/headers' % self.base_url)
+ returned_headers = json.loads(r.data.decode())
+ self.assertEqual(returned_headers.get('Foo'), 'bar')
+
+ r = http.request_encode_url('GET', '%s/headers' % self.base_url, headers={'Baz': 'quux'})
+ returned_headers = json.loads(r.data.decode())
+ self.assertEqual(returned_headers.get('Foo'), None)
+ self.assertEqual(returned_headers.get('Baz'), 'quux')
+
+ r = http.request_encode_body('GET', '%s/headers' % self.base_url, headers={'Baz': 'quux'})
+ returned_headers = json.loads(r.data.decode())
+ self.assertEqual(returned_headers.get('Foo'), None)
+ self.assertEqual(returned_headers.get('Baz'), 'quux')
+
+ def test_http_with_ssl_keywords(self):
+ http = PoolManager(ca_certs='REQUIRED')
+
+ r = http.request('GET', 'http://%s:%s/' % (self.host, self.port))
+ self.assertEqual(r.status, 200)
+
+
+class TestIPv6PoolManager(IPv6HTTPDummyServerTestCase):
+ if not HAS_IPV6:
+ raise SkipTest("IPv6 is not supported on this system.")
+
+ def setUp(self):
+ self.base_url = 'http://[%s]:%d' % (self.host, self.port)
+
+ def test_ipv6(self):
+ http = PoolManager()
+ http.request('GET', self.base_url)
+
+
+if __name__ == '__main__':
+ unittest.main()
diff --git a/test/with_dummyserver/test_proxy_poolmanager.py b/test/with_dummyserver/test_proxy_poolmanager.py
new file mode 100644
index 0000000..c593f2d
--- /dev/null
+++ b/test/with_dummyserver/test_proxy_poolmanager.py
@@ -0,0 +1,325 @@
+import json
+import socket
+import unittest
+
+from nose.tools import timed
+
+from dummyserver.testcase import HTTPDummyProxyTestCase, IPv6HTTPDummyProxyTestCase
+from dummyserver.server import (
+ DEFAULT_CA, DEFAULT_CA_BAD, get_unreachable_address)
+from .. import TARPIT_HOST
+
+from urllib3._collections import HTTPHeaderDict
+from urllib3.poolmanager import proxy_from_url, ProxyManager
+from urllib3.exceptions import (
+ MaxRetryError, SSLError, ProxyError, ConnectTimeoutError)
+from urllib3.connectionpool import connection_from_url, VerifiedHTTPSConnection
+
+
+class TestHTTPProxyManager(HTTPDummyProxyTestCase):
+
+ def setUp(self):
+ self.http_url = 'http://%s:%d' % (self.http_host, self.http_port)
+ self.http_url_alt = 'http://%s:%d' % (self.http_host_alt,
+ self.http_port)
+ self.https_url = 'https://%s:%d' % (self.https_host, self.https_port)
+ self.https_url_alt = 'https://%s:%d' % (self.https_host_alt,
+ self.https_port)
+ self.proxy_url = 'http://%s:%d' % (self.proxy_host, self.proxy_port)
+
+ def test_basic_proxy(self):
+ http = proxy_from_url(self.proxy_url)
+
+ r = http.request('GET', '%s/' % self.http_url)
+ self.assertEqual(r.status, 200)
+
+ r = http.request('GET', '%s/' % self.https_url)
+ self.assertEqual(r.status, 200)
+
+ def test_nagle_proxy(self):
+ """ Test that proxy connections do not have TCP_NODELAY turned on """
+ http = proxy_from_url(self.proxy_url)
+ hc2 = http.connection_from_host(self.http_host, self.http_port)
+ conn = hc2._get_conn()
+ hc2._make_request(conn, 'GET', '/')
+ tcp_nodelay_setting = conn.sock.getsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY)
+ self.assertEqual(tcp_nodelay_setting, 0,
+ ("Expected TCP_NODELAY for proxies to be set "
+ "to zero, instead was %s" % tcp_nodelay_setting))
+
+ def test_proxy_conn_fail(self):
+ host, port = get_unreachable_address()
+ http = proxy_from_url('http://%s:%s/' % (host, port), retries=1, timeout=0.05)
+ self.assertRaises(MaxRetryError, http.request, 'GET',
+ '%s/' % self.https_url)
+ self.assertRaises(MaxRetryError, http.request, 'GET',
+ '%s/' % self.http_url)
+
+ try:
+ http.request('GET', '%s/' % self.http_url)
+ self.fail("Failed to raise retry error.")
+ except MaxRetryError as e:
+ self.assertEqual(type(e.reason), ProxyError)
+
+ def test_oldapi(self):
+ http = ProxyManager(connection_from_url(self.proxy_url))
+
+ r = http.request('GET', '%s/' % self.http_url)
+ self.assertEqual(r.status, 200)
+
+ r = http.request('GET', '%s/' % self.https_url)
+ self.assertEqual(r.status, 200)
+
+ def test_proxy_verified(self):
+ http = proxy_from_url(self.proxy_url, cert_reqs='REQUIRED',
+ ca_certs=DEFAULT_CA_BAD)
+ https_pool = http._new_pool('https', self.https_host,
+ self.https_port)
+ try:
+ https_pool.request('GET', '/')
+ self.fail("Didn't raise SSL error with wrong CA")
+ except SSLError as e:
+ self.assertTrue('certificate verify failed' in str(e),
+ "Expected 'certificate verify failed',"
+ "instead got: %r" % e)
+
+ http = proxy_from_url(self.proxy_url, cert_reqs='REQUIRED',
+ ca_certs=DEFAULT_CA)
+ https_pool = http._new_pool('https', self.https_host,
+ self.https_port)
+
+ conn = https_pool._new_conn()
+ self.assertEqual(conn.__class__, VerifiedHTTPSConnection)
+ https_pool.request('GET', '/') # Should succeed without exceptions.
+
+ http = proxy_from_url(self.proxy_url, cert_reqs='REQUIRED',
+ ca_certs=DEFAULT_CA)
+ https_fail_pool = http._new_pool('https', '127.0.0.1', self.https_port)
+
+ try:
+ https_fail_pool.request('GET', '/')
+ self.fail("Didn't raise SSL invalid common name")
+ except SSLError as e:
+ self.assertTrue("doesn't match" in str(e))
+
+ def test_redirect(self):
+ http = proxy_from_url(self.proxy_url)
+
+ r = http.request('GET', '%s/redirect' % self.http_url,
+ fields={'target': '%s/' % self.http_url},
+ redirect=False)
+
+ self.assertEqual(r.status, 303)
+
+ r = http.request('GET', '%s/redirect' % self.http_url,
+ fields={'target': '%s/' % self.http_url})
+
+ self.assertEqual(r.status, 200)
+ self.assertEqual(r.data, b'Dummy server!')
+
+ def test_cross_host_redirect(self):
+ http = proxy_from_url(self.proxy_url)
+
+ cross_host_location = '%s/echo?a=b' % self.http_url_alt
+ try:
+ http.request('GET', '%s/redirect' % self.http_url,
+ fields={'target': cross_host_location},
+ timeout=0.1, retries=0)
+ self.fail("We don't want to follow redirects here.")
+
+ except MaxRetryError:
+ pass
+
+ r = http.request('GET', '%s/redirect' % self.http_url,
+ fields={'target': '%s/echo?a=b' % self.http_url_alt},
+ timeout=0.1, retries=1)
+ self.assertNotEqual(r._pool.host, self.http_host_alt)
+
+ def test_cross_protocol_redirect(self):
+ http = proxy_from_url(self.proxy_url)
+
+ cross_protocol_location = '%s/echo?a=b' % self.https_url
+ try:
+ http.request('GET', '%s/redirect' % self.http_url,
+ fields={'target': cross_protocol_location},
+ timeout=0.1, retries=0)
+ self.fail("We don't want to follow redirects here.")
+
+ except MaxRetryError:
+ pass
+
+ r = http.request('GET', '%s/redirect' % self.http_url,
+ fields={'target': '%s/echo?a=b' % self.https_url},
+ timeout=0.1, retries=1)
+ self.assertEqual(r._pool.host, self.https_host)
+
+ def test_headers(self):
+        http = proxy_from_url(self.proxy_url, headers={'Foo': 'bar'},
+ proxy_headers={'Hickory': 'dickory'})
+
+ r = http.request_encode_url('GET', '%s/headers' % self.http_url)
+ returned_headers = json.loads(r.data.decode())
+ self.assertEqual(returned_headers.get('Foo'), 'bar')
+ self.assertEqual(returned_headers.get('Hickory'), 'dickory')
+ self.assertEqual(returned_headers.get('Host'),
+                         '%s:%s' % (self.http_host, self.http_port))
+
+ r = http.request_encode_url('GET', '%s/headers' % self.http_url_alt)
+ returned_headers = json.loads(r.data.decode())
+ self.assertEqual(returned_headers.get('Foo'), 'bar')
+ self.assertEqual(returned_headers.get('Hickory'), 'dickory')
+ self.assertEqual(returned_headers.get('Host'),
+                         '%s:%s' % (self.http_host_alt, self.http_port))
+
+ r = http.request_encode_url('GET', '%s/headers' % self.https_url)
+ returned_headers = json.loads(r.data.decode())
+ self.assertEqual(returned_headers.get('Foo'), 'bar')
+ self.assertEqual(returned_headers.get('Hickory'), None)
+ self.assertEqual(returned_headers.get('Host'),
+                         '%s:%s' % (self.https_host, self.https_port))
+
+ r = http.request_encode_url('GET', '%s/headers' % self.https_url_alt)
+ returned_headers = json.loads(r.data.decode())
+ self.assertEqual(returned_headers.get('Foo'), 'bar')
+ self.assertEqual(returned_headers.get('Hickory'), None)
+ self.assertEqual(returned_headers.get('Host'),
+                         '%s:%s' % (self.https_host_alt, self.https_port))
+
+ r = http.request_encode_body('POST', '%s/headers' % self.http_url)
+ returned_headers = json.loads(r.data.decode())
+ self.assertEqual(returned_headers.get('Foo'), 'bar')
+ self.assertEqual(returned_headers.get('Hickory'), 'dickory')
+ self.assertEqual(returned_headers.get('Host'),
+                         '%s:%s' % (self.http_host, self.http_port))
+
+ r = http.request_encode_url('GET', '%s/headers' % self.http_url, headers={'Baz': 'quux'})
+ returned_headers = json.loads(r.data.decode())
+ self.assertEqual(returned_headers.get('Foo'), None)
+ self.assertEqual(returned_headers.get('Baz'), 'quux')
+ self.assertEqual(returned_headers.get('Hickory'), 'dickory')
+ self.assertEqual(returned_headers.get('Host'),
+                         '%s:%s' % (self.http_host, self.http_port))
+
+ r = http.request_encode_url('GET', '%s/headers' % self.https_url, headers={'Baz': 'quux'})
+ returned_headers = json.loads(r.data.decode())
+ self.assertEqual(returned_headers.get('Foo'), None)
+ self.assertEqual(returned_headers.get('Baz'), 'quux')
+ self.assertEqual(returned_headers.get('Hickory'), None)
+ self.assertEqual(returned_headers.get('Host'),
+                         '%s:%s' % (self.https_host, self.https_port))
+
+ r = http.request_encode_body('GET', '%s/headers' % self.http_url, headers={'Baz': 'quux'})
+ returned_headers = json.loads(r.data.decode())
+ self.assertEqual(returned_headers.get('Foo'), None)
+ self.assertEqual(returned_headers.get('Baz'), 'quux')
+ self.assertEqual(returned_headers.get('Hickory'), 'dickory')
+ self.assertEqual(returned_headers.get('Host'),
+                         '%s:%s' % (self.http_host, self.http_port))
+
+ r = http.request_encode_body('GET', '%s/headers' % self.https_url, headers={'Baz': 'quux'})
+ returned_headers = json.loads(r.data.decode())
+ self.assertEqual(returned_headers.get('Foo'), None)
+ self.assertEqual(returned_headers.get('Baz'), 'quux')
+ self.assertEqual(returned_headers.get('Hickory'), None)
+ self.assertEqual(returned_headers.get('Host'),
+                         '%s:%s' % (self.https_host, self.https_port))
+
+ def test_headerdict(self):
+ default_headers = HTTPHeaderDict(a='b')
+ proxy_headers = HTTPHeaderDict()
+ proxy_headers.add('foo', 'bar')
+
+ http = proxy_from_url(
+ self.proxy_url,
+ headers=default_headers,
+ proxy_headers=proxy_headers)
+
+ request_headers = HTTPHeaderDict(baz='quux')
+ r = http.request('GET', '%s/headers' % self.http_url, headers=request_headers)
+ returned_headers = json.loads(r.data.decode())
+ self.assertEqual(returned_headers.get('Foo'), 'bar')
+ self.assertEqual(returned_headers.get('Baz'), 'quux')
+
+ def test_proxy_pooling(self):
+ http = proxy_from_url(self.proxy_url)
+
+ for x in range(2):
+ r = http.urlopen('GET', self.http_url)
+ self.assertEqual(len(http.pools), 1)
+
+ for x in range(2):
+ r = http.urlopen('GET', self.http_url_alt)
+ self.assertEqual(len(http.pools), 1)
+
+ for x in range(2):
+ r = http.urlopen('GET', self.https_url)
+ self.assertEqual(len(http.pools), 2)
+
+ for x in range(2):
+ r = http.urlopen('GET', self.https_url_alt)
+ self.assertEqual(len(http.pools), 3)
+
+ def test_proxy_pooling_ext(self):
+ http = proxy_from_url(self.proxy_url)
+ hc1 = http.connection_from_url(self.http_url)
+ hc2 = http.connection_from_host(self.http_host, self.http_port)
+ hc3 = http.connection_from_url(self.http_url_alt)
+ hc4 = http.connection_from_host(self.http_host_alt, self.http_port)
+        self.assertEqual(hc1, hc2)
+        self.assertEqual(hc2, hc3)
+        self.assertEqual(hc3, hc4)
+
+ sc1 = http.connection_from_url(self.https_url)
+        sc2 = http.connection_from_host(self.https_host,
+                                        self.https_port, scheme='https')
+        sc3 = http.connection_from_url(self.https_url_alt)
+        sc4 = http.connection_from_host(self.https_host_alt,
+                                        self.https_port, scheme='https')
+        self.assertEqual(sc1, sc2)
+        self.assertNotEqual(sc2, sc3)
+        self.assertEqual(sc3, sc4)
+
+ @timed(0.5)
+ def test_https_proxy_timeout(self):
+ https = proxy_from_url('https://{host}'.format(host=TARPIT_HOST))
+ try:
+ https.request('GET', self.http_url, timeout=0.001)
+ self.fail("Failed to raise retry error.")
+ except MaxRetryError as e:
+ assert isinstance(e.reason, ConnectTimeoutError)
+
+ @timed(0.5)
+ def test_https_proxy_pool_timeout(self):
+ https = proxy_from_url('https://{host}'.format(host=TARPIT_HOST),
+ timeout=0.001)
+ try:
+ https.request('GET', self.http_url)
+ self.fail("Failed to raise retry error.")
+ except MaxRetryError as e:
+ assert isinstance(e.reason, ConnectTimeoutError)
+
+
+class TestIPv6HTTPProxyManager(IPv6HTTPDummyProxyTestCase):
+
+ def setUp(self):
+ self.http_url = 'http://%s:%d' % (self.http_host, self.http_port)
+ self.http_url_alt = 'http://%s:%d' % (self.http_host_alt,
+ self.http_port)
+ self.https_url = 'https://%s:%d' % (self.https_host, self.https_port)
+ self.https_url_alt = 'https://%s:%d' % (self.https_host_alt,
+ self.https_port)
+ self.proxy_url = 'http://[%s]:%d' % (self.proxy_host, self.proxy_port)
+
+ def test_basic_ipv6_proxy(self):
+ http = proxy_from_url(self.proxy_url)
+
+ r = http.request('GET', '%s/' % self.http_url)
+ self.assertEqual(r.status, 200)
+
+ r = http.request('GET', '%s/' % self.https_url)
+ self.assertEqual(r.status, 200)
+
+
+if __name__ == '__main__':
+ unittest.main()
diff --git a/test/with_dummyserver/test_socketlevel.py b/test/with_dummyserver/test_socketlevel.py
new file mode 100644
index 0000000..5af00e0
--- /dev/null
+++ b/test/with_dummyserver/test_socketlevel.py
@@ -0,0 +1,795 @@
+# TODO: Break this module up into pieces. Maybe group by functionality tested
+# rather than the socket level-ness of it.
+
+from urllib3 import HTTPConnectionPool, HTTPSConnectionPool
+from urllib3.poolmanager import proxy_from_url
+from urllib3.exceptions import (
+ MaxRetryError,
+ ProxyError,
+ ReadTimeoutError,
+ SSLError,
+ ProtocolError,
+)
+from urllib3.response import httplib
+from urllib3.util.ssl_ import HAS_SNI
+from urllib3.util.timeout import Timeout
+from urllib3.util.retry import Retry
+from urllib3._collections import HTTPHeaderDict
+
+from dummyserver.testcase import SocketDummyServerTestCase
+from dummyserver.server import (
+ DEFAULT_CERTS, DEFAULT_CA, get_unreachable_address)
+
+from .. import onlyPy3, LogRecorder
+
+from nose.plugins.skip import SkipTest
+try:
+ from mimetools import Message as MimeToolMessage
+except ImportError:
+ class MimeToolMessage(object):
+ pass
+from threading import Event
+import socket
+import ssl
+
+
+class TestCookies(SocketDummyServerTestCase):
+
+ def test_multi_setcookie(self):
+ def multicookie_response_handler(listener):
+ sock = listener.accept()[0]
+
+ buf = b''
+ while not buf.endswith(b'\r\n\r\n'):
+ buf += sock.recv(65536)
+
+ sock.send(b'HTTP/1.1 200 OK\r\n'
+ b'Set-Cookie: foo=1\r\n'
+ b'Set-Cookie: bar=1\r\n'
+ b'\r\n')
+ sock.close()
+
+ self._start_server(multicookie_response_handler)
+ pool = HTTPConnectionPool(self.host, self.port)
+ r = pool.request('GET', '/', retries=0)
+ self.assertEqual(r.headers, {'set-cookie': 'foo=1, bar=1'})
+ self.assertEqual(r.headers.getlist('set-cookie'), ['foo=1', 'bar=1'])
+
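+    def test_headerdict_multivalue_sketch(self):
+        # Illustrative sketch (added; not part of the original suite):
+        # HTTPHeaderDict folds repeated names into one comma-joined value,
+        # while getlist() preserves the individual values, matching the
+        # two assertions above.
+        d = HTTPHeaderDict()
+        d.add('Set-Cookie', 'foo=1')
+        d.add('Set-Cookie', 'bar=1')
+        self.assertEqual(d['set-cookie'], 'foo=1, bar=1')
+        self.assertEqual(d.getlist('set-cookie'), ['foo=1', 'bar=1'])
+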
+
+class TestSNI(SocketDummyServerTestCase):
+
+ def test_hostname_in_first_request_packet(self):
+ if not HAS_SNI:
+ raise SkipTest('SNI-support not available')
+
+ done_receiving = Event()
+ self.buf = b''
+
+ def socket_handler(listener):
+ sock = listener.accept()[0]
+
+ self.buf = sock.recv(65536) # We only accept one packet
+ done_receiving.set() # let the test know it can proceed
+ sock.close()
+
+ self._start_server(socket_handler)
+ pool = HTTPSConnectionPool(self.host, self.port)
+ try:
+ pool.request('GET', '/', retries=0)
+ except SSLError: # We are violating the protocol
+ pass
+ done_receiving.wait()
+ self.assertTrue(self.host.encode() in self.buf,
+ "missing hostname in SSL handshake")
+
+
+class TestSocketClosing(SocketDummyServerTestCase):
+
+ def test_recovery_when_server_closes_connection(self):
+ # Does the pool work seamlessly if an open connection in the
+ # connection pool gets hung up on by the server, then reaches
+ # the front of the queue again?
+
+ done_closing = Event()
+
+ def socket_handler(listener):
+ for i in 0, 1:
+ sock = listener.accept()[0]
+
+ buf = b''
+ while not buf.endswith(b'\r\n\r\n'):
+                    buf += sock.recv(65536)
+
+ body = 'Response %d' % i
+ sock.send(('HTTP/1.1 200 OK\r\n'
+ 'Content-Type: text/plain\r\n'
+ 'Content-Length: %d\r\n'
+ '\r\n'
+ '%s' % (len(body), body)).encode('utf-8'))
+
+ sock.close() # simulate a server timing out, closing socket
+ done_closing.set() # let the test know it can proceed
+
+ self._start_server(socket_handler)
+ pool = HTTPConnectionPool(self.host, self.port)
+
+ response = pool.request('GET', '/', retries=0)
+ self.assertEqual(response.status, 200)
+ self.assertEqual(response.data, b'Response 0')
+
+ done_closing.wait() # wait until the socket in our pool gets closed
+
+ response = pool.request('GET', '/', retries=0)
+ self.assertEqual(response.status, 200)
+ self.assertEqual(response.data, b'Response 1')
+
+ def test_connection_refused(self):
+ # Does the pool retry if there is no listener on the port?
+ host, port = get_unreachable_address()
+ http = HTTPConnectionPool(host, port, maxsize=3, block=True)
+ self.assertRaises(MaxRetryError, http.request, 'GET', '/', retries=0, release_conn=False)
+ self.assertEqual(http.pool.qsize(), http.pool.maxsize)
+
+ def test_connection_read_timeout(self):
+ timed_out = Event()
+ def socket_handler(listener):
+ sock = listener.accept()[0]
+ while not sock.recv(65536).endswith(b'\r\n\r\n'):
+ pass
+
+ timed_out.wait()
+ sock.close()
+
+ self._start_server(socket_handler)
+ http = HTTPConnectionPool(self.host, self.port, timeout=0.001, retries=False, maxsize=3, block=True)
+
+ try:
+ self.assertRaises(ReadTimeoutError, http.request, 'GET', '/', release_conn=False)
+ finally:
+ timed_out.set()
+
+ self.assertEqual(http.pool.qsize(), http.pool.maxsize)
+
+ def test_https_connection_read_timeout(self):
+ """ Handshake timeouts should fail with a Timeout"""
+ timed_out = Event()
+ def socket_handler(listener):
+ sock = listener.accept()[0]
+ while not sock.recv(65536):
+ pass
+
+ timed_out.wait()
+ sock.close()
+
+ self._start_server(socket_handler)
+ pool = HTTPSConnectionPool(self.host, self.port, timeout=0.001, retries=False)
+ try:
+ self.assertRaises(ReadTimeoutError, pool.request, 'GET', '/')
+ finally:
+ timed_out.set()
+
+ def test_timeout_errors_cause_retries(self):
+ def socket_handler(listener):
+ sock_timeout = listener.accept()[0]
+
+ # Wait for a second request before closing the first socket.
+ sock = listener.accept()[0]
+ sock_timeout.close()
+
+ # Second request.
+ buf = b''
+ while not buf.endswith(b'\r\n\r\n'):
+ buf += sock.recv(65536)
+
+ # Now respond immediately.
+ body = 'Response 2'
+ sock.send(('HTTP/1.1 200 OK\r\n'
+ 'Content-Type: text/plain\r\n'
+ 'Content-Length: %d\r\n'
+ '\r\n'
+ '%s' % (len(body), body)).encode('utf-8'))
+
+ sock.close()
+
+ # In situations where the main thread throws an exception, the server
+ # thread can hang on an accept() call. This ensures everything times
+ # out within 1 second. This should be long enough for any socket
+        # operations in the test suite to complete.
+ default_timeout = socket.getdefaulttimeout()
+ socket.setdefaulttimeout(1)
+
+ try:
+ self._start_server(socket_handler)
+ t = Timeout(connect=0.001, read=0.001)
+ pool = HTTPConnectionPool(self.host, self.port, timeout=t)
+
+ response = pool.request('GET', '/', retries=1)
+ self.assertEqual(response.status, 200)
+ self.assertEqual(response.data, b'Response 2')
+ finally:
+ socket.setdefaulttimeout(default_timeout)
+
+ def test_delayed_body_read_timeout(self):
+ timed_out = Event()
+
+ def socket_handler(listener):
+ sock = listener.accept()[0]
+ buf = b''
+ body = 'Hi'
+ while not buf.endswith(b'\r\n\r\n'):
+                buf += sock.recv(65536)
+ sock.send(('HTTP/1.1 200 OK\r\n'
+ 'Content-Type: text/plain\r\n'
+ 'Content-Length: %d\r\n'
+ '\r\n' % len(body)).encode('utf-8'))
+
+ timed_out.wait()
+ sock.send(body.encode('utf-8'))
+ sock.close()
+
+ self._start_server(socket_handler)
+ pool = HTTPConnectionPool(self.host, self.port)
+
+ response = pool.urlopen('GET', '/', retries=0, preload_content=False,
+ timeout=Timeout(connect=1, read=0.001))
+ try:
+ self.assertRaises(ReadTimeoutError, response.read)
+ finally:
+ timed_out.set()
+
+ def test_incomplete_response(self):
+ body = 'Response'
+ partial_body = body[:2]
+
+ def socket_handler(listener):
+ sock = listener.accept()[0]
+
+ # Consume request
+ buf = b''
+ while not buf.endswith(b'\r\n\r\n'):
+                buf += sock.recv(65536)
+
+ # Send partial response and close socket.
+ sock.send((
+ 'HTTP/1.1 200 OK\r\n'
+ 'Content-Type: text/plain\r\n'
+ 'Content-Length: %d\r\n'
+ '\r\n'
+ '%s' % (len(body), partial_body)).encode('utf-8')
+ )
+ sock.close()
+
+ self._start_server(socket_handler)
+ pool = HTTPConnectionPool(self.host, self.port)
+
+ response = pool.request('GET', '/', retries=0, preload_content=False)
+ self.assertRaises(ProtocolError, response.read)
+
+ def test_retry_weird_http_version(self):
+ """ Retry class should handle httplib.BadStatusLine errors properly """
+
+ def socket_handler(listener):
+ sock = listener.accept()[0]
+            # First request: the bogus HTTP version below makes httplib
+            # error out, which the Retry logic should absorb.
+ buf = b''
+ while not buf.endswith(b'\r\n\r\n'):
+ buf += sock.recv(65536)
+
+ # send unknown http protocol
+ body = "bad http 0.5 response"
+ sock.send(('HTTP/0.5 200 OK\r\n'
+ 'Content-Type: text/plain\r\n'
+ 'Content-Length: %d\r\n'
+ '\r\n'
+ '%s' % (len(body), body)).encode('utf-8'))
+ sock.close()
+
+ # Second request.
+ sock = listener.accept()[0]
+ buf = b''
+ while not buf.endswith(b'\r\n\r\n'):
+ buf += sock.recv(65536)
+
+ # Now respond immediately.
+ sock.send(('HTTP/1.1 200 OK\r\n'
+ 'Content-Type: text/plain\r\n'
+ 'Content-Length: %d\r\n'
+ '\r\n'
+ 'foo' % (len('foo'))).encode('utf-8'))
+
+ sock.close() # Close the socket.
+
+ self._start_server(socket_handler)
+ pool = HTTPConnectionPool(self.host, self.port)
+ retry = Retry(read=1)
+ response = pool.request('GET', '/', retries=retry)
+ self.assertEqual(response.status, 200)
+ self.assertEqual(response.data, b'foo')
+
+ def test_connection_cleanup_on_read_timeout(self):
+ timed_out = Event()
+
+ def socket_handler(listener):
+ sock = listener.accept()[0]
+ buf = b''
+ body = 'Hi'
+ while not buf.endswith(b'\r\n\r\n'):
+                buf += sock.recv(65536)
+ sock.send(('HTTP/1.1 200 OK\r\n'
+ 'Content-Type: text/plain\r\n'
+ 'Content-Length: %d\r\n'
+ '\r\n' % len(body)).encode('utf-8'))
+
+ timed_out.wait()
+ sock.close()
+
+ self._start_server(socket_handler)
+ with HTTPConnectionPool(self.host, self.port) as pool:
+ poolsize = pool.pool.qsize()
+ response = pool.urlopen('GET', '/', retries=0, preload_content=False,
+ timeout=Timeout(connect=1, read=0.001))
+ try:
+ self.assertRaises(ReadTimeoutError, response.read)
+ self.assertEqual(poolsize, pool.pool.qsize())
+ finally:
+ timed_out.set()
+
+ def test_connection_cleanup_on_protocol_error_during_read(self):
+ body = 'Response'
+ partial_body = body[:2]
+
+ def socket_handler(listener):
+ sock = listener.accept()[0]
+
+ # Consume request
+ buf = b''
+ while not buf.endswith(b'\r\n\r\n'):
+                buf += sock.recv(65536)
+
+ # Send partial response and close socket.
+ sock.send((
+ 'HTTP/1.1 200 OK\r\n'
+ 'Content-Type: text/plain\r\n'
+ 'Content-Length: %d\r\n'
+ '\r\n'
+ '%s' % (len(body), partial_body)).encode('utf-8')
+ )
+ sock.close()
+
+ self._start_server(socket_handler)
+ with HTTPConnectionPool(self.host, self.port) as pool:
+ poolsize = pool.pool.qsize()
+ response = pool.request('GET', '/', retries=0, preload_content=False)
+
+ self.assertRaises(ProtocolError, response.read)
+ self.assertEqual(poolsize, pool.pool.qsize())
+
+
+class TestProxyManager(SocketDummyServerTestCase):
+
+ def test_simple(self):
+ def echo_socket_handler(listener):
+ sock = listener.accept()[0]
+
+ buf = b''
+ while not buf.endswith(b'\r\n\r\n'):
+ buf += sock.recv(65536)
+
+ sock.send(('HTTP/1.1 200 OK\r\n'
+ 'Content-Type: text/plain\r\n'
+ 'Content-Length: %d\r\n'
+ '\r\n'
+ '%s' % (len(buf), buf.decode('utf-8'))).encode('utf-8'))
+ sock.close()
+
+ self._start_server(echo_socket_handler)
+ base_url = 'http://%s:%d' % (self.host, self.port)
+ proxy = proxy_from_url(base_url)
+
+ r = proxy.request('GET', 'http://google.com/')
+
+ self.assertEqual(r.status, 200)
+ # FIXME: The order of the headers is not predictable right now. We
+ # should fix that someday (maybe when we migrate to
+ # OrderedDict/MultiDict).
+ self.assertEqual(sorted(r.data.split(b'\r\n')),
+ sorted([
+ b'GET http://google.com/ HTTP/1.1',
+ b'Host: google.com',
+ b'Accept-Encoding: identity',
+ b'Accept: */*',
+ b'',
+ b'',
+ ]))
+
+ def test_headers(self):
+ def echo_socket_handler(listener):
+ sock = listener.accept()[0]
+
+ buf = b''
+ while not buf.endswith(b'\r\n\r\n'):
+ buf += sock.recv(65536)
+
+ sock.send(('HTTP/1.1 200 OK\r\n'
+ 'Content-Type: text/plain\r\n'
+ 'Content-Length: %d\r\n'
+ '\r\n'
+ '%s' % (len(buf), buf.decode('utf-8'))).encode('utf-8'))
+ sock.close()
+
+ self._start_server(echo_socket_handler)
+ base_url = 'http://%s:%d' % (self.host, self.port)
+
+ # Define some proxy headers.
+ proxy_headers = HTTPHeaderDict({'For The Proxy': 'YEAH!'})
+ proxy = proxy_from_url(base_url, proxy_headers=proxy_headers)
+
+ conn = proxy.connection_from_url('http://www.google.com/')
+
+ r = conn.urlopen('GET', 'http://www.google.com/', assert_same_host=False)
+
+ self.assertEqual(r.status, 200)
+ # FIXME: The order of the headers is not predictable right now. We
+ # should fix that someday (maybe when we migrate to
+ # OrderedDict/MultiDict).
+ self.assertTrue(b'For The Proxy: YEAH!\r\n' in r.data)
+
+ def test_retries(self):
+ def echo_socket_handler(listener):
+ sock = listener.accept()[0]
+ # First request, which should fail
+ sock.close()
+
+ # Second request
+ sock = listener.accept()[0]
+
+ buf = b''
+ while not buf.endswith(b'\r\n\r\n'):
+ buf += sock.recv(65536)
+
+ sock.send(('HTTP/1.1 200 OK\r\n'
+ 'Content-Type: text/plain\r\n'
+ 'Content-Length: %d\r\n'
+ '\r\n'
+ '%s' % (len(buf), buf.decode('utf-8'))).encode('utf-8'))
+ sock.close()
+
+ self._start_server(echo_socket_handler)
+ base_url = 'http://%s:%d' % (self.host, self.port)
+
+ proxy = proxy_from_url(base_url)
+ conn = proxy.connection_from_url('http://www.google.com')
+
+ r = conn.urlopen('GET', 'http://www.google.com',
+ assert_same_host=False, retries=1)
+ self.assertEqual(r.status, 200)
+
+ self.assertRaises(ProxyError, conn.urlopen, 'GET',
+ 'http://www.google.com',
+ assert_same_host=False, retries=False)
+
+ def test_connect_reconn(self):
+ def proxy_ssl_one(listener):
+ sock = listener.accept()[0]
+
+ buf = b''
+ while not buf.endswith(b'\r\n\r\n'):
+ buf += sock.recv(65536)
+ s = buf.decode('utf-8')
+ if not s.startswith('CONNECT '):
+ sock.send(('HTTP/1.1 405 Method not allowed\r\n'
+ 'Allow: CONNECT\r\n\r\n').encode('utf-8'))
+ sock.close()
+ return
+
+ if not s.startswith('CONNECT %s:443' % (self.host,)):
+ sock.send(('HTTP/1.1 403 Forbidden\r\n\r\n').encode('utf-8'))
+ sock.close()
+ return
+
+ sock.send(('HTTP/1.1 200 Connection Established\r\n\r\n').encode('utf-8'))
+ ssl_sock = ssl.wrap_socket(sock,
+ server_side=True,
+ keyfile=DEFAULT_CERTS['keyfile'],
+ certfile=DEFAULT_CERTS['certfile'],
+ ca_certs=DEFAULT_CA)
+
+ buf = b''
+ while not buf.endswith(b'\r\n\r\n'):
+ buf += ssl_sock.recv(65536)
+
+ ssl_sock.send(('HTTP/1.1 200 OK\r\n'
+ 'Content-Type: text/plain\r\n'
+ 'Content-Length: 2\r\n'
+ 'Connection: close\r\n'
+ '\r\n'
+ 'Hi').encode('utf-8'))
+ ssl_sock.close()
+
+        def echo_socket_handler(listener):
+ proxy_ssl_one(listener)
+ proxy_ssl_one(listener)
+
+ self._start_server(echo_socket_handler)
+ base_url = 'http://%s:%d' % (self.host, self.port)
+
+ proxy = proxy_from_url(base_url)
+
+ url = 'https://{0}'.format(self.host)
+ conn = proxy.connection_from_url(url)
+ r = conn.urlopen('GET', url, retries=0)
+ self.assertEqual(r.status, 200)
+ r = conn.urlopen('GET', url, retries=0)
+ self.assertEqual(r.status, 200)
+
+
+class TestSSL(SocketDummyServerTestCase):
+
+ def test_ssl_failure_midway_through_conn(self):
+ def socket_handler(listener):
+ sock = listener.accept()[0]
+ sock2 = sock.dup()
+ ssl_sock = ssl.wrap_socket(sock,
+ server_side=True,
+ keyfile=DEFAULT_CERTS['keyfile'],
+ certfile=DEFAULT_CERTS['certfile'],
+ ca_certs=DEFAULT_CA)
+
+ buf = b''
+ while not buf.endswith(b'\r\n\r\n'):
+ buf += ssl_sock.recv(65536)
+
+ # Deliberately send from the non-SSL socket.
+ sock2.send((
+ 'HTTP/1.1 200 OK\r\n'
+ 'Content-Type: text/plain\r\n'
+ 'Content-Length: 2\r\n'
+ '\r\n'
+ 'Hi').encode('utf-8'))
+ sock2.close()
+ ssl_sock.close()
+
+ self._start_server(socket_handler)
+ pool = HTTPSConnectionPool(self.host, self.port)
+
+ self.assertRaises(SSLError, pool.request, 'GET', '/', retries=0)
+
+ def test_ssl_read_timeout(self):
+ timed_out = Event()
+
+ def socket_handler(listener):
+ sock = listener.accept()[0]
+ ssl_sock = ssl.wrap_socket(sock,
+ server_side=True,
+ keyfile=DEFAULT_CERTS['keyfile'],
+ certfile=DEFAULT_CERTS['certfile'],
+ ca_certs=DEFAULT_CA)
+
+ buf = b''
+ while not buf.endswith(b'\r\n\r\n'):
+ buf += ssl_sock.recv(65536)
+
+ # Send incomplete message (note Content-Length)
+ ssl_sock.send((
+ 'HTTP/1.1 200 OK\r\n'
+ 'Content-Type: text/plain\r\n'
+ 'Content-Length: 10\r\n'
+ '\r\n'
+ 'Hi-').encode('utf-8'))
+ timed_out.wait()
+
+ sock.close()
+ ssl_sock.close()
+
+ self._start_server(socket_handler)
+ pool = HTTPSConnectionPool(self.host, self.port)
+
+ response = pool.urlopen('GET', '/', retries=0, preload_content=False,
+ timeout=Timeout(connect=1, read=0.001))
+ try:
+ self.assertRaises(ReadTimeoutError, response.read)
+ finally:
+ timed_out.set()
+
+ def test_ssl_failed_fingerprint_verification(self):
+ def socket_handler(listener):
+ for i in range(2):
+ sock = listener.accept()[0]
+ ssl_sock = ssl.wrap_socket(sock,
+ server_side=True,
+ keyfile=DEFAULT_CERTS['keyfile'],
+ certfile=DEFAULT_CERTS['certfile'],
+ ca_certs=DEFAULT_CA)
+
+ ssl_sock.send(b'HTTP/1.1 200 OK\r\n'
+ b'Content-Type: text/plain\r\n'
+ b'Content-Length: 5\r\n\r\n'
+ b'Hello')
+
+ ssl_sock.close()
+ sock.close()
+
+ self._start_server(socket_handler)
+ # GitHub's fingerprint. Valid, but not matching.
+ fingerprint = ('A0:C4:A7:46:00:ED:A7:2D:C0:BE:CB'
+ ':9A:8C:B6:07:CA:58:EE:74:5E')
+
+ def request():
+ try:
+ pool = HTTPSConnectionPool(self.host, self.port,
+ assert_fingerprint=fingerprint)
+ response = pool.urlopen('GET', '/', preload_content=False,
+ timeout=Timeout(connect=1, read=0.001))
+ response.read()
+ finally:
+ pool.close()
+
+ self.assertRaises(SSLError, request)
+ # Should not hang, see https://github.com/shazow/urllib3/issues/529
+ self.assertRaises(SSLError, request)
+
+
+def consume_socket(sock, chunks=65536):
+ # Accumulate reads so a terminator split across recv() calls is still seen.
+ consumed = b''
+ while not consumed.endswith(b'\r\n\r\n'):
+ consumed += sock.recv(chunks)
+
+
+def create_response_handler(response, num=1):
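+ """Return a socket handler that reads one request and sends ``response``
+ on each of ``num`` connections."""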
+ def socket_handler(listener):
+ for _ in range(num):
+ sock = listener.accept()[0]
+ consume_socket(sock)
+
+ sock.send(response)
+ sock.close()
+
+ return socket_handler
+
+
+class TestErrorWrapping(SocketDummyServerTestCase):
+
+ def test_bad_statusline(self):
+ handler = create_response_handler(
+ b'HTTP/1.1 Omg What Is This?\r\n'
+ b'Content-Length: 0\r\n'
+ b'\r\n'
+ )
+ self._start_server(handler)
+ pool = HTTPConnectionPool(self.host, self.port, retries=False)
+ self.assertRaises(ProtocolError, pool.request, 'GET', '/')
+
+ def test_unknown_protocol(self):
+ handler = create_response_handler(
+ b'HTTP/1000 200 OK\r\n'
+ b'Content-Length: 0\r\n'
+ b'\r\n'
+ )
+ self._start_server(handler)
+ pool = HTTPConnectionPool(self.host, self.port, retries=False)
+ self.assertRaises(ProtocolError, pool.request, 'GET', '/')
+
+
+class TestHeaders(SocketDummyServerTestCase):
+
+ @onlyPy3
+ def test_httplib_headers_case_insensitive(self):
+ handler = create_response_handler(
+ b'HTTP/1.1 200 OK\r\n'
+ b'Content-Length: 0\r\n'
+ b'Content-type: text/plain\r\n'
+ b'\r\n'
+ )
+ self._start_server(handler)
+ pool = HTTPConnectionPool(self.host, self.port, retries=False)
+ HEADERS = {'Content-Length': '0', 'Content-type': 'text/plain'}
+ r = pool.request('GET', '/')
+ self.assertEqual(HEADERS, dict(r.headers.items())) # the header case sent by the server is preserved
+
+ def test_headers_are_sent_with_the_original_case(self):
+ headers = {'foo': 'bar', 'bAz': 'quux'}
+ parsed_headers = {}
+
+ def socket_handler(listener):
+ sock = listener.accept()[0]
+
+ buf = b''
+ while not buf.endswith(b'\r\n\r\n'):
+ buf += sock.recv(65536)
+
+ headers_list = [header for header in buf.split(b'\r\n')[1:] if header]
+
+ for header in headers_list:
+ (key, value) = header.split(b': ')
+ parsed_headers[key.decode()] = value.decode()
+
+ # Send a complete, zero-length response so the request finishes cleanly.
+ sock.send((
+ 'HTTP/1.1 204 No Content\r\n'
+ 'Content-Length: 0\r\n'
+ '\r\n').encode('utf-8'))
+
+ sock.close()
+
+ self._start_server(socket_handler)
+ expected_headers = {'Accept-Encoding': 'identity',
+ 'Host': '{0}:{1}'.format(self.host, self.port)}
+ expected_headers.update(headers)
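+ # Accept-Encoding and Host are added automatically; the custom headers
+ # must arrive with their original case intact.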
+
+ pool = HTTPConnectionPool(self.host, self.port, retries=False)
+ pool.request('GET', '/', headers=HTTPHeaderDict(headers))
+ self.assertEqual(expected_headers, parsed_headers)
+
+
+class TestBrokenHeaders(SocketDummyServerTestCase):
+ def setUp(self):
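+ # Py2's mimetools-based HTTPMessage does not record header parsing
+ # defects, so these tests cannot run there.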
+ if issubclass(httplib.HTTPMessage, MimeToolMessage):
+ raise SkipTest('Header parsing errors not available')
+
+ super(TestBrokenHeaders, self).setUp()
+
+ def _test_broken_header_parsing(self, headers):
+ handler = create_response_handler((
+ b'HTTP/1.1 200 OK\r\n'
+ b'Content-Length: 0\r\n'
+ b'Content-type: text/plain\r\n'
+ ) + b'\r\n'.join(headers) + b'\r\n'
+ )
+
+ self._start_server(handler)
+ pool = HTTPConnectionPool(self.host, self.port, retries=False)
+
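+ # urllib3 logs unparsable headers rather than raising, so record the
+ # log output and look for the warning.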
+ with LogRecorder() as logs:
+ pool.request('GET', '/')
+
+ for record in logs:
+ if 'Failed to parse headers' in record.msg and \
+ pool._absolute_url('/') == record.args[0]:
+ return
+ self.fail('Missing log about unparsed headers')
+
+ def test_header_without_name(self):
+ self._test_broken_header_parsing([
+ b': Value\r\n',
+ b'Another: Header\r\n',
+ ])
+
+ def test_header_without_name_or_value(self):
+ self._test_broken_header_parsing([
+ b':\r\n',
+ b'Another: Header\r\n',
+ ])
+
+ def test_header_without_colon_or_value(self):
+ self._test_broken_header_parsing([
+ b'Broken Header',
+ b'Another: Header',
+ ])
+
+
+class TestHEAD(SocketDummyServerTestCase):
+ def test_chunked_head_response_does_not_hang(self):
+ handler = create_response_handler(
+ b'HTTP/1.1 200 OK\r\n'
+ b'Transfer-Encoding: chunked\r\n'
+ b'Content-type: text/plain\r\n'
+ b'\r\n'
+ )
+ self._start_server(handler)
+ pool = HTTPConnectionPool(self.host, self.port, retries=False)
+ r = pool.request('HEAD', '/', timeout=1, preload_content=False)
+
+ # stream will use the read_chunked method here.
+ self.assertEqual([], list(r.stream()))
+
+ def test_empty_head_response_does_not_hang(self):
+ handler = create_response_handler(
+ b'HTTP/1.1 200 OK\r\n'
+ b'Content-Length: 256\r\n'
+ b'Content-type: text/plain\r\n'
+ b'\r\n'
+ )
+ self._start_server(handler)
+ pool = HTTPConnectionPool(self.host, self.port, retries=False)
+ r = pool.request('HEAD', '/', timeout=1, preload_content=False)
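+ # A HEAD response has no body regardless of Content-Length, so reading
+ # must return immediately instead of waiting for 256 bytes.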
+
+ # stream will use the read method here.
+ self.assertEqual([], list(r.stream()))