aboutsummaryrefslogtreecommitdiff
path: root/test
diff options
context:
space:
mode:
Diffstat (limited to 'test')
-rw-r--r--test/__init__.py92
-rw-r--r--test/__init__.pycbin0 -> 3946 bytes
-rw-r--r--test/benchmark.py77
-rw-r--r--test/contrib/__init__.py0
-rw-r--r--test/contrib/__init__.pycbin0 -> 142 bytes
-rw-r--r--test/contrib/test_pyopenssl.py23
-rw-r--r--test/contrib/test_pyopenssl.pycbin0 -> 1143 bytes
-rw-r--r--test/port_helpers.py100
-rw-r--r--test/port_helpers.pycbin0 -> 5719 bytes
-rw-r--r--test/test_collections.py180
-rw-r--r--test/test_collections.pycbin0 -> 6842 bytes
-rw-r--r--test/test_compatibility.py23
-rw-r--r--test/test_compatibility.pycbin0 -> 1372 bytes
-rw-r--r--test/test_connectionpool.py211
-rw-r--r--test/test_connectionpool.pycbin0 -> 8862 bytes
-rw-r--r--test/test_exceptions.py46
-rw-r--r--test/test_exceptions.pycbin0 -> 1931 bytes
-rw-r--r--test/test_fields.py49
-rw-r--r--test/test_fields.pycbin0 -> 2739 bytes
-rw-r--r--test/test_filepost.py133
-rw-r--r--test/test_filepost.pycbin0 -> 4916 bytes
-rw-r--r--test/test_poolmanager.py76
-rw-r--r--test/test_poolmanager.pycbin0 -> 2499 bytes
-rw-r--r--test/test_proxymanager.py43
-rw-r--r--test/test_proxymanager.pycbin0 -> 1670 bytes
-rw-r--r--test/test_response.py398
-rw-r--r--test/test_response.pycbin0 -> 14619 bytes
-rw-r--r--test/test_retry.py156
-rw-r--r--test/test_retry.pycbin0 -> 6491 bytes
-rw-r--r--test/test_util.py357
-rw-r--r--test/test_util.pycbin0 -> 15036 bytes
-rw-r--r--test/with_dummyserver/__init__.py0
-rw-r--r--test/with_dummyserver/__init__.pycbin0 -> 151 bytes
-rw-r--r--test/with_dummyserver/test_connectionpool.py706
-rw-r--r--test/with_dummyserver/test_connectionpool.pycbin0 -> 27640 bytes
-rw-r--r--test/with_dummyserver/test_https.py374
-rw-r--r--test/with_dummyserver/test_https.pycbin0 -> 15651 bytes
-rw-r--r--test/with_dummyserver/test_poolmanager.py136
-rw-r--r--test/with_dummyserver/test_poolmanager.pycbin0 -> 5591 bytes
-rw-r--r--test/with_dummyserver/test_proxy_poolmanager.py263
-rw-r--r--test/with_dummyserver/test_proxy_poolmanager.pycbin0 -> 9891 bytes
-rw-r--r--test/with_dummyserver/test_socketlevel.py544
-rw-r--r--test/with_dummyserver/test_socketlevel.pycbin0 -> 18715 bytes
43 files changed, 3987 insertions, 0 deletions
diff --git a/test/__init__.py b/test/__init__.py
new file mode 100644
index 0000000..d56a4d3
--- /dev/null
+++ b/test/__init__.py
@@ -0,0 +1,92 @@
+import warnings
+import sys
+import errno
+import functools
+import socket
+
+from nose.plugins.skip import SkipTest
+
+from urllib3.exceptions import MaxRetryError, HTTPWarning
+from urllib3.packages import six
+
+# We need a host that will not immediately close the connection with a TCP
+# Reset. SO suggests this hostname
+TARPIT_HOST = '10.255.255.1'
+
+VALID_SOURCE_ADDRESSES = [('::1', 0), ('127.0.0.1', 0)]
+# RFC 5737: 192.0.2.0/24 is for testing only.
+# RFC 3849: 2001:db8::/32 is for documentation only.
+INVALID_SOURCE_ADDRESSES = [('192.0.2.255', 0), ('2001:db8::1', 0)]
+
+
+def clear_warnings(cls=HTTPWarning):
+ new_filters = []
+ for f in warnings.filters:
+ if issubclass(f[2], cls):
+ continue
+ new_filters.append(f)
+ warnings.filters[:] = new_filters
+
+def setUp():
+ clear_warnings()
+ warnings.simplefilter('ignore', HTTPWarning)
+
+
+def onlyPy26OrOlder(test):
+ """Skips this test unless you are on Python2.6.x or earlier."""
+
+ @functools.wraps(test)
+ def wrapper(*args, **kwargs):
+ msg = "{name} only runs on Python2.6.x or older".format(name=test.__name__)
+ if sys.version_info >= (2, 7):
+ raise SkipTest(msg)
+ return test(*args, **kwargs)
+ return wrapper
+
+def onlyPy27OrNewer(test):
+ """Skips this test unless you are on Python 2.7.x or later."""
+
+ @functools.wraps(test)
+ def wrapper(*args, **kwargs):
+ msg = "{name} requires Python 2.7.x+ to run".format(name=test.__name__)
+ if sys.version_info < (2, 7):
+ raise SkipTest(msg)
+ return test(*args, **kwargs)
+ return wrapper
+
+def onlyPy3(test):
+ """Skips this test unless you are on Python3.x"""
+
+ @functools.wraps(test)
+ def wrapper(*args, **kwargs):
+ msg = "{name} requires Python3.x to run".format(name=test.__name__)
+ if not six.PY3:
+ raise SkipTest(msg)
+ return test(*args, **kwargs)
+ return wrapper
+
+def requires_network(test):
+ """Helps you skip tests that require the network"""
+
+ def _is_unreachable_err(err):
+ return getattr(err, 'errno', None) in (errno.ENETUNREACH,
+ errno.EHOSTUNREACH) # For OSX
+
+ @functools.wraps(test)
+ def wrapper(*args, **kwargs):
+ msg = "Can't run {name} because the network is unreachable".format(
+ name=test.__name__)
+ try:
+ return test(*args, **kwargs)
+ except socket.error as e:
+ # This test needs an initial network connection to attempt the
+ # connection to the TARPIT_HOST. This fails if you are in a place
+ # without an Internet connection, so we skip the test in that case.
+ if _is_unreachable_err(e):
+ raise SkipTest(msg)
+ raise
+ except MaxRetryError as e:
+ if _is_unreachable_err(e.reason):
+ raise SkipTest(msg)
+ raise
+ return wrapper
diff --git a/test/__init__.pyc b/test/__init__.pyc
new file mode 100644
index 0000000..38b9317
--- /dev/null
+++ b/test/__init__.pyc
Binary files differ
diff --git a/test/benchmark.py b/test/benchmark.py
new file mode 100644
index 0000000..242e72f
--- /dev/null
+++ b/test/benchmark.py
@@ -0,0 +1,77 @@
+#!/usr/bin/env python
+
+"""
+Really simple rudimentary benchmark to compare ConnectionPool versus standard
+urllib to demonstrate the usefulness of connection re-using.
+"""
+from __future__ import print_function
+
+import sys
+import time
+import urllib
+
+sys.path.append('../')
+import urllib3
+
+
+# URLs to download. Doesn't matter as long as they're from the same host, so we
+# can take advantage of connection re-using.
+TO_DOWNLOAD = [
+ 'http://code.google.com/apis/apps/',
+ 'http://code.google.com/apis/base/',
+ 'http://code.google.com/apis/blogger/',
+ 'http://code.google.com/apis/calendar/',
+ 'http://code.google.com/apis/codesearch/',
+ 'http://code.google.com/apis/contact/',
+ 'http://code.google.com/apis/books/',
+ 'http://code.google.com/apis/documents/',
+ 'http://code.google.com/apis/finance/',
+ 'http://code.google.com/apis/health/',
+ 'http://code.google.com/apis/notebook/',
+ 'http://code.google.com/apis/picasaweb/',
+ 'http://code.google.com/apis/spreadsheets/',
+ 'http://code.google.com/apis/webmastertools/',
+ 'http://code.google.com/apis/youtube/',
+]
+
+
+def urllib_get(url_list):
+ assert url_list
+ for url in url_list:
+ now = time.time()
+ r = urllib.urlopen(url)
+ elapsed = time.time() - now
+ print("Got in %0.3f: %s" % (elapsed, url))
+
+
+def pool_get(url_list):
+ assert url_list
+ pool = urllib3.PoolManager()
+ for url in url_list:
+ now = time.time()
+ r = pool.request('GET', url, assert_same_host=False)
+ elapsed = time.time() - now
+ print("Got in %0.3fs: %s" % (elapsed, url))
+
+
+if __name__ == '__main__':
+ print("Running pool_get ...")
+ now = time.time()
+ pool_get(TO_DOWNLOAD)
+ pool_elapsed = time.time() - now
+
+ print("Running urllib_get ...")
+ now = time.time()
+ urllib_get(TO_DOWNLOAD)
+ urllib_elapsed = time.time() - now
+
+ print("Completed pool_get in %0.3fs" % pool_elapsed)
+ print("Completed urllib_get in %0.3fs" % urllib_elapsed)
+
+
+"""
+Example results:
+
+Completed pool_get in 1.163s
+Completed urllib_get in 2.318s
+"""
diff --git a/test/contrib/__init__.py b/test/contrib/__init__.py
new file mode 100644
index 0000000..e69de29
--- /dev/null
+++ b/test/contrib/__init__.py
diff --git a/test/contrib/__init__.pyc b/test/contrib/__init__.pyc
new file mode 100644
index 0000000..2d2fd5d
--- /dev/null
+++ b/test/contrib/__init__.pyc
Binary files differ
diff --git a/test/contrib/test_pyopenssl.py b/test/contrib/test_pyopenssl.py
new file mode 100644
index 0000000..5d57527
--- /dev/null
+++ b/test/contrib/test_pyopenssl.py
@@ -0,0 +1,23 @@
+from nose.plugins.skip import SkipTest
+from urllib3.packages import six
+
+if six.PY3:
+ raise SkipTest('Testing of PyOpenSSL disabled on PY3')
+
+try:
+ from urllib3.contrib.pyopenssl import (inject_into_urllib3,
+ extract_from_urllib3)
+except ImportError as e:
+ raise SkipTest('Could not import PyOpenSSL: %r' % e)
+
+
+from ..with_dummyserver.test_https import TestHTTPS, TestHTTPS_TLSv1
+from ..with_dummyserver.test_socketlevel import TestSNI, TestSocketClosing
+
+
+def setup_module():
+ inject_into_urllib3()
+
+
+def teardown_module():
+ extract_from_urllib3()
diff --git a/test/contrib/test_pyopenssl.pyc b/test/contrib/test_pyopenssl.pyc
new file mode 100644
index 0000000..6441273
--- /dev/null
+++ b/test/contrib/test_pyopenssl.pyc
Binary files differ
diff --git a/test/port_helpers.py b/test/port_helpers.py
new file mode 100644
index 0000000..e818a9b
--- /dev/null
+++ b/test/port_helpers.py
@@ -0,0 +1,100 @@
+# These helpers are copied from test_support.py in the Python 2.7 standard
+# library test suite.
+
+import socket
+
+
+# Don't use "localhost", since resolving it uses the DNS under recent
+# Windows versions (see issue #18792).
+HOST = "127.0.0.1"
+HOSTv6 = "::1"
+
+def find_unused_port(family=socket.AF_INET, socktype=socket.SOCK_STREAM):
+ """Returns an unused port that should be suitable for binding. This is
+ achieved by creating a temporary socket with the same family and type as
+ the 'sock' parameter (default is AF_INET, SOCK_STREAM), and binding it to
+ the specified host address (defaults to 0.0.0.0) with the port set to 0,
+ eliciting an unused ephemeral port from the OS. The temporary socket is
+ then closed and deleted, and the ephemeral port is returned.
+
+ Either this method or bind_port() should be used for any tests where a
+ server socket needs to be bound to a particular port for the duration of
+ the test. Which one to use depends on whether the calling code is creating
+ a python socket, or if an unused port needs to be provided in a constructor
+ or passed to an external program (i.e. the -accept argument to openssl's
+ s_server mode). Always prefer bind_port() over find_unused_port() where
+ possible. Hard coded ports should *NEVER* be used. As soon as a server
+ socket is bound to a hard coded port, the ability to run multiple instances
+ of the test simultaneously on the same host is compromised, which makes the
+ test a ticking time bomb in a buildbot environment. On Unix buildbots, this
+ may simply manifest as a failed test, which can be recovered from without
+ intervention in most cases, but on Windows, the entire python process can
+ completely and utterly wedge, requiring someone to log in to the buildbot
+ and manually kill the affected process.
+
+ (This is easy to reproduce on Windows, unfortunately, and can be traced to
+ the SO_REUSEADDR socket option having different semantics on Windows versus
+ Unix/Linux. On Unix, you can't have two AF_INET SOCK_STREAM sockets bind,
+ listen and then accept connections on identical host/ports. An EADDRINUSE
+ socket.error will be raised at some point (depending on the platform and
+ the order bind and listen were called on each socket).
+
+ However, on Windows, if SO_REUSEADDR is set on the sockets, no EADDRINUSE
+ will ever be raised when attempting to bind two identical host/ports. When
+ accept() is called on each socket, the second caller's process will steal
+ the port from the first caller, leaving them both in an awkwardly wedged
+ state where they'll no longer respond to any signals or graceful kills, and
+ must be forcibly killed via OpenProcess()/TerminateProcess().
+
+ The solution on Windows is to use the SO_EXCLUSIVEADDRUSE socket option
+ instead of SO_REUSEADDR, which effectively affords the same semantics as
+ SO_REUSEADDR on Unix. Given the propensity of Unix developers in the Open
+ Source world compared to Windows ones, this is a common mistake. A quick
+ look over OpenSSL's 0.9.8g source shows that they use SO_REUSEADDR when
+ openssl.exe is called with the 's_server' option, for example. See
+ http://bugs.python.org/issue2550 for more info. The following site also
+ has a very thorough description about the implications of both REUSEADDR
+ and EXCLUSIVEADDRUSE on Windows:
+ http://msdn2.microsoft.com/en-us/library/ms740621(VS.85).aspx)
+
+ XXX: although this approach is a vast improvement on previous attempts to
+ elicit unused ports, it rests heavily on the assumption that the ephemeral
+ port returned to us by the OS won't immediately be dished back out to some
+ other process when we close and delete our temporary socket but before our
+ calling code has a chance to bind the returned port. We can deal with this
+ issue if/when we come across it."""
+ tempsock = socket.socket(family, socktype)
+ port = bind_port(tempsock)
+ tempsock.close()
+ del tempsock
+ return port
+
+def bind_port(sock, host=HOST):
+ """Bind the socket to a free port and return the port number. Relies on
+ ephemeral ports in order to ensure we are using an unbound port. This is
+ important as many tests may be running simultaneously, especially in a
+ buildbot environment. This method raises an exception if the sock.family
+ is AF_INET and sock.type is SOCK_STREAM, *and* the socket has SO_REUSEADDR
+ or SO_REUSEPORT set on it. Tests should *never* set these socket options
+ for TCP/IP sockets. The only case for setting these options is testing
+ multicasting via multiple UDP sockets.
+
+ Additionally, if the SO_EXCLUSIVEADDRUSE socket option is available (i.e.
+ on Windows), it will be set on the socket. This will prevent anyone else
+ from bind()'ing to our host/port for the duration of the test.
+ """
+ if sock.family == socket.AF_INET and sock.type == socket.SOCK_STREAM:
+ if hasattr(socket, 'SO_REUSEADDR'):
+ if sock.getsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR) == 1:
+ raise ValueError("tests should never set the SO_REUSEADDR " \
+ "socket option on TCP/IP sockets!")
+ if hasattr(socket, 'SO_REUSEPORT'):
+ if sock.getsockopt(socket.SOL_SOCKET, socket.SO_REUSEPORT) == 1:
+ raise ValueError("tests should never set the SO_REUSEPORT " \
+ "socket option on TCP/IP sockets!")
+ if hasattr(socket, 'SO_EXCLUSIVEADDRUSE'):
+ sock.setsockopt(socket.SOL_SOCKET, socket.SO_EXCLUSIVEADDRUSE, 1)
+
+ sock.bind((host, 0))
+ port = sock.getsockname()[1]
+ return port
diff --git a/test/port_helpers.pyc b/test/port_helpers.pyc
new file mode 100644
index 0000000..7a1c425
--- /dev/null
+++ b/test/port_helpers.pyc
Binary files differ
diff --git a/test/test_collections.py b/test/test_collections.py
new file mode 100644
index 0000000..4d173ac
--- /dev/null
+++ b/test/test_collections.py
@@ -0,0 +1,180 @@
+import unittest
+
+from urllib3._collections import (
+ HTTPHeaderDict,
+ RecentlyUsedContainer as Container
+)
+from urllib3.packages import six
+xrange = six.moves.xrange
+
+
+class TestLRUContainer(unittest.TestCase):
+ def test_maxsize(self):
+ d = Container(5)
+
+ for i in xrange(5):
+ d[i] = str(i)
+
+ self.assertEqual(len(d), 5)
+
+ for i in xrange(5):
+ self.assertEqual(d[i], str(i))
+
+ d[i+1] = str(i+1)
+
+ self.assertEqual(len(d), 5)
+ self.assertFalse(0 in d)
+ self.assertTrue(i+1 in d)
+
+ def test_expire(self):
+ d = Container(5)
+
+ for i in xrange(5):
+ d[i] = str(i)
+
+ for i in xrange(5):
+ d.get(0)
+
+ # Add one more entry
+ d[5] = '5'
+
+ # Check state
+ self.assertEqual(list(d.keys()), [2, 3, 4, 0, 5])
+
+ def test_same_key(self):
+ d = Container(5)
+
+ for i in xrange(10):
+ d['foo'] = i
+
+ self.assertEqual(list(d.keys()), ['foo'])
+ self.assertEqual(len(d), 1)
+
+ def test_access_ordering(self):
+ d = Container(5)
+
+ for i in xrange(10):
+ d[i] = True
+
+ # Keys should be ordered by access time
+ self.assertEqual(list(d.keys()), [5, 6, 7, 8, 9])
+
+ new_order = [7,8,6,9,5]
+ for k in new_order:
+ d[k]
+
+ self.assertEqual(list(d.keys()), new_order)
+
+ def test_delete(self):
+ d = Container(5)
+
+ for i in xrange(5):
+ d[i] = True
+
+ del d[0]
+ self.assertFalse(0 in d)
+
+ d.pop(1)
+ self.assertFalse(1 in d)
+
+ d.pop(1, None)
+
+ def test_get(self):
+ d = Container(5)
+
+ for i in xrange(5):
+ d[i] = True
+
+ r = d.get(4)
+ self.assertEqual(r, True)
+
+ r = d.get(5)
+ self.assertEqual(r, None)
+
+ r = d.get(5, 42)
+ self.assertEqual(r, 42)
+
+ self.assertRaises(KeyError, lambda: d[5])
+
+ def test_disposal(self):
+ evicted_items = []
+
+ def dispose_func(arg):
+ # Save the evicted datum for inspection
+ evicted_items.append(arg)
+
+ d = Container(5, dispose_func=dispose_func)
+ for i in xrange(5):
+ d[i] = i
+ self.assertEqual(list(d.keys()), list(xrange(5)))
+ self.assertEqual(evicted_items, []) # Nothing disposed
+
+ d[5] = 5
+ self.assertEqual(list(d.keys()), list(xrange(1, 6)))
+ self.assertEqual(evicted_items, [0])
+
+ del d[1]
+ self.assertEqual(evicted_items, [0, 1])
+
+ d.clear()
+ self.assertEqual(evicted_items, [0, 1, 2, 3, 4, 5])
+
+ def test_iter(self):
+ d = Container()
+
+ self.assertRaises(NotImplementedError, d.__iter__)
+
+
+class TestHTTPHeaderDict(unittest.TestCase):
+ def setUp(self):
+ self.d = HTTPHeaderDict(A='foo')
+ self.d.add('a', 'bar')
+
+ def test_overwriting_with_setitem_replaces(self):
+ d = HTTPHeaderDict()
+
+ d['A'] = 'foo'
+ self.assertEqual(d['a'], 'foo')
+
+ d['a'] = 'bar'
+ self.assertEqual(d['A'], 'bar')
+
+ def test_copy(self):
+ h = self.d.copy()
+ self.assertTrue(self.d is not h)
+ self.assertEqual(self.d, h)
+
+ def test_add(self):
+ d = HTTPHeaderDict()
+
+ d['A'] = 'foo'
+ d.add('a', 'bar')
+
+ self.assertEqual(d['a'], 'foo, bar')
+ self.assertEqual(d['A'], 'foo, bar')
+
+ def test_getlist(self):
+ self.assertEqual(self.d.getlist('a'), ['foo', 'bar'])
+ self.assertEqual(self.d.getlist('A'), ['foo', 'bar'])
+ self.assertEqual(self.d.getlist('b'), [])
+
+ def test_delitem(self):
+ del self.d['a']
+ self.assertFalse('a' in self.d)
+ self.assertFalse('A' in self.d)
+
+ def test_equal(self):
+ b = HTTPHeaderDict({'a': 'foo, bar'})
+ self.assertEqual(self.d, b)
+ c = [('a', 'foo, bar')]
+ self.assertNotEqual(self.d, c)
+
+ def test_len(self):
+ self.assertEqual(len(self.d), 1)
+
+ def test_repr(self):
+ rep = "HTTPHeaderDict({'A': 'foo, bar'})"
+ self.assertEqual(repr(self.d), rep)
+
+if __name__ == '__main__':
+ unittest.main()
diff --git a/test/test_collections.pyc b/test/test_collections.pyc
new file mode 100644
index 0000000..d1ecd73
--- /dev/null
+++ b/test/test_collections.pyc
Binary files differ
diff --git a/test/test_compatibility.py b/test/test_compatibility.py
new file mode 100644
index 0000000..05ee4de
--- /dev/null
+++ b/test/test_compatibility.py
@@ -0,0 +1,23 @@
+import unittest
+import warnings
+
+from urllib3.connection import HTTPConnection
+
+
+class TestVersionCompatibility(unittest.TestCase):
+ def test_connection_strict(self):
+ with warnings.catch_warnings(record=True) as w:
+ warnings.simplefilter("always")
+
+ # strict=True is deprecated in Py33+
+ conn = HTTPConnection('localhost', 12345, strict=True)
+
+ if w:
+ self.fail('HTTPConnection raised warning on strict=True: %r' % w[0].message)
+
+ def test_connection_source_address(self):
+ try:
+ # source_address does not exist in Py26-
+ conn = HTTPConnection('localhost', 12345, source_address='127.0.0.1')
+ except TypeError as e:
+ self.fail('HTTPConnection raised TypeError on source_adddress: %r' % e)
diff --git a/test/test_compatibility.pyc b/test/test_compatibility.pyc
new file mode 100644
index 0000000..2dfdf75
--- /dev/null
+++ b/test/test_compatibility.pyc
Binary files differ
diff --git a/test/test_connectionpool.py b/test/test_connectionpool.py
new file mode 100644
index 0000000..28fb89b
--- /dev/null
+++ b/test/test_connectionpool.py
@@ -0,0 +1,211 @@
+import unittest
+
+from urllib3.connectionpool import (
+ connection_from_url,
+ HTTPConnection,
+ HTTPConnectionPool,
+)
+from urllib3.util.timeout import Timeout
+from urllib3.packages.ssl_match_hostname import CertificateError
+from urllib3.exceptions import (
+ ClosedPoolError,
+ EmptyPoolError,
+ HostChangedError,
+ LocationValueError,
+ MaxRetryError,
+ ProtocolError,
+ SSLError,
+)
+
+from socket import error as SocketError
+from ssl import SSLError as BaseSSLError
+
+try: # Python 3
+ from queue import Empty
+ from http.client import HTTPException
+except ImportError:
+ from Queue import Empty
+ from httplib import HTTPException
+
+
+class TestConnectionPool(unittest.TestCase):
+ """
+ Tests in this suite should exercise the ConnectionPool functionality
+ without actually making any network requests or connections.
+ """
+ def test_same_host(self):
+ same_host = [
+ ('http://google.com/', '/'),
+ ('http://google.com/', 'http://google.com/'),
+ ('http://google.com/', 'http://google.com'),
+ ('http://google.com/', 'http://google.com/abra/cadabra'),
+ ('http://google.com:42/', 'http://google.com:42/abracadabra'),
+ # Test comparison using default ports
+ ('http://google.com:80/', 'http://google.com/abracadabra'),
+ ('http://google.com/', 'http://google.com:80/abracadabra'),
+ ('https://google.com:443/', 'https://google.com/abracadabra'),
+ ('https://google.com/', 'https://google.com:443/abracadabra'),
+ ]
+
+ for a, b in same_host:
+ c = connection_from_url(a)
+ self.assertTrue(c.is_same_host(b), "%s =? %s" % (a, b))
+
+ not_same_host = [
+ ('https://google.com/', 'http://google.com/'),
+ ('http://google.com/', 'https://google.com/'),
+ ('http://yahoo.com/', 'http://google.com/'),
+ ('http://google.com:42', 'https://google.com/abracadabra'),
+ ('http://google.com', 'https://google.net/'),
+ # Test comparison with default ports
+ ('http://google.com:42', 'http://google.com'),
+ ('https://google.com:42', 'https://google.com'),
+ ('http://google.com:443', 'http://google.com'),
+ ('https://google.com:80', 'https://google.com'),
+ ('http://google.com:443', 'https://google.com'),
+ ('https://google.com:80', 'http://google.com'),
+ ('https://google.com:443', 'http://google.com'),
+ ('http://google.com:80', 'https://google.com'),
+ ]
+
+ for a, b in not_same_host:
+ c = connection_from_url(a)
+ self.assertFalse(c.is_same_host(b), "%s =? %s" % (a, b))
+ c = connection_from_url(b)
+ self.assertFalse(c.is_same_host(a), "%s =? %s" % (b, a))
+
+
+ def test_max_connections(self):
+ pool = HTTPConnectionPool(host='localhost', maxsize=1, block=True)
+
+ pool._get_conn(timeout=0.01)
+
+ try:
+ pool._get_conn(timeout=0.01)
+ self.fail("Managed to get a connection without EmptyPoolError")
+ except EmptyPoolError:
+ pass
+
+ try:
+ pool.request('GET', '/', pool_timeout=0.01)
+ self.fail("Managed to get a connection without EmptyPoolError")
+ except EmptyPoolError:
+ pass
+
+ self.assertEqual(pool.num_connections, 1)
+
+ def test_pool_edgecases(self):
+ pool = HTTPConnectionPool(host='localhost', maxsize=1, block=False)
+
+ conn1 = pool._get_conn()
+ conn2 = pool._get_conn() # New because block=False
+
+ pool._put_conn(conn1)
+ pool._put_conn(conn2) # Should be discarded
+
+ self.assertEqual(conn1, pool._get_conn())
+ self.assertNotEqual(conn2, pool._get_conn())
+
+ self.assertEqual(pool.num_connections, 3)
+
+ def test_exception_str(self):
+ self.assertEqual(
+ str(EmptyPoolError(HTTPConnectionPool(host='localhost'), "Test.")),
+ "HTTPConnectionPool(host='localhost', port=None): Test.")
+
+ def test_retry_exception_str(self):
+ self.assertEqual(
+ str(MaxRetryError(
+ HTTPConnectionPool(host='localhost'), "Test.", None)),
+ "HTTPConnectionPool(host='localhost', port=None): "
+ "Max retries exceeded with url: Test. (Caused by redirect)")
+
+ err = SocketError("Test")
+
+ # using err.__class__ here, as socket.error is an alias for OSError
+ # since Py3.3 and gets printed as this
+ self.assertEqual(
+ str(MaxRetryError(
+ HTTPConnectionPool(host='localhost'), "Test.", err)),
+ "HTTPConnectionPool(host='localhost', port=None): "
+ "Max retries exceeded with url: Test. "
+ "(Caused by %r)" % err)
+
+
+ def test_pool_size(self):
+ POOL_SIZE = 1
+ pool = HTTPConnectionPool(host='localhost', maxsize=POOL_SIZE, block=True)
+
+ def _raise(ex):
+ raise ex()
+
+ def _test(exception, expect):
+ pool._make_request = lambda *args, **kwargs: _raise(exception)
+ self.assertRaises(expect, pool.request, 'GET', '/')
+
+ self.assertEqual(pool.pool.qsize(), POOL_SIZE)
+
+ # Make sure that all of the exceptions return the connection to the pool
+ _test(Empty, EmptyPoolError)
+ _test(BaseSSLError, SSLError)
+ _test(CertificateError, SSLError)
+
+ # The pool should never be empty, and with these two exceptions being raised,
+ # a retry will be triggered, but that retry will fail, eventually raising
+ # MaxRetryError, not EmptyPoolError
+ # See: https://github.com/shazow/urllib3/issues/76
+ pool._make_request = lambda *args, **kwargs: _raise(HTTPException)
+ self.assertRaises(MaxRetryError, pool.request,
+ 'GET', '/', retries=1, pool_timeout=0.01)
+ self.assertEqual(pool.pool.qsize(), POOL_SIZE)
+
+ def test_assert_same_host(self):
+ c = connection_from_url('http://google.com:80')
+
+ self.assertRaises(HostChangedError, c.request,
+ 'GET', 'http://yahoo.com:80', assert_same_host=True)
+
+ def test_pool_close(self):
+ pool = connection_from_url('http://google.com:80')
+
+ # Populate with some connections
+ conn1 = pool._get_conn()
+ conn2 = pool._get_conn()
+ conn3 = pool._get_conn()
+ pool._put_conn(conn1)
+ pool._put_conn(conn2)
+
+ old_pool_queue = pool.pool
+
+ pool.close()
+ self.assertEqual(pool.pool, None)
+
+ self.assertRaises(ClosedPoolError, pool._get_conn)
+
+ pool._put_conn(conn3)
+
+ self.assertRaises(ClosedPoolError, pool._get_conn)
+
+ self.assertRaises(Empty, old_pool_queue.get, block=False)
+
+ def test_pool_timeouts(self):
+ pool = HTTPConnectionPool(host='localhost')
+ conn = pool._new_conn()
+ self.assertEqual(conn.__class__, HTTPConnection)
+ self.assertEqual(pool.timeout.__class__, Timeout)
+ self.assertEqual(pool.timeout._read, Timeout.DEFAULT_TIMEOUT)
+ self.assertEqual(pool.timeout._connect, Timeout.DEFAULT_TIMEOUT)
+ self.assertEqual(pool.timeout.total, None)
+
+ pool = HTTPConnectionPool(host='localhost', timeout=3)
+ self.assertEqual(pool.timeout._read, 3)
+ self.assertEqual(pool.timeout._connect, 3)
+ self.assertEqual(pool.timeout.total, None)
+
+ def test_no_host(self):
+ self.assertRaises(LocationValueError, HTTPConnectionPool, None)
+
+
+
+if __name__ == '__main__':
+ unittest.main()
diff --git a/test/test_connectionpool.pyc b/test/test_connectionpool.pyc
new file mode 100644
index 0000000..e87a3b3
--- /dev/null
+++ b/test/test_connectionpool.pyc
Binary files differ
diff --git a/test/test_exceptions.py b/test/test_exceptions.py
new file mode 100644
index 0000000..4190a61
--- /dev/null
+++ b/test/test_exceptions.py
@@ -0,0 +1,46 @@
+import unittest
+import pickle
+
+from urllib3.exceptions import (HTTPError, MaxRetryError, LocationParseError,
+ ClosedPoolError, EmptyPoolError,
+ HostChangedError, ReadTimeoutError,
+ ConnectTimeoutError)
+from urllib3.connectionpool import HTTPConnectionPool
+
+
+
+class TestPickle(unittest.TestCase):
+
+ def verify_pickling(self, item):
+ return pickle.loads(pickle.dumps(item))
+
+ def test_exceptions(self):
+ assert self.verify_pickling(HTTPError(None))
+ assert self.verify_pickling(MaxRetryError(None, None, None))
+ assert self.verify_pickling(LocationParseError(None))
+ assert self.verify_pickling(ConnectTimeoutError(None))
+
+ def test_exceptions_with_objects(self):
+ assert self.verify_pickling(
+ HTTPError('foo'))
+
+ assert self.verify_pickling(
+ HTTPError('foo', IOError('foo')))
+
+ assert self.verify_pickling(
+ MaxRetryError(HTTPConnectionPool('localhost'), '/', None))
+
+ assert self.verify_pickling(
+ LocationParseError('fake location'))
+
+ assert self.verify_pickling(
+ ClosedPoolError(HTTPConnectionPool('localhost'), None))
+
+ assert self.verify_pickling(
+ EmptyPoolError(HTTPConnectionPool('localhost'), None))
+
+ assert self.verify_pickling(
+ HostChangedError(HTTPConnectionPool('localhost'), '/', None))
+
+ assert self.verify_pickling(
+ ReadTimeoutError(HTTPConnectionPool('localhost'), '/', None))
diff --git a/test/test_exceptions.pyc b/test/test_exceptions.pyc
new file mode 100644
index 0000000..3274e34
--- /dev/null
+++ b/test/test_exceptions.pyc
Binary files differ
diff --git a/test/test_fields.py b/test/test_fields.py
new file mode 100644
index 0000000..cdec68b
--- /dev/null
+++ b/test/test_fields.py
@@ -0,0 +1,49 @@
+import unittest
+
+from urllib3.fields import guess_content_type, RequestField
+from urllib3.packages.six import u
+
+
+class TestRequestField(unittest.TestCase):
+
+ def test_guess_content_type(self):
+ self.assertTrue(guess_content_type('image.jpg') in
+ ['image/jpeg', 'image/pjpeg'])
+ self.assertEqual(guess_content_type('notsure'),
+ 'application/octet-stream')
+ self.assertEqual(guess_content_type(None), 'application/octet-stream')
+
+ def test_create(self):
+ simple_field = RequestField('somename', 'data')
+ self.assertEqual(simple_field.render_headers(), '\r\n')
+ filename_field = RequestField('somename', 'data',
+ filename='somefile.txt')
+ self.assertEqual(filename_field.render_headers(), '\r\n')
+ headers_field = RequestField('somename', 'data',
+ headers={'Content-Length': 4})
+ self.assertEqual(
+ headers_field.render_headers(), 'Content-Length: 4\r\n\r\n')
+
+ def test_make_multipart(self):
+ field = RequestField('somename', 'data')
+ field.make_multipart(content_type='image/jpg',
+ content_location='/test')
+ self.assertEqual(
+ field.render_headers(),
+ 'Content-Disposition: form-data; name="somename"\r\n'
+ 'Content-Type: image/jpg\r\n'
+ 'Content-Location: /test\r\n'
+ '\r\n')
+
+ def test_render_parts(self):
+ field = RequestField('somename', 'data')
+ parts = field._render_parts({'name': 'value', 'filename': 'value'})
+ self.assertTrue('name="value"' in parts)
+ self.assertTrue('filename="value"' in parts)
+ parts = field._render_parts([('name', 'value'), ('filename', 'value')])
+ self.assertEqual(parts, 'name="value"; filename="value"')
+
+ def test_render_part(self):
+ field = RequestField('somename', 'data')
+ param = field._render_part('filename', u('n\u00e4me'))
+ self.assertEqual(param, "filename*=utf-8''n%C3%A4me")
diff --git a/test/test_fields.pyc b/test/test_fields.pyc
new file mode 100644
index 0000000..4622899
--- /dev/null
+++ b/test/test_fields.pyc
Binary files differ
diff --git a/test/test_filepost.py b/test/test_filepost.py
new file mode 100644
index 0000000..390dbb3
--- /dev/null
+++ b/test/test_filepost.py
@@ -0,0 +1,133 @@
+import unittest
+
+from urllib3.filepost import encode_multipart_formdata, iter_fields
+from urllib3.fields import RequestField
+from urllib3.packages.six import b, u
+
+
+BOUNDARY = '!! test boundary !!'
+
+
+class TestIterfields(unittest.TestCase):
+
+ def test_dict(self):
+ for fieldname, value in iter_fields(dict(a='b')):
+ self.assertEqual((fieldname, value), ('a', 'b'))
+
+ self.assertEqual(
+ list(sorted(iter_fields(dict(a='b', c='d')))),
+ [('a', 'b'), ('c', 'd')])
+
+ def test_tuple_list(self):
+ for fieldname, value in iter_fields([('a', 'b')]):
+ self.assertEqual((fieldname, value), ('a', 'b'))
+
+ self.assertEqual(
+ list(iter_fields([('a', 'b'), ('c', 'd')])),
+ [('a', 'b'), ('c', 'd')])
+
+
+class TestMultipartEncoding(unittest.TestCase):
+
+ def test_input_datastructures(self):
+ fieldsets = [
+ dict(k='v', k2='v2'),
+ [('k', 'v'), ('k2', 'v2')],
+ ]
+
+ for fields in fieldsets:
+ encoded, _ = encode_multipart_formdata(fields, boundary=BOUNDARY)
+ self.assertEqual(encoded.count(b(BOUNDARY)), 3)
+
+
+ def test_field_encoding(self):
+ fieldsets = [
+ [('k', 'v'), ('k2', 'v2')],
+ [('k', b'v'), (u('k2'), b'v2')],
+ [('k', b'v'), (u('k2'), 'v2')],
+ ]
+
+ for fields in fieldsets:
+ encoded, content_type = encode_multipart_formdata(fields, boundary=BOUNDARY)
+
+ self.assertEqual(encoded,
+ b'--' + b(BOUNDARY) + b'\r\n'
+ b'Content-Disposition: form-data; name="k"\r\n'
+ b'\r\n'
+ b'v\r\n'
+ b'--' + b(BOUNDARY) + b'\r\n'
+ b'Content-Disposition: form-data; name="k2"\r\n'
+ b'\r\n'
+ b'v2\r\n'
+ b'--' + b(BOUNDARY) + b'--\r\n'
+ , fields)
+
+ self.assertEqual(content_type,
+ 'multipart/form-data; boundary=' + str(BOUNDARY))
+
+
+ def test_filename(self):
+ fields = [('k', ('somename', b'v'))]
+
+ encoded, content_type = encode_multipart_formdata(fields, boundary=BOUNDARY)
+
+ self.assertEqual(encoded,
+ b'--' + b(BOUNDARY) + b'\r\n'
+ b'Content-Disposition: form-data; name="k"; filename="somename"\r\n'
+ b'Content-Type: application/octet-stream\r\n'
+ b'\r\n'
+ b'v\r\n'
+ b'--' + b(BOUNDARY) + b'--\r\n'
+ )
+
+ self.assertEqual(content_type,
+ 'multipart/form-data; boundary=' + str(BOUNDARY))
+
+
+ def test_textplain(self):
+ fields = [('k', ('somefile.txt', b'v'))]
+
+ encoded, content_type = encode_multipart_formdata(fields, boundary=BOUNDARY)
+
+ self.assertEqual(encoded,
+ b'--' + b(BOUNDARY) + b'\r\n'
+ b'Content-Disposition: form-data; name="k"; filename="somefile.txt"\r\n'
+ b'Content-Type: text/plain\r\n'
+ b'\r\n'
+ b'v\r\n'
+ b'--' + b(BOUNDARY) + b'--\r\n'
+ )
+
+ self.assertEqual(content_type,
+ 'multipart/form-data; boundary=' + str(BOUNDARY))
+
+
+ def test_explicit(self):
+ fields = [('k', ('somefile.txt', b'v', 'image/jpeg'))]
+
+ encoded, content_type = encode_multipart_formdata(fields, boundary=BOUNDARY)
+
+ self.assertEqual(encoded,
+ b'--' + b(BOUNDARY) + b'\r\n'
+ b'Content-Disposition: form-data; name="k"; filename="somefile.txt"\r\n'
+ b'Content-Type: image/jpeg\r\n'
+ b'\r\n'
+ b'v\r\n'
+ b'--' + b(BOUNDARY) + b'--\r\n'
+ )
+
+ self.assertEqual(content_type,
+ 'multipart/form-data; boundary=' + str(BOUNDARY))
+
+ def test_request_fields(self):
+ fields = [RequestField('k', b'v', filename='somefile.txt', headers={'Content-Type': 'image/jpeg'})]
+
+ encoded, content_type = encode_multipart_formdata(fields, boundary=BOUNDARY)
+
+ self.assertEqual(encoded,
+ b'--' + b(BOUNDARY) + b'\r\n'
+ b'Content-Type: image/jpeg\r\n'
+ b'\r\n'
+ b'v\r\n'
+ b'--' + b(BOUNDARY) + b'--\r\n'
+ )
diff --git a/test/test_filepost.pyc b/test/test_filepost.pyc
new file mode 100644
index 0000000..ec54472
--- /dev/null
+++ b/test/test_filepost.pyc
Binary files differ
diff --git a/test/test_poolmanager.py b/test/test_poolmanager.py
new file mode 100644
index 0000000..754ee8a
--- /dev/null
+++ b/test/test_poolmanager.py
@@ -0,0 +1,76 @@
+import unittest
+
+from urllib3.poolmanager import PoolManager
+from urllib3 import connection_from_url
+from urllib3.exceptions import (
+ ClosedPoolError,
+ LocationValueError,
+)
+
+
+class TestPoolManager(unittest.TestCase):
+ def test_same_url(self):
+ # Convince ourselves that normally we don't get the same object
+ conn1 = connection_from_url('http://localhost:8081/foo')
+ conn2 = connection_from_url('http://localhost:8081/bar')
+
+ self.assertNotEqual(conn1, conn2)
+
+ # Now try again using the PoolManager
+ p = PoolManager(1)
+
+ conn1 = p.connection_from_url('http://localhost:8081/foo')
+ conn2 = p.connection_from_url('http://localhost:8081/bar')
+
+ self.assertEqual(conn1, conn2)
+
+ def test_many_urls(self):
+ urls = [
+ "http://localhost:8081/foo",
+ "http://www.google.com/mail",
+ "http://localhost:8081/bar",
+ "https://www.google.com/",
+ "https://www.google.com/mail",
+ "http://yahoo.com",
+ "http://bing.com",
+ "http://yahoo.com/",
+ ]
+
+ connections = set()
+
+ p = PoolManager(10)
+
+ for url in urls:
+ conn = p.connection_from_url(url)
+ connections.add(conn)
+
+ self.assertEqual(len(connections), 5)
+
+ def test_manager_clear(self):
+ p = PoolManager(5)
+
+ conn_pool = p.connection_from_url('http://google.com')
+ self.assertEqual(len(p.pools), 1)
+
+ conn = conn_pool._get_conn()
+
+ p.clear()
+ self.assertEqual(len(p.pools), 0)
+
+ self.assertRaises(ClosedPoolError, conn_pool._get_conn)
+
+ conn_pool._put_conn(conn)
+
+ self.assertRaises(ClosedPoolError, conn_pool._get_conn)
+
+ self.assertEqual(len(p.pools), 0)
+
+
+ def test_nohost(self):
+ p = PoolManager(5)
+ self.assertRaises(LocationValueError, p.connection_from_url, 'http://@')
+ self.assertRaises(LocationValueError, p.connection_from_url, None)
+
+
+if __name__ == '__main__':
+ unittest.main()
diff --git a/test/test_poolmanager.pyc b/test/test_poolmanager.pyc
new file mode 100644
index 0000000..077c2ac
--- /dev/null
+++ b/test/test_poolmanager.pyc
Binary files differ
diff --git a/test/test_proxymanager.py b/test/test_proxymanager.py
new file mode 100644
index 0000000..e7b5c48
--- /dev/null
+++ b/test/test_proxymanager.py
@@ -0,0 +1,43 @@
+import unittest
+
+from urllib3.poolmanager import ProxyManager
+
+
+class TestProxyManager(unittest.TestCase):
+ def test_proxy_headers(self):
+ p = ProxyManager('http://something:1234')
+ url = 'http://pypi.python.org/test'
+
+ # Verify default headers
+ default_headers = {'Accept': '*/*',
+ 'Host': 'pypi.python.org'}
+ headers = p._set_proxy_headers(url)
+
+ self.assertEqual(headers, default_headers)
+
+ # Verify default headers don't overwrite provided headers
+ provided_headers = {'Accept': 'application/json',
+ 'custom': 'header',
+ 'Host': 'test.python.org'}
+ headers = p._set_proxy_headers(url, provided_headers)
+
+ self.assertEqual(headers, provided_headers)
+
+ # Verify proxy with nonstandard port
+ provided_headers = {'Accept': 'application/json'}
+ expected_headers = provided_headers.copy()
+ expected_headers.update({'Host': 'pypi.python.org:8080'})
+ url_with_port = 'http://pypi.python.org:8080/test'
+ headers = p._set_proxy_headers(url_with_port, provided_headers)
+
+ self.assertEqual(headers, expected_headers)
+
+ def test_default_port(self):
+ p = ProxyManager('http://something')
+ self.assertEqual(p.proxy.port, 80)
+ p = ProxyManager('https://something')
+ self.assertEqual(p.proxy.port, 443)
+
+
+if __name__ == '__main__':
+ unittest.main()
diff --git a/test/test_proxymanager.pyc b/test/test_proxymanager.pyc
new file mode 100644
index 0000000..3696ee8
--- /dev/null
+++ b/test/test_proxymanager.pyc
Binary files differ
diff --git a/test/test_response.py b/test/test_response.py
new file mode 100644
index 0000000..7d67c93
--- /dev/null
+++ b/test/test_response.py
@@ -0,0 +1,398 @@
+import unittest
+
+from io import BytesIO, BufferedReader
+
+from urllib3.response import HTTPResponse
+from urllib3.exceptions import DecodeError
+
+
+from base64 import b64decode
+
+# A known random (i.e, not-too-compressible) payload generated with:
+# "".join(random.choice(string.printable) for i in xrange(512))
+# .encode("zlib").encode("base64")
+# Randomness in tests == bad, and fixing a seed may not be sufficient.
+ZLIB_PAYLOAD = b64decode(b"""\
+eJwFweuaoQAAANDfineQhiKLUiaiCzvuTEmNNlJGiL5QhnGpZ99z8luQfe1AHoMioB+QSWHQu/L+
+lzd7W5CipqYmeVTBjdgSATdg4l4Z2zhikbuF+EKn69Q0DTpdmNJz8S33odfJoVEexw/l2SS9nFdi
+pis7KOwXzfSqarSo9uJYgbDGrs1VNnQpT9f8zAorhYCEZronZQF9DuDFfNK3Hecc+WHLnZLQptwk
+nufw8S9I43sEwxsT71BiqedHo0QeIrFE01F/4atVFXuJs2yxIOak3bvtXjUKAA6OKnQJ/nNvDGKZ
+Khe5TF36JbnKVjdcL1EUNpwrWVfQpFYJ/WWm2b74qNeSZeQv5/xBhRdOmKTJFYgO96PwrHBlsnLn
+a3l0LwJsloWpMbzByU5WLbRE6X5INFqjQOtIwYz5BAlhkn+kVqJvWM5vBlfrwP42ifonM5yF4ciJ
+auHVks62997mNGOsM7WXNG3P98dBHPo2NhbTvHleL0BI5dus2JY81MUOnK3SGWLH8HeWPa1t5KcW
+S5moAj5HexY/g/F8TctpxwsvyZp38dXeLDjSQvEQIkF7XR3YXbeZgKk3V34KGCPOAeeuQDIgyVhV
+nP4HF2uWHA==""")
+
+
+class TestLegacyResponse(unittest.TestCase):
+ def test_getheaders(self):
+ headers = {'host': 'example.com'}
+ r = HTTPResponse(headers=headers)
+ self.assertEqual(r.getheaders(), headers)
+
+ def test_getheader(self):
+ headers = {'host': 'example.com'}
+ r = HTTPResponse(headers=headers)
+ self.assertEqual(r.getheader('host'), 'example.com')
+
+
+class TestResponse(unittest.TestCase):
+ def test_cache_content(self):
+ r = HTTPResponse('foo')
+ self.assertEqual(r.data, 'foo')
+ self.assertEqual(r._body, 'foo')
+
+ def test_default(self):
+ r = HTTPResponse()
+ self.assertEqual(r.data, None)
+
+ def test_none(self):
+ r = HTTPResponse(None)
+ self.assertEqual(r.data, None)
+
+ def test_preload(self):
+ fp = BytesIO(b'foo')
+
+ r = HTTPResponse(fp, preload_content=True)
+
+ self.assertEqual(fp.tell(), len(b'foo'))
+ self.assertEqual(r.data, b'foo')
+
+ def test_no_preload(self):
+ fp = BytesIO(b'foo')
+
+ r = HTTPResponse(fp, preload_content=False)
+
+ self.assertEqual(fp.tell(), 0)
+ self.assertEqual(r.data, b'foo')
+ self.assertEqual(fp.tell(), len(b'foo'))
+
+ def test_decode_bad_data(self):
+ fp = BytesIO(b'\x00' * 10)
+ self.assertRaises(DecodeError, HTTPResponse, fp, headers={
+ 'content-encoding': 'deflate'
+ })
+
+ def test_decode_deflate(self):
+ import zlib
+ data = zlib.compress(b'foo')
+
+ fp = BytesIO(data)
+ r = HTTPResponse(fp, headers={'content-encoding': 'deflate'})
+
+ self.assertEqual(r.data, b'foo')
+
+ def test_decode_deflate_case_insensitve(self):
+ import zlib
+ data = zlib.compress(b'foo')
+
+ fp = BytesIO(data)
+ r = HTTPResponse(fp, headers={'content-encoding': 'DeFlAtE'})
+
+ self.assertEqual(r.data, b'foo')
+
+ def test_chunked_decoding_deflate(self):
+ import zlib
+ data = zlib.compress(b'foo')
+
+ fp = BytesIO(data)
+ r = HTTPResponse(fp, headers={'content-encoding': 'deflate'},
+ preload_content=False)
+
+ self.assertEqual(r.read(3), b'')
+ self.assertEqual(r.read(1), b'f')
+ self.assertEqual(r.read(2), b'oo')
+
+ def test_chunked_decoding_deflate2(self):
+ import zlib
+ compress = zlib.compressobj(6, zlib.DEFLATED, -zlib.MAX_WBITS)
+ data = compress.compress(b'foo')
+ data += compress.flush()
+
+ fp = BytesIO(data)
+ r = HTTPResponse(fp, headers={'content-encoding': 'deflate'},
+ preload_content=False)
+
+ self.assertEqual(r.read(1), b'')
+ self.assertEqual(r.read(1), b'f')
+ self.assertEqual(r.read(2), b'oo')
+
+ def test_chunked_decoding_gzip(self):
+ import zlib
+ compress = zlib.compressobj(6, zlib.DEFLATED, 16 + zlib.MAX_WBITS)
+ data = compress.compress(b'foo')
+ data += compress.flush()
+
+ fp = BytesIO(data)
+ r = HTTPResponse(fp, headers={'content-encoding': 'gzip'},
+ preload_content=False)
+
+ self.assertEqual(r.read(11), b'')
+ self.assertEqual(r.read(1), b'f')
+ self.assertEqual(r.read(2), b'oo')
+
+ def test_body_blob(self):
+ resp = HTTPResponse(b'foo')
+ self.assertEqual(resp.data, b'foo')
+ self.assertTrue(resp.closed)
+
+ def test_io(self):
+ import socket
+ try:
+ from http.client import HTTPResponse as OldHTTPResponse
+ except:
+ from httplib import HTTPResponse as OldHTTPResponse
+
+ fp = BytesIO(b'foo')
+ resp = HTTPResponse(fp, preload_content=False)
+
+ self.assertEqual(resp.closed, False)
+ self.assertEqual(resp.readable(), True)
+ self.assertEqual(resp.writable(), False)
+ self.assertRaises(IOError, resp.fileno)
+
+ resp.close()
+ self.assertEqual(resp.closed, True)
+
+ # Try closing with an `httplib.HTTPResponse`, because it has an
+ # `isclosed` method.
+ hlr = OldHTTPResponse(socket.socket())
+ resp2 = HTTPResponse(hlr, preload_content=False)
+ self.assertEqual(resp2.closed, False)
+ resp2.close()
+ self.assertEqual(resp2.closed, True)
+
+ #also try when only data is present.
+ resp3 = HTTPResponse('foodata')
+ self.assertRaises(IOError, resp3.fileno)
+
+ resp3._fp = 2
+ # A corner case where _fp is present but doesn't have `closed`,
+ # `isclosed`, or `fileno`. Unlikely, but possible.
+ self.assertEqual(resp3.closed, True)
+ self.assertRaises(IOError, resp3.fileno)
+
+ def test_io_bufferedreader(self):
+ fp = BytesIO(b'foo')
+ resp = HTTPResponse(fp, preload_content=False)
+ br = BufferedReader(resp)
+
+ self.assertEqual(br.read(), b'foo')
+
+ br.close()
+ self.assertEqual(resp.closed, True)
+
+ b = b'fooandahalf'
+ fp = BytesIO(b)
+ resp = HTTPResponse(fp, preload_content=False)
+ br = BufferedReader(resp, 5)
+
+ br.read(1) # sets up the buffer, reading 5
+ self.assertEqual(len(fp.read()), len(b) - 5)
+
+ # This is necessary to make sure the "no bytes left" part of `readinto`
+ # gets tested.
+ while not br.closed:
+ br.read(5)
+
+ def test_io_readinto(self):
+ # This test is necessary because in py2.6, `readinto` doesn't get called
+ # in `test_io_bufferedreader` like it does for all the other python
+ # versions. Probably this is because the `io` module in py2.6 is an
+ # old version that has a different underlying implementation.
+
+
+ fp = BytesIO(b'foo')
+ resp = HTTPResponse(fp, preload_content=False)
+
+ barr = bytearray(3)
+ assert resp.readinto(barr) == 3
+ assert b'foo' == barr
+
+ # The reader should already be empty, so this should read nothing.
+ assert resp.readinto(barr) == 0
+ assert b'foo' == barr
+
+ def test_streaming(self):
+ fp = BytesIO(b'foo')
+ resp = HTTPResponse(fp, preload_content=False)
+ stream = resp.stream(2, decode_content=False)
+
+ self.assertEqual(next(stream), b'fo')
+ self.assertEqual(next(stream), b'o')
+ self.assertRaises(StopIteration, next, stream)
+
+ def test_streaming_tell(self):
+ fp = BytesIO(b'foo')
+ resp = HTTPResponse(fp, preload_content=False)
+ stream = resp.stream(2, decode_content=False)
+
+ position = 0
+
+ position += len(next(stream))
+ self.assertEqual(2, position)
+ self.assertEqual(position, resp.tell())
+
+ position += len(next(stream))
+ self.assertEqual(3, position)
+ self.assertEqual(position, resp.tell())
+
+ self.assertRaises(StopIteration, next, stream)
+
+ def test_gzipped_streaming(self):
+ import zlib
+ compress = zlib.compressobj(6, zlib.DEFLATED, 16 + zlib.MAX_WBITS)
+ data = compress.compress(b'foo')
+ data += compress.flush()
+
+ fp = BytesIO(data)
+ resp = HTTPResponse(fp, headers={'content-encoding': 'gzip'},
+ preload_content=False)
+ stream = resp.stream(2)
+
+ self.assertEqual(next(stream), b'f')
+ self.assertEqual(next(stream), b'oo')
+ self.assertRaises(StopIteration, next, stream)
+
+ def test_gzipped_streaming_tell(self):
+ import zlib
+ compress = zlib.compressobj(6, zlib.DEFLATED, 16 + zlib.MAX_WBITS)
+ uncompressed_data = b'foo'
+ data = compress.compress(uncompressed_data)
+ data += compress.flush()
+
+ fp = BytesIO(data)
+ resp = HTTPResponse(fp, headers={'content-encoding': 'gzip'},
+ preload_content=False)
+ stream = resp.stream()
+
+ # Read everything
+ payload = next(stream)
+ self.assertEqual(payload, uncompressed_data)
+
+ self.assertEqual(len(data), resp.tell())
+
+ self.assertRaises(StopIteration, next, stream)
+
+ def test_deflate_streaming_tell_intermediate_point(self):
+ # Ensure that ``tell()`` returns the correct number of bytes when
+ # part-way through streaming compressed content.
+ import zlib
+
+ NUMBER_OF_READS = 10
+
+ class MockCompressedDataReading(BytesIO):
+ """
+ A ByteIO-like reader returning ``payload`` in ``NUMBER_OF_READS``
+ calls to ``read``.
+ """
+
+ def __init__(self, payload, payload_part_size):
+ self.payloads = [
+ payload[i*payload_part_size:(i+1)*payload_part_size]
+ for i in range(NUMBER_OF_READS+1)]
+
+ assert b"".join(self.payloads) == payload
+
+ def read(self, _):
+ # Amount is unused.
+ if len(self.payloads) > 0:
+ return self.payloads.pop(0)
+ return b""
+
+ uncompressed_data = zlib.decompress(ZLIB_PAYLOAD)
+
+ payload_part_size = len(ZLIB_PAYLOAD) // NUMBER_OF_READS
+ fp = MockCompressedDataReading(ZLIB_PAYLOAD, payload_part_size)
+ resp = HTTPResponse(fp, headers={'content-encoding': 'deflate'},
+ preload_content=False)
+ stream = resp.stream()
+
+ parts_positions = [(part, resp.tell()) for part in stream]
+ end_of_stream = resp.tell()
+
+ self.assertRaises(StopIteration, next, stream)
+
+ parts, positions = zip(*parts_positions)
+
+ # Check that the payload is equal to the uncompressed data
+ payload = b"".join(parts)
+ self.assertEqual(uncompressed_data, payload)
+
+ # Check that the positions in the stream are correct
+ expected = [(i+1)*payload_part_size for i in range(NUMBER_OF_READS)]
+ self.assertEqual(expected, list(positions))
+
+ # Check that the end of the stream is in the correct place
+ self.assertEqual(len(ZLIB_PAYLOAD), end_of_stream)
+
+ def test_deflate_streaming(self):
+ import zlib
+ data = zlib.compress(b'foo')
+
+ fp = BytesIO(data)
+ resp = HTTPResponse(fp, headers={'content-encoding': 'deflate'},
+ preload_content=False)
+ stream = resp.stream(2)
+
+ self.assertEqual(next(stream), b'f')
+ self.assertEqual(next(stream), b'oo')
+ self.assertRaises(StopIteration, next, stream)
+
+ def test_deflate2_streaming(self):
+ import zlib
+ compress = zlib.compressobj(6, zlib.DEFLATED, -zlib.MAX_WBITS)
+ data = compress.compress(b'foo')
+ data += compress.flush()
+
+ fp = BytesIO(data)
+ resp = HTTPResponse(fp, headers={'content-encoding': 'deflate'},
+ preload_content=False)
+ stream = resp.stream(2)
+
+ self.assertEqual(next(stream), b'f')
+ self.assertEqual(next(stream), b'oo')
+ self.assertRaises(StopIteration, next, stream)
+
+ def test_empty_stream(self):
+ fp = BytesIO(b'')
+ resp = HTTPResponse(fp, preload_content=False)
+ stream = resp.stream(2, decode_content=False)
+
+ self.assertRaises(StopIteration, next, stream)
+
+ def test_mock_httpresponse_stream(self):
+ # Mock out a HTTP Request that does enough to make it through urllib3's
+ # read() and close() calls, and also exhausts and underlying file
+ # object.
+ class MockHTTPRequest(object):
+ self.fp = None
+
+ def read(self, amt):
+ data = self.fp.read(amt)
+ if not data:
+ self.fp = None
+
+ return data
+
+ def close(self):
+ self.fp = None
+
+ bio = BytesIO(b'foo')
+ fp = MockHTTPRequest()
+ fp.fp = bio
+ resp = HTTPResponse(fp, preload_content=False)
+ stream = resp.stream(2)
+
+ self.assertEqual(next(stream), b'fo')
+ self.assertEqual(next(stream), b'o')
+ self.assertRaises(StopIteration, next, stream)
+
+ def test_get_case_insensitive_headers(self):
+ headers = {'host': 'example.com'}
+ r = HTTPResponse(headers=headers)
+ self.assertEqual(r.headers.get('host'), 'example.com')
+ self.assertEqual(r.headers.get('Host'), 'example.com')
+
+if __name__ == '__main__':
+ unittest.main()
diff --git a/test/test_response.pyc b/test/test_response.pyc
new file mode 100644
index 0000000..99e5c0e
--- /dev/null
+++ b/test/test_response.pyc
Binary files differ
diff --git a/test/test_retry.py b/test/test_retry.py
new file mode 100644
index 0000000..7a3aa40
--- /dev/null
+++ b/test/test_retry.py
@@ -0,0 +1,156 @@
+import unittest
+
+from urllib3.packages.six.moves import xrange
+from urllib3.util.retry import Retry
+from urllib3.exceptions import (
+ ConnectTimeoutError,
+ ReadTimeoutError,
+ MaxRetryError
+)
+
+
+class RetryTest(unittest.TestCase):
+
+ def test_string(self):
+ """ Retry string representation looks the way we expect """
+ retry = Retry()
+ self.assertEqual(str(retry), 'Retry(total=10, connect=None, read=None, redirect=None)')
+ for _ in range(3):
+ retry = retry.increment()
+ self.assertEqual(str(retry), 'Retry(total=7, connect=None, read=None, redirect=None)')
+
+ def test_retry_both_specified(self):
+ """Total can win if it's lower than the connect value"""
+ error = ConnectTimeoutError()
+ retry = Retry(connect=3, total=2)
+ retry = retry.increment(error=error)
+ retry = retry.increment(error=error)
+ try:
+ retry.increment(error=error)
+ self.fail("Failed to raise error.")
+ except MaxRetryError as e:
+ self.assertEqual(e.reason, error)
+
+ def test_retry_higher_total_loses(self):
+ """ A lower connect timeout than the total is honored """
+ error = ConnectTimeoutError()
+ retry = Retry(connect=2, total=3)
+ retry = retry.increment(error=error)
+ retry = retry.increment(error=error)
+ self.assertRaises(MaxRetryError, retry.increment, error=error)
+
+ def test_retry_higher_total_loses_vs_read(self):
+ """ A lower read timeout than the total is honored """
+ error = ReadTimeoutError(None, "/", "read timed out")
+ retry = Retry(read=2, total=3)
+ retry = retry.increment(error=error)
+ retry = retry.increment(error=error)
+ self.assertRaises(MaxRetryError, retry.increment, error=error)
+
+ def test_retry_total_none(self):
+ """ if Total is none, connect error should take precedence """
+ error = ConnectTimeoutError()
+ retry = Retry(connect=2, total=None)
+ retry = retry.increment(error=error)
+ retry = retry.increment(error=error)
+ try:
+ retry.increment(error=error)
+ self.fail("Failed to raise error.")
+ except MaxRetryError as e:
+ self.assertEqual(e.reason, error)
+
+ error = ReadTimeoutError(None, "/", "read timed out")
+ retry = Retry(connect=2, total=None)
+ retry = retry.increment(error=error)
+ retry = retry.increment(error=error)
+ retry = retry.increment(error=error)
+ self.assertFalse(retry.is_exhausted())
+
+ def test_retry_default(self):
+ """ If no value is specified, should retry connects 3 times """
+ retry = Retry()
+ self.assertEqual(retry.total, 10)
+ self.assertEqual(retry.connect, None)
+ self.assertEqual(retry.read, None)
+ self.assertEqual(retry.redirect, None)
+
+ error = ConnectTimeoutError()
+ retry = Retry(connect=1)
+ retry = retry.increment(error=error)
+ self.assertRaises(MaxRetryError, retry.increment, error=error)
+
+ retry = Retry(connect=1)
+ retry = retry.increment(error=error)
+ self.assertFalse(retry.is_exhausted())
+
+ self.assertTrue(Retry(0).raise_on_redirect)
+ self.assertFalse(Retry(False).raise_on_redirect)
+
+ def test_retry_read_zero(self):
+ """ No second chances on read timeouts, by default """
+ error = ReadTimeoutError(None, "/", "read timed out")
+ retry = Retry(read=0)
+ try:
+ retry.increment(error=error)
+ self.fail("Failed to raise error.")
+ except MaxRetryError as e:
+ self.assertEqual(e.reason, error)
+
+ def test_backoff(self):
+ """ Backoff is computed correctly """
+ max_backoff = Retry.BACKOFF_MAX
+
+ retry = Retry(total=100, backoff_factor=0.2)
+ self.assertEqual(retry.get_backoff_time(), 0) # First request
+
+ retry = retry.increment()
+ self.assertEqual(retry.get_backoff_time(), 0) # First retry
+
+ retry = retry.increment()
+ self.assertEqual(retry.backoff_factor, 0.2)
+ self.assertEqual(retry.total, 98)
+ self.assertEqual(retry.get_backoff_time(), 0.4) # Start backoff
+
+ retry = retry.increment()
+ self.assertEqual(retry.get_backoff_time(), 0.8)
+
+ retry = retry.increment()
+ self.assertEqual(retry.get_backoff_time(), 1.6)
+
+ for i in xrange(10):
+ retry = retry.increment()
+
+ self.assertEqual(retry.get_backoff_time(), max_backoff)
+
+ def test_zero_backoff(self):
+ retry = Retry()
+ self.assertEqual(retry.get_backoff_time(), 0)
+ retry = retry.increment()
+ retry = retry.increment()
+ self.assertEqual(retry.get_backoff_time(), 0)
+
+ def test_sleep(self):
+ # sleep a very small amount of time so our code coverage is happy
+ retry = Retry(backoff_factor=0.0001)
+ retry = retry.increment()
+ retry = retry.increment()
+ retry.sleep()
+
+ def test_status_forcelist(self):
+ retry = Retry(status_forcelist=xrange(500,600))
+ self.assertFalse(retry.is_forced_retry('GET', status_code=200))
+ self.assertFalse(retry.is_forced_retry('GET', status_code=400))
+ self.assertTrue(retry.is_forced_retry('GET', status_code=500))
+
+ retry = Retry(total=1, status_forcelist=[418])
+ self.assertFalse(retry.is_forced_retry('GET', status_code=400))
+ self.assertTrue(retry.is_forced_retry('GET', status_code=418))
+
+ def test_exhausted(self):
+ self.assertFalse(Retry(0).is_exhausted())
+ self.assertTrue(Retry(-1).is_exhausted())
+ self.assertEqual(Retry(1).increment().total, 0)
+
+ def test_disabled(self):
+ self.assertRaises(MaxRetryError, Retry(-1).increment)
+ self.assertRaises(MaxRetryError, Retry(0).increment)
diff --git a/test/test_retry.pyc b/test/test_retry.pyc
new file mode 100644
index 0000000..398c010
--- /dev/null
+++ b/test/test_retry.pyc
Binary files differ
diff --git a/test/test_util.py b/test/test_util.py
new file mode 100644
index 0000000..1811dbd
--- /dev/null
+++ b/test/test_util.py
@@ -0,0 +1,357 @@
+import warnings
+import logging
+import unittest
+import ssl
+
+from mock import patch
+
+from urllib3 import add_stderr_logger, disable_warnings
+from urllib3.util.request import make_headers
+from urllib3.util.timeout import Timeout
+from urllib3.util.url import (
+ get_host,
+ parse_url,
+ split_first,
+ Url,
+)
+from urllib3.util.ssl_ import resolve_cert_reqs
+from urllib3.exceptions import (
+ LocationParseError,
+ TimeoutStateError,
+ InsecureRequestWarning,
+)
+
+from urllib3.util import is_fp_closed
+
+from . import clear_warnings
+
+# This number represents a time in seconds, it doesn't mean anything in
+# isolation. Setting to a high-ish value to avoid conflicts with the smaller
+# numbers used for timeouts
+TIMEOUT_EPOCH = 1000
+
+class TestUtil(unittest.TestCase):
+ def test_get_host(self):
+ url_host_map = {
+ # Hosts
+ 'http://google.com/mail': ('http', 'google.com', None),
+ 'http://google.com/mail/': ('http', 'google.com', None),
+ 'google.com/mail': ('http', 'google.com', None),
+ 'http://google.com/': ('http', 'google.com', None),
+ 'http://google.com': ('http', 'google.com', None),
+ 'http://www.google.com': ('http', 'www.google.com', None),
+ 'http://mail.google.com': ('http', 'mail.google.com', None),
+ 'http://google.com:8000/mail/': ('http', 'google.com', 8000),
+ 'http://google.com:8000': ('http', 'google.com', 8000),
+ 'https://google.com': ('https', 'google.com', None),
+ 'https://google.com:8000': ('https', 'google.com', 8000),
+ 'http://user:password@127.0.0.1:1234': ('http', '127.0.0.1', 1234),
+ 'http://google.com/foo=http://bar:42/baz': ('http', 'google.com', None),
+ 'http://google.com?foo=http://bar:42/baz': ('http', 'google.com', None),
+ 'http://google.com#foo=http://bar:42/baz': ('http', 'google.com', None),
+
+ # IPv4
+ '173.194.35.7': ('http', '173.194.35.7', None),
+ 'http://173.194.35.7': ('http', '173.194.35.7', None),
+ 'http://173.194.35.7/test': ('http', '173.194.35.7', None),
+ 'http://173.194.35.7:80': ('http', '173.194.35.7', 80),
+ 'http://173.194.35.7:80/test': ('http', '173.194.35.7', 80),
+
+ # IPv6
+ '[2a00:1450:4001:c01::67]': ('http', '[2a00:1450:4001:c01::67]', None),
+ 'http://[2a00:1450:4001:c01::67]': ('http', '[2a00:1450:4001:c01::67]', None),
+ 'http://[2a00:1450:4001:c01::67]/test': ('http', '[2a00:1450:4001:c01::67]', None),
+ 'http://[2a00:1450:4001:c01::67]:80': ('http', '[2a00:1450:4001:c01::67]', 80),
+ 'http://[2a00:1450:4001:c01::67]:80/test': ('http', '[2a00:1450:4001:c01::67]', 80),
+
+ # More IPv6 from http://www.ietf.org/rfc/rfc2732.txt
+ 'http://[FEDC:BA98:7654:3210:FEDC:BA98:7654:3210]:8000/index.html': ('http', '[FEDC:BA98:7654:3210:FEDC:BA98:7654:3210]', 8000),
+ 'http://[1080:0:0:0:8:800:200C:417A]/index.html': ('http', '[1080:0:0:0:8:800:200C:417A]', None),
+ 'http://[3ffe:2a00:100:7031::1]': ('http', '[3ffe:2a00:100:7031::1]', None),
+ 'http://[1080::8:800:200C:417A]/foo': ('http', '[1080::8:800:200C:417A]', None),
+ 'http://[::192.9.5.5]/ipng': ('http', '[::192.9.5.5]', None),
+ 'http://[::FFFF:129.144.52.38]:42/index.html': ('http', '[::FFFF:129.144.52.38]', 42),
+ 'http://[2010:836B:4179::836B:4179]': ('http', '[2010:836B:4179::836B:4179]', None),
+ }
+ for url, expected_host in url_host_map.items():
+ returned_host = get_host(url)
+ self.assertEqual(returned_host, expected_host)
+
+ def test_invalid_host(self):
+ # TODO: Add more tests
+ invalid_host = [
+ 'http://google.com:foo',
+ 'http://::1/',
+ 'http://::1:80/',
+ ]
+
+ for location in invalid_host:
+ self.assertRaises(LocationParseError, get_host, location)
+
+
+ def test_parse_url(self):
+ url_host_map = {
+ 'http://google.com/mail': Url('http', host='google.com', path='/mail'),
+ 'http://google.com/mail/': Url('http', host='google.com', path='/mail/'),
+ 'google.com/mail': Url(host='google.com', path='/mail'),
+ 'http://google.com/': Url('http', host='google.com', path='/'),
+ 'http://google.com': Url('http', host='google.com'),
+ 'http://google.com?foo': Url('http', host='google.com', path='', query='foo'),
+
+ # Path/query/fragment
+ '': Url(),
+ '/': Url(path='/'),
+ '?': Url(path='', query=''),
+ '#': Url(path='', fragment=''),
+ '#?/!google.com/?foo#bar': Url(path='', fragment='?/!google.com/?foo#bar'),
+ '/foo': Url(path='/foo'),
+ '/foo?bar=baz': Url(path='/foo', query='bar=baz'),
+ '/foo?bar=baz#banana?apple/orange': Url(path='/foo', query='bar=baz', fragment='banana?apple/orange'),
+
+ # Port
+ 'http://google.com/': Url('http', host='google.com', path='/'),
+ 'http://google.com:80/': Url('http', host='google.com', port=80, path='/'),
+ 'http://google.com:/': Url('http', host='google.com', path='/'),
+ 'http://google.com:80': Url('http', host='google.com', port=80),
+ 'http://google.com:': Url('http', host='google.com'),
+
+ # Auth
+ 'http://foo:bar@localhost/': Url('http', auth='foo:bar', host='localhost', path='/'),
+ 'http://foo@localhost/': Url('http', auth='foo', host='localhost', path='/'),
+ 'http://foo:bar@baz@localhost/': Url('http', auth='foo:bar@baz', host='localhost', path='/'),
+ 'http://@': Url('http', host=None, auth='')
+ }
+ for url, expected_url in url_host_map.items():
+ returned_url = parse_url(url)
+ self.assertEqual(returned_url, expected_url)
+
+ def test_parse_url_invalid_IPv6(self):
+ self.assertRaises(ValueError, parse_url, '[::1')
+
+ def test_request_uri(self):
+ url_host_map = {
+ 'http://google.com/mail': '/mail',
+ 'http://google.com/mail/': '/mail/',
+ 'http://google.com/': '/',
+ 'http://google.com': '/',
+ '': '/',
+ '/': '/',
+ '?': '/?',
+ '#': '/',
+ '/foo?bar=baz': '/foo?bar=baz',
+ }
+ for url, expected_request_uri in url_host_map.items():
+ returned_url = parse_url(url)
+ self.assertEqual(returned_url.request_uri, expected_request_uri)
+
+ def test_netloc(self):
+ url_netloc_map = {
+ 'http://google.com/mail': 'google.com',
+ 'http://google.com:80/mail': 'google.com:80',
+ 'google.com/foobar': 'google.com',
+ 'google.com:12345': 'google.com:12345',
+ }
+
+ for url, expected_netloc in url_netloc_map.items():
+ self.assertEqual(parse_url(url).netloc, expected_netloc)
+
+ def test_make_headers(self):
+ self.assertEqual(
+ make_headers(accept_encoding=True),
+ {'accept-encoding': 'gzip,deflate'})
+
+ self.assertEqual(
+ make_headers(accept_encoding='foo,bar'),
+ {'accept-encoding': 'foo,bar'})
+
+ self.assertEqual(
+ make_headers(accept_encoding=['foo', 'bar']),
+ {'accept-encoding': 'foo,bar'})
+
+ self.assertEqual(
+ make_headers(accept_encoding=True, user_agent='banana'),
+ {'accept-encoding': 'gzip,deflate', 'user-agent': 'banana'})
+
+ self.assertEqual(
+ make_headers(user_agent='banana'),
+ {'user-agent': 'banana'})
+
+ self.assertEqual(
+ make_headers(keep_alive=True),
+ {'connection': 'keep-alive'})
+
+ self.assertEqual(
+ make_headers(basic_auth='foo:bar'),
+ {'authorization': 'Basic Zm9vOmJhcg=='})
+
+ self.assertEqual(
+ make_headers(proxy_basic_auth='foo:bar'),
+ {'proxy-authorization': 'Basic Zm9vOmJhcg=='})
+
+ self.assertEqual(
+ make_headers(disable_cache=True),
+ {'cache-control': 'no-cache'})
+
+ def test_split_first(self):
+ test_cases = {
+ ('abcd', 'b'): ('a', 'cd', 'b'),
+ ('abcd', 'cb'): ('a', 'cd', 'b'),
+ ('abcd', ''): ('abcd', '', None),
+ ('abcd', 'a'): ('', 'bcd', 'a'),
+ ('abcd', 'ab'): ('', 'bcd', 'a'),
+ }
+ for input, expected in test_cases.items():
+ output = split_first(*input)
+ self.assertEqual(output, expected)
+
+ def test_add_stderr_logger(self):
+ handler = add_stderr_logger(level=logging.INFO) # Don't actually print debug
+ logger = logging.getLogger('urllib3')
+ self.assertTrue(handler in logger.handlers)
+
+ logger.debug('Testing add_stderr_logger')
+ logger.removeHandler(handler)
+
+ def test_disable_warnings(self):
+ with warnings.catch_warnings(record=True) as w:
+ clear_warnings()
+ warnings.warn('This is a test.', InsecureRequestWarning)
+ self.assertEqual(len(w), 1)
+ disable_warnings()
+ warnings.warn('This is a test.', InsecureRequestWarning)
+ self.assertEqual(len(w), 1)
+
+ def _make_time_pass(self, seconds, timeout, time_mock):
+ """ Make some time pass for the timeout object """
+ time_mock.return_value = TIMEOUT_EPOCH
+ timeout.start_connect()
+ time_mock.return_value = TIMEOUT_EPOCH + seconds
+ return timeout
+
+ def test_invalid_timeouts(self):
+ try:
+ Timeout(total=-1)
+ self.fail("negative value should throw exception")
+ except ValueError as e:
+ self.assertTrue('less than' in str(e))
+ try:
+ Timeout(connect=2, total=-1)
+ self.fail("negative value should throw exception")
+ except ValueError as e:
+ self.assertTrue('less than' in str(e))
+
+ try:
+ Timeout(read=-1)
+ self.fail("negative value should throw exception")
+ except ValueError as e:
+ self.assertTrue('less than' in str(e))
+
+ # Booleans are allowed also by socket.settimeout and converted to the
+ # equivalent float (1.0 for True, 0.0 for False)
+ Timeout(connect=False, read=True)
+
+ try:
+ Timeout(read="foo")
+ self.fail("string value should not be allowed")
+ except ValueError as e:
+ self.assertTrue('int or float' in str(e))
+
+
+ @patch('urllib3.util.timeout.current_time')
+ def test_timeout(self, current_time):
+ timeout = Timeout(total=3)
+
+ # make 'no time' elapse
+ timeout = self._make_time_pass(seconds=0, timeout=timeout,
+ time_mock=current_time)
+ self.assertEqual(timeout.read_timeout, 3)
+ self.assertEqual(timeout.connect_timeout, 3)
+
+ timeout = Timeout(total=3, connect=2)
+ self.assertEqual(timeout.connect_timeout, 2)
+
+ timeout = Timeout()
+ self.assertEqual(timeout.connect_timeout, Timeout.DEFAULT_TIMEOUT)
+
+ # Connect takes 5 seconds, leaving 5 seconds for read
+ timeout = Timeout(total=10, read=7)
+ timeout = self._make_time_pass(seconds=5, timeout=timeout,
+ time_mock=current_time)
+ self.assertEqual(timeout.read_timeout, 5)
+
+ # Connect takes 2 seconds, read timeout still 7 seconds
+ timeout = Timeout(total=10, read=7)
+ timeout = self._make_time_pass(seconds=2, timeout=timeout,
+ time_mock=current_time)
+ self.assertEqual(timeout.read_timeout, 7)
+
+ timeout = Timeout(total=10, read=7)
+ self.assertEqual(timeout.read_timeout, 7)
+
+ timeout = Timeout(total=None, read=None, connect=None)
+ self.assertEqual(timeout.connect_timeout, None)
+ self.assertEqual(timeout.read_timeout, None)
+ self.assertEqual(timeout.total, None)
+
+ timeout = Timeout(5)
+ self.assertEqual(timeout.total, 5)
+
+
+ def test_timeout_str(self):
+ timeout = Timeout(connect=1, read=2, total=3)
+ self.assertEqual(str(timeout), "Timeout(connect=1, read=2, total=3)")
+ timeout = Timeout(connect=1, read=None, total=3)
+ self.assertEqual(str(timeout), "Timeout(connect=1, read=None, total=3)")
+
+
+ @patch('urllib3.util.timeout.current_time')
+ def test_timeout_elapsed(self, current_time):
+ current_time.return_value = TIMEOUT_EPOCH
+ timeout = Timeout(total=3)
+ self.assertRaises(TimeoutStateError, timeout.get_connect_duration)
+
+ timeout.start_connect()
+ self.assertRaises(TimeoutStateError, timeout.start_connect)
+
+ current_time.return_value = TIMEOUT_EPOCH + 2
+ self.assertEqual(timeout.get_connect_duration(), 2)
+ current_time.return_value = TIMEOUT_EPOCH + 37
+ self.assertEqual(timeout.get_connect_duration(), 37)
+
+ def test_resolve_cert_reqs(self):
+ self.assertEqual(resolve_cert_reqs(None), ssl.CERT_NONE)
+ self.assertEqual(resolve_cert_reqs(ssl.CERT_NONE), ssl.CERT_NONE)
+
+ self.assertEqual(resolve_cert_reqs(ssl.CERT_REQUIRED), ssl.CERT_REQUIRED)
+ self.assertEqual(resolve_cert_reqs('REQUIRED'), ssl.CERT_REQUIRED)
+ self.assertEqual(resolve_cert_reqs('CERT_REQUIRED'), ssl.CERT_REQUIRED)
+
+ def test_is_fp_closed_object_supports_closed(self):
+ class ClosedFile(object):
+ @property
+ def closed(self):
+ return True
+
+ self.assertTrue(is_fp_closed(ClosedFile()))
+
+ def test_is_fp_closed_object_has_none_fp(self):
+ class NoneFpFile(object):
+ @property
+ def fp(self):
+ return None
+
+ self.assertTrue(is_fp_closed(NoneFpFile()))
+
+ def test_is_fp_closed_object_has_fp(self):
+ class FpFile(object):
+ @property
+ def fp(self):
+ return True
+
+ self.assertTrue(not is_fp_closed(FpFile()))
+
+ def test_is_fp_closed_object_has_neither_fp_nor_closed(self):
+ class NotReallyAFile(object):
+ pass
+
+ self.assertRaises(ValueError, is_fp_closed, NotReallyAFile())
diff --git a/test/test_util.pyc b/test/test_util.pyc
new file mode 100644
index 0000000..0500c3b
--- /dev/null
+++ b/test/test_util.pyc
Binary files differ
diff --git a/test/with_dummyserver/__init__.py b/test/with_dummyserver/__init__.py
new file mode 100644
index 0000000..e69de29
--- /dev/null
+++ b/test/with_dummyserver/__init__.py
diff --git a/test/with_dummyserver/__init__.pyc b/test/with_dummyserver/__init__.pyc
new file mode 100644
index 0000000..833be60
--- /dev/null
+++ b/test/with_dummyserver/__init__.pyc
Binary files differ
diff --git a/test/with_dummyserver/test_connectionpool.py b/test/with_dummyserver/test_connectionpool.py
new file mode 100644
index 0000000..7d54fbf
--- /dev/null
+++ b/test/with_dummyserver/test_connectionpool.py
@@ -0,0 +1,706 @@
+import errno
+import logging
+import socket
+import sys
+import unittest
+import time
+
+import mock
+
+try:
+ from urllib.parse import urlencode
+except:
+ from urllib import urlencode
+
+from .. import (
+ requires_network,
+ onlyPy3, onlyPy27OrNewer, onlyPy26OrOlder,
+ TARPIT_HOST, VALID_SOURCE_ADDRESSES, INVALID_SOURCE_ADDRESSES,
+)
+from ..port_helpers import find_unused_port
+from urllib3 import (
+ encode_multipart_formdata,
+ HTTPConnectionPool,
+)
+from urllib3.exceptions import (
+ ConnectTimeoutError,
+ EmptyPoolError,
+ DecodeError,
+ MaxRetryError,
+ ReadTimeoutError,
+ ProtocolError,
+)
+from urllib3.packages.six import b, u
+from urllib3.util.retry import Retry
+from urllib3.util.timeout import Timeout
+
+import tornado
+from dummyserver.testcase import HTTPDummyServerTestCase
+
+from nose.tools import timed
+
+log = logging.getLogger('urllib3.connectionpool')
+log.setLevel(logging.NOTSET)
+log.addHandler(logging.StreamHandler(sys.stdout))
+
+
+class TestConnectionPool(HTTPDummyServerTestCase):
+
+ def setUp(self):
+ self.pool = HTTPConnectionPool(self.host, self.port)
+
+ def test_get(self):
+ r = self.pool.request('GET', '/specific_method',
+ fields={'method': 'GET'})
+ self.assertEqual(r.status, 200, r.data)
+
+ def test_post_url(self):
+ r = self.pool.request('POST', '/specific_method',
+ fields={'method': 'POST'})
+ self.assertEqual(r.status, 200, r.data)
+
+ def test_urlopen_put(self):
+ r = self.pool.urlopen('PUT', '/specific_method?method=PUT')
+ self.assertEqual(r.status, 200, r.data)
+
+ def test_wrong_specific_method(self):
+ # To make sure the dummy server is actually returning failed responses
+ r = self.pool.request('GET', '/specific_method',
+ fields={'method': 'POST'})
+ self.assertEqual(r.status, 400, r.data)
+
+ r = self.pool.request('POST', '/specific_method',
+ fields={'method': 'GET'})
+ self.assertEqual(r.status, 400, r.data)
+
+ def test_upload(self):
+ data = "I'm in ur multipart form-data, hazing a cheezburgr"
+ fields = {
+ 'upload_param': 'filefield',
+ 'upload_filename': 'lolcat.txt',
+ 'upload_size': len(data),
+ 'filefield': ('lolcat.txt', data),
+ }
+
+ r = self.pool.request('POST', '/upload', fields=fields)
+ self.assertEqual(r.status, 200, r.data)
+
+ def test_one_name_multiple_values(self):
+ fields = [
+ ('foo', 'a'),
+ ('foo', 'b'),
+ ]
+
+ # urlencode
+ r = self.pool.request('GET', '/echo', fields=fields)
+ self.assertEqual(r.data, b'foo=a&foo=b')
+
+ # multipart
+ r = self.pool.request('POST', '/echo', fields=fields)
+ self.assertEqual(r.data.count(b'name="foo"'), 2)
+
+
+ def test_unicode_upload(self):
+ fieldname = u('myfile')
+ filename = u('\xe2\x99\xa5.txt')
+ data = u('\xe2\x99\xa5').encode('utf8')
+ size = len(data)
+
+ fields = {
+ u('upload_param'): fieldname,
+ u('upload_filename'): filename,
+ u('upload_size'): size,
+ fieldname: (filename, data),
+ }
+
+ r = self.pool.request('POST', '/upload', fields=fields)
+ self.assertEqual(r.status, 200, r.data)
+
+ def test_timeout_float(self):
+ url = '/sleep?seconds=0.005'
+ # Pool-global timeout
+ pool = HTTPConnectionPool(self.host, self.port, timeout=0.001, retries=False)
+ self.assertRaises(ReadTimeoutError, pool.request, 'GET', url)
+
+ def test_conn_closed(self):
+ pool = HTTPConnectionPool(self.host, self.port, timeout=0.001, retries=False)
+ conn = pool._get_conn()
+ pool._put_conn(conn)
+ try:
+ url = '/sleep?seconds=0.005'
+ pool.urlopen('GET', url)
+ self.fail("The request should fail with a timeout error.")
+ except ReadTimeoutError:
+ if conn.sock:
+ self.assertRaises(socket.error, conn.sock.recv, 1024)
+ finally:
+ pool._put_conn(conn)
+
+ def test_nagle(self):
+ """ Test that connections have TCP_NODELAY turned on """
+ # This test needs to be here in order to be run. socket.create_connection actually tries to
+ # connect to the host provided so we need a dummyserver to be running.
+ pool = HTTPConnectionPool(self.host, self.port)
+ conn = pool._get_conn()
+ pool._make_request(conn, 'GET', '/')
+ tcp_nodelay_setting = conn.sock.getsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY)
+ assert tcp_nodelay_setting > 0, ("Expected TCP_NODELAY to be set on the "
+ "socket (with value greater than 0) "
+ "but instead was %s" %
+ tcp_nodelay_setting)
+
+ def test_socket_options(self):
+ """Test that connections accept socket options."""
+ # This test needs to be here in order to be run. socket.create_connection actually tries to
+ # connect to the host provided so we need a dummyserver to be running.
+ pool = HTTPConnectionPool(self.host, self.port, socket_options=[
+ (socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1)
+ ])
+ s = pool._new_conn()._new_conn() # Get the socket
+ using_keepalive = s.getsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE) > 0
+ self.assertTrue(using_keepalive)
+ s.close()
+
+ def test_disable_default_socket_options(self):
+ """Test that passing None disables all socket options."""
+ # This test needs to be here in order to be run. socket.create_connection actually tries to
+ # connect to the host provided so we need a dummyserver to be running.
+ pool = HTTPConnectionPool(self.host, self.port, socket_options=None)
+ s = pool._new_conn()._new_conn()
+ using_nagle = s.getsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY) == 0
+ self.assertTrue(using_nagle)
+ s.close()
+
+ def test_defaults_are_applied(self):
+ """Test that modifying the default socket options works."""
+ # This test needs to be here in order to be run. socket.create_connection actually tries to
+ # connect to the host provided so we need a dummyserver to be running.
+ pool = HTTPConnectionPool(self.host, self.port)
+ # Get the HTTPConnection instance
+ conn = pool._new_conn()
+ # Update the default socket options
+ conn.default_socket_options += [(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1)]
+ s = conn._new_conn()
+ nagle_disabled = s.getsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY) > 0
+ using_keepalive = s.getsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE) > 0
+ self.assertTrue(nagle_disabled)
+ self.assertTrue(using_keepalive)
+
+ @timed(0.5)
+ def test_timeout(self):
+ """ Requests should time out when expected """
+ url = '/sleep?seconds=0.002'
+ timeout = Timeout(read=0.001)
+
+ # Pool-global timeout
+ pool = HTTPConnectionPool(self.host, self.port, timeout=timeout, retries=False)
+
+ conn = pool._get_conn()
+ self.assertRaises(ReadTimeoutError, pool._make_request,
+ conn, 'GET', url)
+ pool._put_conn(conn)
+
+ time.sleep(0.02) # Wait for server to start receiving again. :(
+
+ self.assertRaises(ReadTimeoutError, pool.request, 'GET', url)
+
+ # Request-specific timeouts should raise errors
+ pool = HTTPConnectionPool(self.host, self.port, timeout=0.1, retries=False)
+
+ conn = pool._get_conn()
+ self.assertRaises(ReadTimeoutError, pool._make_request,
+ conn, 'GET', url, timeout=timeout)
+ pool._put_conn(conn)
+
+ time.sleep(0.02) # Wait for server to start receiving again. :(
+
+ self.assertRaises(ReadTimeoutError, pool.request,
+ 'GET', url, timeout=timeout)
+
+ # Timeout int/float passed directly to request and _make_request should
+ # raise a request timeout
+ self.assertRaises(ReadTimeoutError, pool.request,
+ 'GET', url, timeout=0.001)
+ conn = pool._new_conn()
+ self.assertRaises(ReadTimeoutError, pool._make_request, conn,
+ 'GET', url, timeout=0.001)
+ pool._put_conn(conn)
+
+ # Timeout int/float passed directly to _make_request should not raise a
+ # request timeout if it's a high value
+ pool.request('GET', url, timeout=1)
+
+ @requires_network
+ @timed(0.5)
+ def test_connect_timeout(self):
+ url = '/sleep?seconds=0.005'
+ timeout = Timeout(connect=0.001)
+
+ # Pool-global timeout
+ pool = HTTPConnectionPool(TARPIT_HOST, self.port, timeout=timeout)
+ conn = pool._get_conn()
+ self.assertRaises(ConnectTimeoutError, pool._make_request, conn, 'GET', url)
+
+ # Retries
+ retries = Retry(connect=0)
+ self.assertRaises(MaxRetryError, pool.request, 'GET', url,
+ retries=retries)
+
+ # Request-specific connection timeouts
+ big_timeout = Timeout(read=0.2, connect=0.2)
+ pool = HTTPConnectionPool(TARPIT_HOST, self.port,
+ timeout=big_timeout, retries=False)
+ conn = pool._get_conn()
+ self.assertRaises(ConnectTimeoutError, pool._make_request, conn, 'GET',
+ url, timeout=timeout)
+
+ pool._put_conn(conn)
+ self.assertRaises(ConnectTimeoutError, pool.request, 'GET', url,
+ timeout=timeout)
+
+
+ def test_connection_error_retries(self):
+ """ ECONNREFUSED error should raise a connection error, with retries """
+ port = find_unused_port()
+ pool = HTTPConnectionPool(self.host, port)
+ try:
+ pool.request('GET', '/', retries=Retry(connect=3))
+ self.fail("Should have failed with a connection error.")
+ except MaxRetryError as e:
+ self.assertTrue(isinstance(e.reason, ProtocolError))
+ self.assertEqual(e.reason.args[1].errno, errno.ECONNREFUSED)
+
+ def test_timeout_reset(self):
+ """ If the read timeout isn't set, socket timeout should reset """
+ url = '/sleep?seconds=0.005'
+ timeout = Timeout(connect=0.001)
+ pool = HTTPConnectionPool(self.host, self.port, timeout=timeout)
+ conn = pool._get_conn()
+ try:
+ pool._make_request(conn, 'GET', url)
+ except ReadTimeoutError:
+ self.fail("This request shouldn't trigger a read timeout.")
+
+ @requires_network
+ @timed(5.0)
+ def test_total_timeout(self):
+ url = '/sleep?seconds=0.005'
+
+ timeout = Timeout(connect=3, read=5, total=0.001)
+ pool = HTTPConnectionPool(TARPIT_HOST, self.port, timeout=timeout)
+ conn = pool._get_conn()
+ self.assertRaises(ConnectTimeoutError, pool._make_request, conn, 'GET', url)
+
+ # This will get the socket to raise an EAGAIN on the read
+ timeout = Timeout(connect=3, read=0)
+ pool = HTTPConnectionPool(self.host, self.port, timeout=timeout)
+ conn = pool._get_conn()
+ self.assertRaises(ReadTimeoutError, pool._make_request, conn, 'GET', url)
+
+ # The connect should succeed and this should hit the read timeout
+ timeout = Timeout(connect=3, read=5, total=0.002)
+ pool = HTTPConnectionPool(self.host, self.port, timeout=timeout)
+ conn = pool._get_conn()
+ self.assertRaises(ReadTimeoutError, pool._make_request, conn, 'GET', url)
+
+ @requires_network
+ def test_none_total_applies_connect(self):
+ url = '/sleep?seconds=0.005'
+ timeout = Timeout(total=None, connect=0.001)
+ pool = HTTPConnectionPool(TARPIT_HOST, self.port, timeout=timeout)
+ conn = pool._get_conn()
+ self.assertRaises(ConnectTimeoutError, pool._make_request, conn, 'GET',
+ url)
+
+ def test_timeout_success(self):
+ timeout = Timeout(connect=3, read=5, total=None)
+ pool = HTTPConnectionPool(self.host, self.port, timeout=timeout)
+ pool.request('GET', '/')
+ # This should not raise a "Timeout already started" error
+ pool.request('GET', '/')
+
+ pool = HTTPConnectionPool(self.host, self.port, timeout=timeout)
+ # This should also not raise a "Timeout already started" error
+ pool.request('GET', '/')
+
+ timeout = Timeout(total=None)
+ pool = HTTPConnectionPool(self.host, self.port, timeout=timeout)
+ pool.request('GET', '/')
+
+ def test_tunnel(self):
+ # note the actual httplib.py has no tests for this functionality
+ timeout = Timeout(total=None)
+ pool = HTTPConnectionPool(self.host, self.port, timeout=timeout)
+ conn = pool._get_conn()
+ try:
+ conn.set_tunnel(self.host, self.port)
+ except AttributeError: # python 2.6
+ conn._set_tunnel(self.host, self.port)
+
+ conn._tunnel = mock.Mock(return_value=None)
+ pool._make_request(conn, 'GET', '/')
+ conn._tunnel.assert_called_once_with()
+
+ # test that it's not called when tunnel is not set
+ timeout = Timeout(total=None)
+ pool = HTTPConnectionPool(self.host, self.port, timeout=timeout)
+ conn = pool._get_conn()
+
+ conn._tunnel = mock.Mock(return_value=None)
+ pool._make_request(conn, 'GET', '/')
+ self.assertEqual(conn._tunnel.called, False)
+
+ def test_redirect(self):
+ r = self.pool.request('GET', '/redirect', fields={'target': '/'}, redirect=False)
+ self.assertEqual(r.status, 303)
+
+ r = self.pool.request('GET', '/redirect', fields={'target': '/'})
+ self.assertEqual(r.status, 200)
+ self.assertEqual(r.data, b'Dummy server!')
+
+ def test_bad_connect(self):
+ pool = HTTPConnectionPool('badhost.invalid', self.port)
+ try:
+ pool.request('GET', '/', retries=5)
+ self.fail("should raise timeout exception here")
+ except MaxRetryError as e:
+ self.assertTrue(isinstance(e.reason, ProtocolError), e.reason)
+
+ def test_keepalive(self):
+ pool = HTTPConnectionPool(self.host, self.port, block=True, maxsize=1)
+
+ r = pool.request('GET', '/keepalive?close=0')
+ r = pool.request('GET', '/keepalive?close=0')
+
+ self.assertEqual(r.status, 200)
+ self.assertEqual(pool.num_connections, 1)
+ self.assertEqual(pool.num_requests, 2)
+
+ def test_keepalive_close(self):
+ pool = HTTPConnectionPool(self.host, self.port,
+ block=True, maxsize=1, timeout=2)
+
+ r = pool.request('GET', '/keepalive?close=1', retries=0,
+ headers={
+ "Connection": "close",
+ })
+
+ self.assertEqual(pool.num_connections, 1)
+
+ # The dummyserver will have responded with Connection:close,
+ # and httplib will properly cleanup the socket.
+
+ # We grab the HTTPConnection object straight from the Queue,
+ # because _get_conn() is where the check & reset occurs
+ # pylint: disable-msg=W0212
+ conn = pool.pool.get()
+ self.assertEqual(conn.sock, None)
+ pool._put_conn(conn)
+
+ # Now with keep-alive
+ r = pool.request('GET', '/keepalive?close=0', retries=0,
+ headers={
+ "Connection": "keep-alive",
+ })
+
+ # The dummyserver responded with Connection:keep-alive, the connection
+ # persists.
+ conn = pool.pool.get()
+ self.assertNotEqual(conn.sock, None)
+ pool._put_conn(conn)
+
+ # Another request asking the server to close the connection. This one
+ # should get cleaned up for the next request.
+ r = pool.request('GET', '/keepalive?close=1', retries=0,
+ headers={
+ "Connection": "close",
+ })
+
+ self.assertEqual(r.status, 200)
+
+ conn = pool.pool.get()
+ self.assertEqual(conn.sock, None)
+ pool._put_conn(conn)
+
+ # Next request
+ r = pool.request('GET', '/keepalive?close=0')
+
+ def test_post_with_urlencode(self):
+ data = {'banana': 'hammock', 'lol': 'cat'}
+ r = self.pool.request('POST', '/echo', fields=data, encode_multipart=False)
+ self.assertEqual(r.data.decode('utf-8'), urlencode(data))
+
+ def test_post_with_multipart(self):
+ data = {'banana': 'hammock', 'lol': 'cat'}
+ r = self.pool.request('POST', '/echo',
+ fields=data,
+ encode_multipart=True)
+ body = r.data.split(b'\r\n')
+
+ encoded_data = encode_multipart_formdata(data)[0]
+ expected_body = encoded_data.split(b'\r\n')
+
+ # TODO: Get rid of extra parsing stuff when you can specify
+ # a custom boundary to encode_multipart_formdata
+ """
+ We need to loop the return lines because a timestamp is attached
+ from within encode_multipart_formdata. When the server echos back
+ the data, it has the timestamp from when the data was encoded, which
+ is not equivalent to when we run encode_multipart_formdata on
+ the data again.
+ """
+ for i, line in enumerate(body):
+ if line.startswith(b'--'):
+ continue
+
+ self.assertEqual(body[i], expected_body[i])
+
+ def test_check_gzip(self):
+ r = self.pool.request('GET', '/encodingrequest',
+ headers={'accept-encoding': 'gzip'})
+ self.assertEqual(r.headers.get('content-encoding'), 'gzip')
+ self.assertEqual(r.data, b'hello, world!')
+
+ def test_check_deflate(self):
+ r = self.pool.request('GET', '/encodingrequest',
+ headers={'accept-encoding': 'deflate'})
+ self.assertEqual(r.headers.get('content-encoding'), 'deflate')
+ self.assertEqual(r.data, b'hello, world!')
+
+ def test_bad_decode(self):
+ self.assertRaises(DecodeError, self.pool.request,
+ 'GET', '/encodingrequest',
+ headers={'accept-encoding': 'garbage-deflate'})
+
+ self.assertRaises(DecodeError, self.pool.request,
+ 'GET', '/encodingrequest',
+ headers={'accept-encoding': 'garbage-gzip'})
+
+ def test_connection_count(self):
+ pool = HTTPConnectionPool(self.host, self.port, maxsize=1)
+
+ pool.request('GET', '/')
+ pool.request('GET', '/')
+ pool.request('GET', '/')
+
+ self.assertEqual(pool.num_connections, 1)
+ self.assertEqual(pool.num_requests, 3)
+
+ def test_connection_count_bigpool(self):
+ http_pool = HTTPConnectionPool(self.host, self.port, maxsize=16)
+
+ http_pool.request('GET', '/')
+ http_pool.request('GET', '/')
+ http_pool.request('GET', '/')
+
+ self.assertEqual(http_pool.num_connections, 1)
+ self.assertEqual(http_pool.num_requests, 3)
+
+ def test_partial_response(self):
+ pool = HTTPConnectionPool(self.host, self.port, maxsize=1)
+
+ req_data = {'lol': 'cat'}
+ resp_data = urlencode(req_data).encode('utf-8')
+
+ r = pool.request('GET', '/echo', fields=req_data, preload_content=False)
+
+ self.assertEqual(r.read(5), resp_data[:5])
+ self.assertEqual(r.read(), resp_data[5:])
+
+ def test_lazy_load_twice(self):
+ # This test is sad and confusing. Need to figure out what's
+ # going on with partial reads and socket reuse.
+
+ pool = HTTPConnectionPool(self.host, self.port, block=True, maxsize=1, timeout=2)
+
+ payload_size = 1024 * 2
+ first_chunk = 512
+
+ boundary = 'foo'
+
+ req_data = {'count': 'a' * payload_size}
+ resp_data = encode_multipart_formdata(req_data, boundary=boundary)[0]
+
+ req2_data = {'count': 'b' * payload_size}
+ resp2_data = encode_multipart_formdata(req2_data, boundary=boundary)[0]
+
+ r1 = pool.request('POST', '/echo', fields=req_data, multipart_boundary=boundary, preload_content=False)
+
+ self.assertEqual(r1.read(first_chunk), resp_data[:first_chunk])
+
+ try:
+ r2 = pool.request('POST', '/echo', fields=req2_data, multipart_boundary=boundary,
+ preload_content=False, pool_timeout=0.001)
+
+ # This branch should generally bail here, but maybe someday it will
+ # work? Perhaps by some sort of magic. Consider it a TODO.
+
+ self.assertEqual(r2.read(first_chunk), resp2_data[:first_chunk])
+
+ self.assertEqual(r1.read(), resp_data[first_chunk:])
+ self.assertEqual(r2.read(), resp2_data[first_chunk:])
+ self.assertEqual(pool.num_requests, 2)
+
+ except EmptyPoolError:
+ self.assertEqual(r1.read(), resp_data[first_chunk:])
+ self.assertEqual(pool.num_requests, 1)
+
+ self.assertEqual(pool.num_connections, 1)
+
+ def test_for_double_release(self):
+ MAXSIZE=5
+
+ # Check default state
+ pool = HTTPConnectionPool(self.host, self.port, maxsize=MAXSIZE)
+ self.assertEqual(pool.num_connections, 0)
+ self.assertEqual(pool.pool.qsize(), MAXSIZE)
+
+ # Make an empty slot for testing
+ pool.pool.get()
+ self.assertEqual(pool.pool.qsize(), MAXSIZE-1)
+
+ # Check state after simple request
+ pool.urlopen('GET', '/')
+ self.assertEqual(pool.pool.qsize(), MAXSIZE-1)
+
+ # Check state without release
+ pool.urlopen('GET', '/', preload_content=False)
+ self.assertEqual(pool.pool.qsize(), MAXSIZE-2)
+
+ pool.urlopen('GET', '/')
+ self.assertEqual(pool.pool.qsize(), MAXSIZE-2)
+
+ # Check state after read
+ pool.urlopen('GET', '/').data
+ self.assertEqual(pool.pool.qsize(), MAXSIZE-2)
+
+ pool.urlopen('GET', '/')
+ self.assertEqual(pool.pool.qsize(), MAXSIZE-2)
+
+ def test_release_conn_parameter(self):
+ MAXSIZE=5
+ pool = HTTPConnectionPool(self.host, self.port, maxsize=MAXSIZE)
+ self.assertEqual(pool.pool.qsize(), MAXSIZE)
+
+ # Make request without releasing connection
+ pool.request('GET', '/', release_conn=False, preload_content=False)
+ self.assertEqual(pool.pool.qsize(), MAXSIZE-1)
+
+ def test_dns_error(self):
+ pool = HTTPConnectionPool('thishostdoesnotexist.invalid', self.port, timeout=0.001)
+ self.assertRaises(MaxRetryError, pool.request, 'GET', '/test', retries=2)
+
+ def test_source_address(self):
+ for addr in VALID_SOURCE_ADDRESSES:
+ pool = HTTPConnectionPool(self.host, self.port,
+ source_address=addr, retries=False)
+ r = pool.request('GET', '/source_address')
+ assert r.data == b(addr[0]), (
+ "expected the response to contain the source address {addr}, "
+ "but was {data}".format(data=r.data, addr=b(addr[0])))
+
+ def test_source_address_error(self):
+ for addr in INVALID_SOURCE_ADDRESSES:
+ pool = HTTPConnectionPool(self.host, self.port,
+ source_address=addr, retries=False)
+ self.assertRaises(ProtocolError,
+ pool.request, 'GET', '/source_address')
+
+ @onlyPy3
+ def test_httplib_headers_case_insensitive(self):
+ HEADERS = {'Content-Length': '0', 'Content-type': 'text/plain',
+ 'Server': 'TornadoServer/%s' % tornado.version}
+ r = self.pool.request('GET', '/specific_method',
+ fields={'method': 'GET'})
+ self.assertEqual(HEADERS, dict(r.headers.items())) # to preserve case sensitivity
+
+
+class TestRetry(HTTPDummyServerTestCase):
+ def setUp(self):
+ self.pool = HTTPConnectionPool(self.host, self.port)
+
+ def test_max_retry(self):
+ try:
+ r = self.pool.request('GET', '/redirect',
+ fields={'target': '/'},
+ retries=0)
+ self.fail("Failed to raise MaxRetryError exception, returned %r" % r.status)
+ except MaxRetryError:
+ pass
+
+ def test_disabled_retry(self):
+ """ Disabled retries should disable redirect handling. """
+ r = self.pool.request('GET', '/redirect',
+ fields={'target': '/'},
+ retries=False)
+ self.assertEqual(r.status, 303)
+
+ r = self.pool.request('GET', '/redirect',
+ fields={'target': '/'},
+ retries=Retry(redirect=False))
+ self.assertEqual(r.status, 303)
+
+ pool = HTTPConnectionPool('thishostdoesnotexist.invalid', self.port, timeout=0.001)
+ self.assertRaises(ProtocolError, pool.request, 'GET', '/test', retries=False)
+
+ def test_read_retries(self):
+ """ Should retry for status codes in the whitelist """
+ retry = Retry(read=1, status_forcelist=[418])
+ resp = self.pool.request('GET', '/successful_retry',
+ headers={'test-name': 'test_read_retries'},
+ retries=retry)
+ self.assertEqual(resp.status, 200)
+
+ def test_read_total_retries(self):
+ """ HTTP response w/ status code in the whitelist should be retried """
+ headers = {'test-name': 'test_read_total_retries'}
+ retry = Retry(total=1, status_forcelist=[418])
+ resp = self.pool.request('GET', '/successful_retry',
+ headers=headers, retries=retry)
+ self.assertEqual(resp.status, 200)
+
+ def test_retries_wrong_whitelist(self):
+ """HTTP response w/ status code not in whitelist shouldn't be retried"""
+ retry = Retry(total=1, status_forcelist=[202])
+ resp = self.pool.request('GET', '/successful_retry',
+ headers={'test-name': 'test_wrong_whitelist'},
+ retries=retry)
+ self.assertEqual(resp.status, 418)
+
+ def test_default_method_whitelist_retried(self):
+ """ urllib3 should retry methods in the default method whitelist """
+ retry = Retry(total=1, status_forcelist=[418])
+ resp = self.pool.request('OPTIONS', '/successful_retry',
+ headers={'test-name': 'test_default_whitelist'},
+ retries=retry)
+ self.assertEqual(resp.status, 200)
+
+ def test_retries_wrong_method_list(self):
+ """Method not in our whitelist should not be retried, even if code matches"""
+ headers = {'test-name': 'test_wrong_method_whitelist'}
+ retry = Retry(total=1, status_forcelist=[418],
+ method_whitelist=['POST'])
+ resp = self.pool.request('GET', '/successful_retry',
+ headers=headers, retries=retry)
+ self.assertEqual(resp.status, 418)
+
+ def test_read_retries_unsuccessful(self):
+ headers = {'test-name': 'test_read_retries_unsuccessful'}
+ resp = self.pool.request('GET', '/successful_retry',
+ headers=headers, retries=1)
+ self.assertEqual(resp.status, 418)
+
+ def test_retry_reuse_safe(self):
+ """ It should be possible to reuse a Retry object across requests """
+ headers = {'test-name': 'test_retry_safe'}
+ retry = Retry(total=1, status_forcelist=[418])
+ resp = self.pool.request('GET', '/successful_retry',
+ headers=headers, retries=retry)
+ self.assertEqual(resp.status, 200)
+ resp = self.pool.request('GET', '/successful_retry',
+ headers=headers, retries=retry)
+ self.assertEqual(resp.status, 200)
+
+
+if __name__ == '__main__':
+ unittest.main()
diff --git a/test/with_dummyserver/test_connectionpool.pyc b/test/with_dummyserver/test_connectionpool.pyc
new file mode 100644
index 0000000..b8c38e9
--- /dev/null
+++ b/test/with_dummyserver/test_connectionpool.pyc
Binary files differ
diff --git a/test/with_dummyserver/test_https.py b/test/with_dummyserver/test_https.py
new file mode 100644
index 0000000..cf3eee7
--- /dev/null
+++ b/test/with_dummyserver/test_https.py
@@ -0,0 +1,374 @@
+import datetime
+import logging
+import ssl
+import sys
+import unittest
+import warnings
+
+import mock
+from nose.plugins.skip import SkipTest
+
+from dummyserver.testcase import HTTPSDummyServerTestCase
+from dummyserver.server import DEFAULT_CA, DEFAULT_CA_BAD, DEFAULT_CERTS
+
+from test import (
+ onlyPy26OrOlder,
+ requires_network,
+ TARPIT_HOST,
+ clear_warnings,
+)
+from urllib3 import HTTPSConnectionPool
+from urllib3.connection import (
+ VerifiedHTTPSConnection,
+ UnverifiedHTTPSConnection,
+ RECENT_DATE,
+)
+from urllib3.exceptions import (
+ SSLError,
+ ReadTimeoutError,
+ ConnectTimeoutError,
+ InsecureRequestWarning,
+ SystemTimeWarning,
+)
+from urllib3.util.timeout import Timeout
+
+
+log = logging.getLogger('urllib3.connectionpool')
+log.setLevel(logging.NOTSET)
+log.addHandler(logging.StreamHandler(sys.stdout))
+
+
+
+class TestHTTPS(HTTPSDummyServerTestCase):
+ def setUp(self):
+ self._pool = HTTPSConnectionPool(self.host, self.port)
+
+ def test_simple(self):
+ r = self._pool.request('GET', '/')
+ self.assertEqual(r.status, 200, r.data)
+
+ def test_set_ssl_version_to_tlsv1(self):
+ self._pool.ssl_version = ssl.PROTOCOL_TLSv1
+ r = self._pool.request('GET', '/')
+ self.assertEqual(r.status, 200, r.data)
+
+ def test_verified(self):
+ https_pool = HTTPSConnectionPool(self.host, self.port,
+ cert_reqs='CERT_REQUIRED',
+ ca_certs=DEFAULT_CA)
+
+ conn = https_pool._new_conn()
+ self.assertEqual(conn.__class__, VerifiedHTTPSConnection)
+
+ with mock.patch('warnings.warn') as warn:
+ r = https_pool.request('GET', '/')
+ self.assertEqual(r.status, 200)
+ self.assertFalse(warn.called, warn.call_args_list)
+
+ def test_invalid_common_name(self):
+ https_pool = HTTPSConnectionPool('127.0.0.1', self.port,
+ cert_reqs='CERT_REQUIRED',
+ ca_certs=DEFAULT_CA)
+ try:
+ https_pool.request('GET', '/')
+ self.fail("Didn't raise SSL invalid common name")
+ except SSLError as e:
+ self.assertTrue("doesn't match" in str(e))
+
+ def test_verified_with_bad_ca_certs(self):
+ https_pool = HTTPSConnectionPool(self.host, self.port,
+ cert_reqs='CERT_REQUIRED',
+ ca_certs=DEFAULT_CA_BAD)
+
+ try:
+ https_pool.request('GET', '/')
+ self.fail("Didn't raise SSL error with bad CA certs")
+ except SSLError as e:
+ self.assertTrue('certificate verify failed' in str(e),
+ "Expected 'certificate verify failed',"
+ "instead got: %r" % e)
+
+ def test_verified_without_ca_certs(self):
+ # default is cert_reqs=None which is ssl.CERT_NONE
+ https_pool = HTTPSConnectionPool(self.host, self.port,
+ cert_reqs='CERT_REQUIRED')
+
+ try:
+ https_pool.request('GET', '/')
+ self.fail("Didn't raise SSL error with no CA certs when"
+ "CERT_REQUIRED is set")
+ except SSLError as e:
+ # there is a different error message depending on whether or
+ # not pyopenssl is injected
+ self.assertTrue('No root certificates specified' in str(e) or
+ 'certificate verify failed' in str(e),
+ "Expected 'No root certificates specified' or "
+ "'certificate verify failed', "
+ "instead got: %r" % e)
+
+ def test_no_ssl(self):
+ pool = HTTPSConnectionPool(self.host, self.port)
+ pool.ConnectionCls = None
+ self.assertRaises(SSLError, pool._new_conn)
+ self.assertRaises(SSLError, pool.request, 'GET', '/')
+
+ def test_unverified_ssl(self):
+ """ Test that bare HTTPSConnection can connect, make requests """
+ pool = HTTPSConnectionPool(self.host, self.port)
+ pool.ConnectionCls = UnverifiedHTTPSConnection
+
+ with mock.patch('warnings.warn') as warn:
+ r = pool.request('GET', '/')
+ self.assertEqual(r.status, 200)
+ self.assertTrue(warn.called)
+
+ call, = warn.call_args_list
+ category = call[0][1]
+ self.assertEqual(category, InsecureRequestWarning)
+
+ def test_ssl_unverified_with_ca_certs(self):
+ pool = HTTPSConnectionPool(self.host, self.port,
+ cert_reqs='CERT_NONE',
+ ca_certs=DEFAULT_CA_BAD)
+
+ with mock.patch('warnings.warn') as warn:
+ r = pool.request('GET', '/')
+ self.assertEqual(r.status, 200)
+ self.assertTrue(warn.called)
+
+ call, = warn.call_args_list
+ category = call[0][1]
+ self.assertEqual(category, InsecureRequestWarning)
+
+ @requires_network
+ def test_ssl_verified_with_platform_ca_certs(self):
+ """
+ We should rely on the platform CA file to validate authenticity of SSL
+ certificates. Since this file is used by many components of the OS,
+ such as curl, apt-get, etc., we decided to not touch it, in order to
+ not compromise the security of the OS running the test suite (typically
+ urllib3 developer's OS).
+
+ This test assumes that httpbin.org uses a certificate signed by a well
+ known Certificate Authority.
+ """
+ try:
+ import urllib3.contrib.pyopenssl
+ except ImportError:
+ raise SkipTest('Test requires PyOpenSSL')
+ if (urllib3.connection.ssl_wrap_socket is
+ urllib3.contrib.pyopenssl.orig_connection_ssl_wrap_socket):
+ # Not patched
+ raise SkipTest('Test should only be run after PyOpenSSL '
+ 'monkey patching')
+
+ https_pool = HTTPSConnectionPool('httpbin.org', 443,
+ cert_reqs=ssl.CERT_REQUIRED)
+
+ https_pool.request('HEAD', '/')
+
+ def test_assert_hostname_false(self):
+ https_pool = HTTPSConnectionPool('127.0.0.1', self.port,
+ cert_reqs='CERT_REQUIRED',
+ ca_certs=DEFAULT_CA)
+
+ https_pool.assert_hostname = False
+ https_pool.request('GET', '/')
+
+ def test_assert_specific_hostname(self):
+ https_pool = HTTPSConnectionPool('127.0.0.1', self.port,
+ cert_reqs='CERT_REQUIRED',
+ ca_certs=DEFAULT_CA)
+
+ https_pool.assert_hostname = 'localhost'
+ https_pool.request('GET', '/')
+
+ def test_assert_fingerprint_md5(self):
+ https_pool = HTTPSConnectionPool('127.0.0.1', self.port,
+ cert_reqs='CERT_REQUIRED',
+ ca_certs=DEFAULT_CA)
+
+ https_pool.assert_fingerprint = 'CA:84:E1:AD0E5a:ef:2f:C3:09' \
+ ':E7:30:F8:CD:C8:5B'
+ https_pool.request('GET', '/')
+
+ def test_assert_fingerprint_sha1(self):
+ https_pool = HTTPSConnectionPool('127.0.0.1', self.port,
+ cert_reqs='CERT_REQUIRED',
+ ca_certs=DEFAULT_CA)
+
+ https_pool.assert_fingerprint = 'CC:45:6A:90:82:F7FF:C0:8218:8e:' \
+ '7A:F2:8A:D7:1E:07:33:67:DE'
+ https_pool.request('GET', '/')
+
+ def test_assert_invalid_fingerprint(self):
+ https_pool = HTTPSConnectionPool('127.0.0.1', self.port,
+ cert_reqs='CERT_REQUIRED',
+ ca_certs=DEFAULT_CA)
+
+ https_pool.assert_fingerprint = 'AA:AA:AA:AA:AA:AAAA:AA:AAAA:AA:' \
+ 'AA:AA:AA:AA:AA:AA:AA:AA:AA'
+
+ self.assertRaises(SSLError, https_pool.request, 'GET', '/')
+ https_pool._get_conn()
+
+ # Uneven length
+ https_pool.assert_fingerprint = 'AA:A'
+ self.assertRaises(SSLError, https_pool.request, 'GET', '/')
+ https_pool._get_conn()
+
+ # Invalid length
+ https_pool.assert_fingerprint = 'AA'
+ self.assertRaises(SSLError, https_pool.request, 'GET', '/')
+
+ def test_verify_none_and_bad_fingerprint(self):
+ https_pool = HTTPSConnectionPool('127.0.0.1', self.port,
+ cert_reqs='CERT_NONE',
+ ca_certs=DEFAULT_CA_BAD)
+
+ https_pool.assert_fingerprint = 'AA:AA:AA:AA:AA:AAAA:AA:AAAA:AA:' \
+ 'AA:AA:AA:AA:AA:AA:AA:AA:AA'
+ self.assertRaises(SSLError, https_pool.request, 'GET', '/')
+
+ def test_verify_none_and_good_fingerprint(self):
+ https_pool = HTTPSConnectionPool('127.0.0.1', self.port,
+ cert_reqs='CERT_NONE',
+ ca_certs=DEFAULT_CA_BAD)
+
+ https_pool.assert_fingerprint = 'CC:45:6A:90:82:F7FF:C0:8218:8e:' \
+ '7A:F2:8A:D7:1E:07:33:67:DE'
+ https_pool.request('GET', '/')
+
+ @requires_network
+ def test_https_timeout(self):
+ timeout = Timeout(connect=0.001)
+ https_pool = HTTPSConnectionPool(TARPIT_HOST, self.port,
+ timeout=timeout, retries=False,
+ cert_reqs='CERT_REQUIRED')
+
+ timeout = Timeout(total=None, connect=0.001)
+ https_pool = HTTPSConnectionPool(TARPIT_HOST, self.port,
+ timeout=timeout, retries=False,
+ cert_reqs='CERT_REQUIRED')
+ self.assertRaises(ConnectTimeoutError, https_pool.request, 'GET', '/')
+
+ timeout = Timeout(read=0.001)
+ https_pool = HTTPSConnectionPool(self.host, self.port,
+ timeout=timeout, retries=False,
+ cert_reqs='CERT_REQUIRED')
+ https_pool.ca_certs = DEFAULT_CA
+ https_pool.assert_fingerprint = 'CC:45:6A:90:82:F7FF:C0:8218:8e:' \
+ '7A:F2:8A:D7:1E:07:33:67:DE'
+ url = '/sleep?seconds=0.005'
+ self.assertRaises(ReadTimeoutError, https_pool.request, 'GET', url)
+
+ timeout = Timeout(total=None)
+ https_pool = HTTPSConnectionPool(self.host, self.port, timeout=timeout,
+ cert_reqs='CERT_NONE')
+ https_pool.request('GET', '/')
+
+ def test_tunnel(self):
+ """ test the _tunnel behavior """
+ timeout = Timeout(total=None)
+ https_pool = HTTPSConnectionPool(self.host, self.port, timeout=timeout,
+ cert_reqs='CERT_NONE')
+ conn = https_pool._new_conn()
+ try:
+ conn.set_tunnel(self.host, self.port)
+ except AttributeError: # python 2.6
+ conn._set_tunnel(self.host, self.port)
+ conn._tunnel = mock.Mock()
+ https_pool._make_request(conn, 'GET', '/')
+ conn._tunnel.assert_called_once_with()
+
+ @onlyPy26OrOlder
+ def test_tunnel_old_python(self):
+ """HTTPSConnection can still make connections if _tunnel_host isn't set
+
+ The _tunnel_host attribute was added in 2.6.3 - because our test runners
+ generally use the latest Python 2.6, we simulate the old version by
+ deleting the attribute from the HTTPSConnection.
+ """
+ conn = self._pool._new_conn()
+ del conn._tunnel_host
+ self._pool._make_request(conn, 'GET', '/')
+
+ @requires_network
+ def test_enhanced_timeout(self):
+ def new_pool(timeout, cert_reqs='CERT_REQUIRED'):
+ https_pool = HTTPSConnectionPool(TARPIT_HOST, self.port,
+ timeout=timeout,
+ retries=False,
+ cert_reqs=cert_reqs)
+ return https_pool
+
+ https_pool = new_pool(Timeout(connect=0.001))
+ conn = https_pool._new_conn()
+ self.assertRaises(ConnectTimeoutError, https_pool.request, 'GET', '/')
+ self.assertRaises(ConnectTimeoutError, https_pool._make_request, conn,
+ 'GET', '/')
+
+ https_pool = new_pool(Timeout(connect=5))
+ self.assertRaises(ConnectTimeoutError, https_pool.request, 'GET', '/',
+ timeout=Timeout(connect=0.001))
+
+ t = Timeout(total=None)
+ https_pool = new_pool(t)
+ conn = https_pool._new_conn()
+ self.assertRaises(ConnectTimeoutError, https_pool.request, 'GET', '/',
+ timeout=Timeout(total=None, connect=0.001))
+
+ def test_enhanced_ssl_connection(self):
+ fingerprint = 'CC:45:6A:90:82:F7FF:C0:8218:8e:7A:F2:8A:D7:1E:07:33:67:DE'
+
+ conn = VerifiedHTTPSConnection(self.host, self.port)
+ https_pool = HTTPSConnectionPool(self.host, self.port,
+ cert_reqs='CERT_REQUIRED', ca_certs=DEFAULT_CA,
+ assert_fingerprint=fingerprint)
+
+ https_pool._make_request(conn, 'GET', '/')
+
+ def test_ssl_correct_system_time(self):
+ with warnings.catch_warnings(record=True) as w:
+ warnings.simplefilter('always')
+ self._pool.request('GET', '/')
+
+ self.assertEqual([], w)
+
+ def test_ssl_wrong_system_time(self):
+ with mock.patch('urllib3.connection.datetime') as mock_date:
+ mock_date.date.today.return_value = datetime.date(1970, 1, 1)
+
+ with warnings.catch_warnings(record=True) as w:
+ warnings.simplefilter('always')
+ self._pool.request('GET', '/')
+
+ self.assertEqual(len(w), 1)
+ warning = w[0]
+
+ self.assertEqual(SystemTimeWarning, warning.category)
+ self.assertTrue(str(RECENT_DATE) in warning.message.args[0])
+
+
+class TestHTTPS_TLSv1(HTTPSDummyServerTestCase):
+ certs = DEFAULT_CERTS.copy()
+ certs['ssl_version'] = ssl.PROTOCOL_TLSv1
+
+ def setUp(self):
+ self._pool = HTTPSConnectionPool(self.host, self.port)
+
+ def test_set_ssl_version_to_sslv3(self):
+ self._pool.ssl_version = ssl.PROTOCOL_SSLv3
+ self.assertRaises(SSLError, self._pool.request, 'GET', '/')
+
+ def test_ssl_version_as_string(self):
+ self._pool.ssl_version = 'PROTOCOL_SSLv3'
+ self.assertRaises(SSLError, self._pool.request, 'GET', '/')
+
+ def test_ssl_version_as_short_string(self):
+ self._pool.ssl_version = 'SSLv3'
+ self.assertRaises(SSLError, self._pool.request, 'GET', '/')
+
+
+if __name__ == '__main__':
+ unittest.main()
diff --git a/test/with_dummyserver/test_https.pyc b/test/with_dummyserver/test_https.pyc
new file mode 100644
index 0000000..6d85316
--- /dev/null
+++ b/test/with_dummyserver/test_https.pyc
Binary files differ
diff --git a/test/with_dummyserver/test_poolmanager.py b/test/with_dummyserver/test_poolmanager.py
new file mode 100644
index 0000000..52ff974
--- /dev/null
+++ b/test/with_dummyserver/test_poolmanager.py
@@ -0,0 +1,136 @@
+import unittest
+import json
+
+from dummyserver.testcase import (HTTPDummyServerTestCase,
+ IPv6HTTPDummyServerTestCase)
+from urllib3.poolmanager import PoolManager
+from urllib3.connectionpool import port_by_scheme
+from urllib3.exceptions import MaxRetryError, SSLError
+
+
+class TestPoolManager(HTTPDummyServerTestCase):
+
+ def setUp(self):
+ self.base_url = 'http://%s:%d' % (self.host, self.port)
+ self.base_url_alt = 'http://%s:%d' % (self.host_alt, self.port)
+
+ def test_redirect(self):
+ http = PoolManager()
+
+ r = http.request('GET', '%s/redirect' % self.base_url,
+ fields={'target': '%s/' % self.base_url},
+ redirect=False)
+
+ self.assertEqual(r.status, 303)
+
+ r = http.request('GET', '%s/redirect' % self.base_url,
+ fields={'target': '%s/' % self.base_url})
+
+ self.assertEqual(r.status, 200)
+ self.assertEqual(r.data, b'Dummy server!')
+
+ def test_redirect_twice(self):
+ http = PoolManager()
+
+ r = http.request('GET', '%s/redirect' % self.base_url,
+ fields={'target': '%s/redirect' % self.base_url},
+ redirect=False)
+
+ self.assertEqual(r.status, 303)
+
+ r = http.request('GET', '%s/redirect' % self.base_url,
+ fields={'target': '%s/redirect?target=%s/' % (self.base_url, self.base_url)})
+
+ self.assertEqual(r.status, 200)
+ self.assertEqual(r.data, b'Dummy server!')
+
+ def test_redirect_to_relative_url(self):
+ http = PoolManager()
+
+ r = http.request('GET', '%s/redirect' % self.base_url,
+ fields = {'target': '/redirect'},
+ redirect = False)
+
+ self.assertEqual(r.status, 303)
+
+ r = http.request('GET', '%s/redirect' % self.base_url,
+ fields = {'target': '/redirect'})
+
+ self.assertEqual(r.status, 200)
+ self.assertEqual(r.data, b'Dummy server!')
+
+ def test_cross_host_redirect(self):
+ http = PoolManager()
+
+ cross_host_location = '%s/echo?a=b' % self.base_url_alt
+ try:
+ http.request('GET', '%s/redirect' % self.base_url,
+ fields={'target': cross_host_location},
+ timeout=0.01, retries=0)
+ self.fail("Request succeeded instead of raising an exception like it should.")
+
+ except MaxRetryError:
+ pass
+
+ r = http.request('GET', '%s/redirect' % self.base_url,
+ fields={'target': '%s/echo?a=b' % self.base_url_alt},
+ timeout=0.01, retries=1)
+
+ self.assertEqual(r._pool.host, self.host_alt)
+
+ def test_missing_port(self):
+ # Can a URL that lacks an explicit port like ':80' succeed, or
+ # will all such URLs fail with an error?
+
+ http = PoolManager()
+
+ # By globally adjusting `port_by_scheme` we pretend for a moment
+ # that HTTP's default port is not 80, but is the port at which
+ # our test server happens to be listening.
+ port_by_scheme['http'] = self.port
+ try:
+ r = http.request('GET', 'http://%s/' % self.host, retries=0)
+ finally:
+ port_by_scheme['http'] = 80
+
+ self.assertEqual(r.status, 200)
+ self.assertEqual(r.data, b'Dummy server!')
+
+ def test_headers(self):
+ http = PoolManager(headers={'Foo': 'bar'})
+
+ r = http.request_encode_url('GET', '%s/headers' % self.base_url)
+ returned_headers = json.loads(r.data.decode())
+ self.assertEqual(returned_headers.get('Foo'), 'bar')
+
+ r = http.request_encode_body('POST', '%s/headers' % self.base_url)
+ returned_headers = json.loads(r.data.decode())
+ self.assertEqual(returned_headers.get('Foo'), 'bar')
+
+ r = http.request_encode_url('GET', '%s/headers' % self.base_url, headers={'Baz': 'quux'})
+ returned_headers = json.loads(r.data.decode())
+ self.assertEqual(returned_headers.get('Foo'), None)
+ self.assertEqual(returned_headers.get('Baz'), 'quux')
+
+ r = http.request_encode_body('GET', '%s/headers' % self.base_url, headers={'Baz': 'quux'})
+ returned_headers = json.loads(r.data.decode())
+ self.assertEqual(returned_headers.get('Foo'), None)
+ self.assertEqual(returned_headers.get('Baz'), 'quux')
+
+ def test_http_with_ssl_keywords(self):
+ http = PoolManager(ca_certs='REQUIRED')
+
+ r = http.request('GET', 'http://%s:%s/' % (self.host, self.port))
+ self.assertEqual(r.status, 200)
+
+
+class TestIPv6PoolManager(IPv6HTTPDummyServerTestCase):
+ def setUp(self):
+ self.base_url = 'http://[%s]:%d' % (self.host, self.port)
+
+ def test_ipv6(self):
+ http = PoolManager()
+ http.request('GET', self.base_url)
+
+if __name__ == '__main__':
+ unittest.main()
diff --git a/test/with_dummyserver/test_poolmanager.pyc b/test/with_dummyserver/test_poolmanager.pyc
new file mode 100644
index 0000000..26c52e9
--- /dev/null
+++ b/test/with_dummyserver/test_poolmanager.pyc
Binary files differ
diff --git a/test/with_dummyserver/test_proxy_poolmanager.py b/test/with_dummyserver/test_proxy_poolmanager.py
new file mode 100644
index 0000000..61eedf1
--- /dev/null
+++ b/test/with_dummyserver/test_proxy_poolmanager.py
@@ -0,0 +1,263 @@
+import unittest
+import json
+import socket
+
+from dummyserver.testcase import HTTPDummyProxyTestCase
+from dummyserver.server import (
+ DEFAULT_CA, DEFAULT_CA_BAD, get_unreachable_address)
+
+from urllib3.poolmanager import proxy_from_url, ProxyManager
+from urllib3.exceptions import MaxRetryError, SSLError, ProxyError
+from urllib3.connectionpool import connection_from_url, VerifiedHTTPSConnection
+
+
+class TestHTTPProxyManager(HTTPDummyProxyTestCase):
+
+ def setUp(self):
+ self.http_url = 'http://%s:%d' % (self.http_host, self.http_port)
+ self.http_url_alt = 'http://%s:%d' % (self.http_host_alt,
+ self.http_port)
+ self.https_url = 'https://%s:%d' % (self.https_host, self.https_port)
+ self.https_url_alt = 'https://%s:%d' % (self.https_host_alt,
+ self.https_port)
+ self.proxy_url = 'http://%s:%d' % (self.proxy_host, self.proxy_port)
+
+ def test_basic_proxy(self):
+ http = proxy_from_url(self.proxy_url)
+
+ r = http.request('GET', '%s/' % self.http_url)
+ self.assertEqual(r.status, 200)
+
+ r = http.request('GET', '%s/' % self.https_url)
+ self.assertEqual(r.status, 200)
+
+ def test_nagle_proxy(self):
+ """ Test that proxy connections do not have TCP_NODELAY turned on """
+ http = proxy_from_url(self.proxy_url)
+ hc2 = http.connection_from_host(self.http_host, self.http_port)
+ conn = hc2._get_conn()
+ hc2._make_request(conn, 'GET', '/')
+ tcp_nodelay_setting = conn.sock.getsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY)
+ self.assertEqual(tcp_nodelay_setting, 0,
+ ("Expected TCP_NODELAY for proxies to be set "
+ "to zero, instead was %s" % tcp_nodelay_setting))
+
+ def test_proxy_conn_fail(self):
+ host, port = get_unreachable_address()
+ http = proxy_from_url('http://%s:%s/' % (host, port), retries=1)
+ self.assertRaises(MaxRetryError, http.request, 'GET',
+ '%s/' % self.https_url)
+ self.assertRaises(MaxRetryError, http.request, 'GET',
+ '%s/' % self.http_url)
+
+ try:
+ http.request('GET', '%s/' % self.http_url)
+ self.fail("Failed to raise retry error.")
+ except MaxRetryError as e:
+ self.assertEqual(type(e.reason), ProxyError)
+
+ def test_oldapi(self):
+ http = ProxyManager(connection_from_url(self.proxy_url))
+
+ r = http.request('GET', '%s/' % self.http_url)
+ self.assertEqual(r.status, 200)
+
+ r = http.request('GET', '%s/' % self.https_url)
+ self.assertEqual(r.status, 200)
+
+ def test_proxy_verified(self):
+ http = proxy_from_url(self.proxy_url, cert_reqs='REQUIRED',
+ ca_certs=DEFAULT_CA_BAD)
+ https_pool = http._new_pool('https', self.https_host,
+ self.https_port)
+ try:
+ https_pool.request('GET', '/')
+ self.fail("Didn't raise SSL error with wrong CA")
+ except SSLError as e:
+ self.assertTrue('certificate verify failed' in str(e),
+ "Expected 'certificate verify failed',"
+ "instead got: %r" % e)
+
+ http = proxy_from_url(self.proxy_url, cert_reqs='REQUIRED',
+ ca_certs=DEFAULT_CA)
+ https_pool = http._new_pool('https', self.https_host,
+ self.https_port)
+
+ conn = https_pool._new_conn()
+ self.assertEqual(conn.__class__, VerifiedHTTPSConnection)
+ https_pool.request('GET', '/') # Should succeed without exceptions.
+
+ http = proxy_from_url(self.proxy_url, cert_reqs='REQUIRED',
+ ca_certs=DEFAULT_CA)
+ https_fail_pool = http._new_pool('https', '127.0.0.1', self.https_port)
+
+ try:
+ https_fail_pool.request('GET', '/')
+ self.fail("Didn't raise SSL invalid common name")
+ except SSLError as e:
+ self.assertTrue("doesn't match" in str(e))
+
+ def test_redirect(self):
+ http = proxy_from_url(self.proxy_url)
+
+ r = http.request('GET', '%s/redirect' % self.http_url,
+ fields={'target': '%s/' % self.http_url},
+ redirect=False)
+
+ self.assertEqual(r.status, 303)
+
+ r = http.request('GET', '%s/redirect' % self.http_url,
+ fields={'target': '%s/' % self.http_url})
+
+ self.assertEqual(r.status, 200)
+ self.assertEqual(r.data, b'Dummy server!')
+
+ def test_cross_host_redirect(self):
+ http = proxy_from_url(self.proxy_url)
+
+ cross_host_location = '%s/echo?a=b' % self.http_url_alt
+ try:
+ http.request('GET', '%s/redirect' % self.http_url,
+ fields={'target': cross_host_location},
+ timeout=0.1, retries=0)
+ self.fail("We don't want to follow redirects here.")
+
+ except MaxRetryError:
+ pass
+
+ r = http.request('GET', '%s/redirect' % self.http_url,
+ fields={'target': '%s/echo?a=b' % self.http_url_alt},
+ timeout=0.1, retries=1)
+ self.assertNotEqual(r._pool.host, self.http_host_alt)
+
+ def test_cross_protocol_redirect(self):
+ http = proxy_from_url(self.proxy_url)
+
+ cross_protocol_location = '%s/echo?a=b' % self.https_url
+ try:
+ http.request('GET', '%s/redirect' % self.http_url,
+ fields={'target': cross_protocol_location},
+ timeout=0.1, retries=0)
+ self.fail("We don't want to follow redirects here.")
+
+ except MaxRetryError:
+ pass
+
+ r = http.request('GET', '%s/redirect' % self.http_url,
+ fields={'target': '%s/echo?a=b' % self.https_url},
+ timeout=0.1, retries=1)
+ self.assertEqual(r._pool.host, self.https_host)
+
+ def test_headers(self):
+ http = proxy_from_url(self.proxy_url,headers={'Foo': 'bar'},
+ proxy_headers={'Hickory': 'dickory'})
+
+ r = http.request_encode_url('GET', '%s/headers' % self.http_url)
+ returned_headers = json.loads(r.data.decode())
+ self.assertEqual(returned_headers.get('Foo'), 'bar')
+ self.assertEqual(returned_headers.get('Hickory'), 'dickory')
+ self.assertEqual(returned_headers.get('Host'),
+ '%s:%s'%(self.http_host,self.http_port))
+
+ r = http.request_encode_url('GET', '%s/headers' % self.http_url_alt)
+ returned_headers = json.loads(r.data.decode())
+ self.assertEqual(returned_headers.get('Foo'), 'bar')
+ self.assertEqual(returned_headers.get('Hickory'), 'dickory')
+ self.assertEqual(returned_headers.get('Host'),
+ '%s:%s'%(self.http_host_alt,self.http_port))
+
+ r = http.request_encode_url('GET', '%s/headers' % self.https_url)
+ returned_headers = json.loads(r.data.decode())
+ self.assertEqual(returned_headers.get('Foo'), 'bar')
+ self.assertEqual(returned_headers.get('Hickory'), None)
+ self.assertEqual(returned_headers.get('Host'),
+ '%s:%s'%(self.https_host,self.https_port))
+
+ r = http.request_encode_url('GET', '%s/headers' % self.https_url_alt)
+ returned_headers = json.loads(r.data.decode())
+ self.assertEqual(returned_headers.get('Foo'), 'bar')
+ self.assertEqual(returned_headers.get('Hickory'), None)
+ self.assertEqual(returned_headers.get('Host'),
+ '%s:%s'%(self.https_host_alt,self.https_port))
+
+ r = http.request_encode_body('POST', '%s/headers' % self.http_url)
+ returned_headers = json.loads(r.data.decode())
+ self.assertEqual(returned_headers.get('Foo'), 'bar')
+ self.assertEqual(returned_headers.get('Hickory'), 'dickory')
+ self.assertEqual(returned_headers.get('Host'),
+ '%s:%s'%(self.http_host,self.http_port))
+
+ r = http.request_encode_url('GET', '%s/headers' % self.http_url, headers={'Baz': 'quux'})
+ returned_headers = json.loads(r.data.decode())
+ self.assertEqual(returned_headers.get('Foo'), None)
+ self.assertEqual(returned_headers.get('Baz'), 'quux')
+ self.assertEqual(returned_headers.get('Hickory'), 'dickory')
+ self.assertEqual(returned_headers.get('Host'),
+ '%s:%s'%(self.http_host,self.http_port))
+
+ r = http.request_encode_url('GET', '%s/headers' % self.https_url, headers={'Baz': 'quux'})
+ returned_headers = json.loads(r.data.decode())
+ self.assertEqual(returned_headers.get('Foo'), None)
+ self.assertEqual(returned_headers.get('Baz'), 'quux')
+ self.assertEqual(returned_headers.get('Hickory'), None)
+ self.assertEqual(returned_headers.get('Host'),
+ '%s:%s'%(self.https_host,self.https_port))
+
+ r = http.request_encode_body('GET', '%s/headers' % self.http_url, headers={'Baz': 'quux'})
+ returned_headers = json.loads(r.data.decode())
+ self.assertEqual(returned_headers.get('Foo'), None)
+ self.assertEqual(returned_headers.get('Baz'), 'quux')
+ self.assertEqual(returned_headers.get('Hickory'), 'dickory')
+ self.assertEqual(returned_headers.get('Host'),
+ '%s:%s'%(self.http_host,self.http_port))
+
+ r = http.request_encode_body('GET', '%s/headers' % self.https_url, headers={'Baz': 'quux'})
+ returned_headers = json.loads(r.data.decode())
+ self.assertEqual(returned_headers.get('Foo'), None)
+ self.assertEqual(returned_headers.get('Baz'), 'quux')
+ self.assertEqual(returned_headers.get('Hickory'), None)
+ self.assertEqual(returned_headers.get('Host'),
+ '%s:%s'%(self.https_host,self.https_port))
+
+ def test_proxy_pooling(self):
+ http = proxy_from_url(self.proxy_url)
+
+ for x in range(2):
+ r = http.urlopen('GET', self.http_url)
+ self.assertEqual(len(http.pools), 1)
+
+ for x in range(2):
+ r = http.urlopen('GET', self.http_url_alt)
+ self.assertEqual(len(http.pools), 1)
+
+ for x in range(2):
+ r = http.urlopen('GET', self.https_url)
+ self.assertEqual(len(http.pools), 2)
+
+ for x in range(2):
+ r = http.urlopen('GET', self.https_url_alt)
+ self.assertEqual(len(http.pools), 3)
+
+ def test_proxy_pooling_ext(self):
+ http = proxy_from_url(self.proxy_url)
+ hc1 = http.connection_from_url(self.http_url)
+ hc2 = http.connection_from_host(self.http_host, self.http_port)
+ hc3 = http.connection_from_url(self.http_url_alt)
+ hc4 = http.connection_from_host(self.http_host_alt, self.http_port)
+ self.assertEqual(hc1,hc2)
+ self.assertEqual(hc2,hc3)
+ self.assertEqual(hc3,hc4)
+
+ sc1 = http.connection_from_url(self.https_url)
+ sc2 = http.connection_from_host(self.https_host,
+ self.https_port,scheme='https')
+ sc3 = http.connection_from_url(self.https_url_alt)
+ sc4 = http.connection_from_host(self.https_host_alt,
+ self.https_port,scheme='https')
+ self.assertEqual(sc1,sc2)
+ self.assertNotEqual(sc2,sc3)
+ self.assertEqual(sc3,sc4)
+
+
+if __name__ == '__main__':
+ unittest.main()
diff --git a/test/with_dummyserver/test_proxy_poolmanager.pyc b/test/with_dummyserver/test_proxy_poolmanager.pyc
new file mode 100644
index 0000000..12c320c
--- /dev/null
+++ b/test/with_dummyserver/test_proxy_poolmanager.pyc
Binary files differ
diff --git a/test/with_dummyserver/test_socketlevel.py b/test/with_dummyserver/test_socketlevel.py
new file mode 100644
index 0000000..e1ac1c6
--- /dev/null
+++ b/test/with_dummyserver/test_socketlevel.py
@@ -0,0 +1,544 @@
+# TODO: Break this module up into pieces. Maybe group by functionality tested
+# rather than the socket level-ness of it.
+
+from urllib3 import HTTPConnectionPool, HTTPSConnectionPool
+from urllib3.poolmanager import proxy_from_url
+from urllib3.exceptions import (
+ MaxRetryError,
+ ProxyError,
+ ReadTimeoutError,
+ SSLError,
+ ProtocolError,
+)
+from urllib3.util.ssl_ import HAS_SNI
+from urllib3.util.timeout import Timeout
+from urllib3.util.retry import Retry
+
+from dummyserver.testcase import SocketDummyServerTestCase
+from dummyserver.server import (
+ DEFAULT_CERTS, DEFAULT_CA, get_unreachable_address)
+
+from nose.plugins.skip import SkipTest
+from threading import Event
+import socket
+import ssl
+
+
+class TestCookies(SocketDummyServerTestCase):
+
+ def test_multi_setcookie(self):
+ def multicookie_response_handler(listener):
+ sock = listener.accept()[0]
+
+ buf = b''
+ while not buf.endswith(b'\r\n\r\n'):
+ buf += sock.recv(65536)
+
+ sock.send(b'HTTP/1.1 200 OK\r\n'
+ b'Set-Cookie: foo=1\r\n'
+ b'Set-Cookie: bar=1\r\n'
+ b'\r\n')
+ sock.close()
+
+ self._start_server(multicookie_response_handler)
+ pool = HTTPConnectionPool(self.host, self.port)
+ r = pool.request('GET', '/', retries=0)
+ self.assertEqual(r.headers, {'set-cookie': 'foo=1, bar=1'})
+
+
+class TestSNI(SocketDummyServerTestCase):
+
+ def test_hostname_in_first_request_packet(self):
+ if not HAS_SNI:
+ raise SkipTest('SNI-support not available')
+
+ done_receiving = Event()
+ self.buf = b''
+
+ def socket_handler(listener):
+ sock = listener.accept()[0]
+
+ self.buf = sock.recv(65536) # We only accept one packet
+ done_receiving.set() # let the test know it can proceed
+ sock.close()
+
+ self._start_server(socket_handler)
+ pool = HTTPSConnectionPool(self.host, self.port)
+ try:
+ pool.request('GET', '/', retries=0)
+ except SSLError: # We are violating the protocol
+ pass
+ done_receiving.wait()
+ self.assertTrue(self.host.encode() in self.buf,
+ "missing hostname in SSL handshake")
+
+
+class TestSocketClosing(SocketDummyServerTestCase):
+
+ def test_recovery_when_server_closes_connection(self):
+ # Does the pool work seamlessly if an open connection in the
+ # connection pool gets hung up on by the server, then reaches
+ # the front of the queue again?
+
+ done_closing = Event()
+
+ def socket_handler(listener):
+ for i in 0, 1:
+ sock = listener.accept()[0]
+
+ buf = b''
+ while not buf.endswith(b'\r\n\r\n'):
+ buf = sock.recv(65536)
+
+ body = 'Response %d' % i
+ sock.send(('HTTP/1.1 200 OK\r\n'
+ 'Content-Type: text/plain\r\n'
+ 'Content-Length: %d\r\n'
+ '\r\n'
+ '%s' % (len(body), body)).encode('utf-8'))
+
+ sock.close() # simulate a server timing out, closing socket
+ done_closing.set() # let the test know it can proceed
+
+ self._start_server(socket_handler)
+ pool = HTTPConnectionPool(self.host, self.port)
+
+ response = pool.request('GET', '/', retries=0)
+ self.assertEqual(response.status, 200)
+ self.assertEqual(response.data, b'Response 0')
+
+ done_closing.wait() # wait until the socket in our pool gets closed
+
+ response = pool.request('GET', '/', retries=0)
+ self.assertEqual(response.status, 200)
+ self.assertEqual(response.data, b'Response 1')
+
+ def test_connection_refused(self):
+ # Does the pool retry if there is no listener on the port?
+ host, port = get_unreachable_address()
+ pool = HTTPConnectionPool(host, port)
+ self.assertRaises(MaxRetryError, pool.request, 'GET', '/', retries=0)
+
+ def test_connection_read_timeout(self):
+ timed_out = Event()
+ def socket_handler(listener):
+ sock = listener.accept()[0]
+ while not sock.recv(65536).endswith(b'\r\n\r\n'):
+ pass
+
+ timed_out.wait()
+ sock.close()
+
+ self._start_server(socket_handler)
+ pool = HTTPConnectionPool(self.host, self.port, timeout=0.001, retries=False)
+
+ try:
+ self.assertRaises(ReadTimeoutError, pool.request, 'GET', '/')
+ finally:
+ timed_out.set()
+
+ def test_timeout_errors_cause_retries(self):
+ def socket_handler(listener):
+ sock_timeout = listener.accept()[0]
+
+ # Wait for a second request before closing the first socket.
+ sock = listener.accept()[0]
+ sock_timeout.close()
+
+ # Second request.
+ buf = b''
+ while not buf.endswith(b'\r\n\r\n'):
+ buf += sock.recv(65536)
+
+ # Now respond immediately.
+ body = 'Response 2'
+ sock.send(('HTTP/1.1 200 OK\r\n'
+ 'Content-Type: text/plain\r\n'
+ 'Content-Length: %d\r\n'
+ '\r\n'
+ '%s' % (len(body), body)).encode('utf-8'))
+
+ sock.close()
+
+ # In situations where the main thread throws an exception, the server
+ # thread can hang on an accept() call. This ensures everything times
+ # out within 1 second. This should be long enough for any socket
+ # operations in the test suite to complete
+ default_timeout = socket.getdefaulttimeout()
+ socket.setdefaulttimeout(1)
+
+ try:
+ self._start_server(socket_handler)
+ t = Timeout(connect=0.001, read=0.001)
+ pool = HTTPConnectionPool(self.host, self.port, timeout=t)
+
+ response = pool.request('GET', '/', retries=1)
+ self.assertEqual(response.status, 200)
+ self.assertEqual(response.data, b'Response 2')
+ finally:
+ socket.setdefaulttimeout(default_timeout)
+
+ def test_delayed_body_read_timeout(self):
+ timed_out = Event()
+
+ def socket_handler(listener):
+ sock = listener.accept()[0]
+ buf = b''
+ body = 'Hi'
+ while not buf.endswith(b'\r\n\r\n'):
+ buf = sock.recv(65536)
+ sock.send(('HTTP/1.1 200 OK\r\n'
+ 'Content-Type: text/plain\r\n'
+ 'Content-Length: %d\r\n'
+ '\r\n' % len(body)).encode('utf-8'))
+
+ timed_out.wait()
+ sock.send(body.encode('utf-8'))
+ sock.close()
+
+ self._start_server(socket_handler)
+ pool = HTTPConnectionPool(self.host, self.port)
+
+ response = pool.urlopen('GET', '/', retries=0, preload_content=False,
+ timeout=Timeout(connect=1, read=0.001))
+ try:
+ self.assertRaises(ReadTimeoutError, response.read)
+ finally:
+ timed_out.set()
+
+ def test_incomplete_response(self):
+ body = 'Response'
+ partial_body = body[:2]
+
+ def socket_handler(listener):
+ sock = listener.accept()[0]
+
+ # Consume request
+ buf = b''
+ while not buf.endswith(b'\r\n\r\n'):
+ buf = sock.recv(65536)
+
+ # Send partial response and close socket.
+ sock.send((
+ 'HTTP/1.1 200 OK\r\n'
+ 'Content-Type: text/plain\r\n'
+ 'Content-Length: %d\r\n'
+ '\r\n'
+ '%s' % (len(body), partial_body)).encode('utf-8')
+ )
+ sock.close()
+
+ self._start_server(socket_handler)
+ pool = HTTPConnectionPool(self.host, self.port)
+
+ response = pool.request('GET', '/', retries=0, preload_content=False)
+ self.assertRaises(ProtocolError, response.read)
+
+ def test_retry_weird_http_version(self):
+ """ Retry class should handle httplib.BadStatusLine errors properly """
+
+ def socket_handler(listener):
+ sock = listener.accept()[0]
+ # First request.
+ # Pause before responding so the first request times out.
+ buf = b''
+ while not buf.endswith(b'\r\n\r\n'):
+ buf += sock.recv(65536)
+
+ # send unknown http protocol
+ body = "bad http 0.5 response"
+ sock.send(('HTTP/0.5 200 OK\r\n'
+ 'Content-Type: text/plain\r\n'
+ 'Content-Length: %d\r\n'
+ '\r\n'
+ '%s' % (len(body), body)).encode('utf-8'))
+ sock.close()
+
+ # Second request.
+ sock = listener.accept()[0]
+ buf = b''
+ while not buf.endswith(b'\r\n\r\n'):
+ buf += sock.recv(65536)
+
+ # Now respond immediately.
+ sock.send(('HTTP/1.1 200 OK\r\n'
+ 'Content-Type: text/plain\r\n'
+ 'Content-Length: %d\r\n'
+ '\r\n'
+ 'foo' % (len('foo'))).encode('utf-8'))
+
+ sock.close() # Close the socket.
+
+ self._start_server(socket_handler)
+ pool = HTTPConnectionPool(self.host, self.port)
+ retry = Retry(read=1)
+ response = pool.request('GET', '/', retries=retry)
+ self.assertEqual(response.status, 200)
+ self.assertEqual(response.data, b'foo')
+
+
+
+class TestProxyManager(SocketDummyServerTestCase):
+
+ def test_simple(self):
+ def echo_socket_handler(listener):
+ sock = listener.accept()[0]
+
+ buf = b''
+ while not buf.endswith(b'\r\n\r\n'):
+ buf += sock.recv(65536)
+
+ sock.send(('HTTP/1.1 200 OK\r\n'
+ 'Content-Type: text/plain\r\n'
+ 'Content-Length: %d\r\n'
+ '\r\n'
+ '%s' % (len(buf), buf.decode('utf-8'))).encode('utf-8'))
+ sock.close()
+
+ self._start_server(echo_socket_handler)
+ base_url = 'http://%s:%d' % (self.host, self.port)
+ proxy = proxy_from_url(base_url)
+
+ r = proxy.request('GET', 'http://google.com/')
+
+ self.assertEqual(r.status, 200)
+ # FIXME: The order of the headers is not predictable right now. We
+ # should fix that someday (maybe when we migrate to
+ # OrderedDict/MultiDict).
+ self.assertEqual(sorted(r.data.split(b'\r\n')),
+ sorted([
+ b'GET http://google.com/ HTTP/1.1',
+ b'Host: google.com',
+ b'Accept-Encoding: identity',
+ b'Accept: */*',
+ b'',
+ b'',
+ ]))
+
+ def test_headers(self):
+ def echo_socket_handler(listener):
+ sock = listener.accept()[0]
+
+ buf = b''
+ while not buf.endswith(b'\r\n\r\n'):
+ buf += sock.recv(65536)
+
+ sock.send(('HTTP/1.1 200 OK\r\n'
+ 'Content-Type: text/plain\r\n'
+ 'Content-Length: %d\r\n'
+ '\r\n'
+ '%s' % (len(buf), buf.decode('utf-8'))).encode('utf-8'))
+ sock.close()
+
+ self._start_server(echo_socket_handler)
+ base_url = 'http://%s:%d' % (self.host, self.port)
+
+ # Define some proxy headers.
+ proxy_headers = {'For The Proxy': 'YEAH!'}
+ proxy = proxy_from_url(base_url, proxy_headers=proxy_headers)
+
+ conn = proxy.connection_from_url('http://www.google.com/')
+
+ r = conn.urlopen('GET', 'http://www.google.com/', assert_same_host=False)
+
+ self.assertEqual(r.status, 200)
+ # FIXME: The order of the headers is not predictable right now. We
+ # should fix that someday (maybe when we migrate to
+ # OrderedDict/MultiDict).
+ self.assertTrue(b'For The Proxy: YEAH!\r\n' in r.data)
+
+ def test_retries(self):
+ def echo_socket_handler(listener):
+ sock = listener.accept()[0]
+ # First request, which should fail
+ sock.close()
+
+ # Second request
+ sock = listener.accept()[0]
+
+ buf = b''
+ while not buf.endswith(b'\r\n\r\n'):
+ buf += sock.recv(65536)
+
+ sock.send(('HTTP/1.1 200 OK\r\n'
+ 'Content-Type: text/plain\r\n'
+ 'Content-Length: %d\r\n'
+ '\r\n'
+ '%s' % (len(buf), buf.decode('utf-8'))).encode('utf-8'))
+ sock.close()
+
+ self._start_server(echo_socket_handler)
+ base_url = 'http://%s:%d' % (self.host, self.port)
+
+ proxy = proxy_from_url(base_url)
+ conn = proxy.connection_from_url('http://www.google.com')
+
+ r = conn.urlopen('GET', 'http://www.google.com',
+ assert_same_host=False, retries=1)
+ self.assertEqual(r.status, 200)
+
+ self.assertRaises(ProxyError, conn.urlopen, 'GET',
+ 'http://www.google.com',
+ assert_same_host=False, retries=False)
+
+ def test_connect_reconn(self):
+ def proxy_ssl_one(listener):
+ sock = listener.accept()[0]
+
+ buf = b''
+ while not buf.endswith(b'\r\n\r\n'):
+ buf += sock.recv(65536)
+ s = buf.decode('utf-8')
+ if not s.startswith('CONNECT '):
+ sock.send(('HTTP/1.1 405 Method not allowed\r\n'
+ 'Allow: CONNECT\r\n\r\n').encode('utf-8'))
+ sock.close()
+ return
+
+ if not s.startswith('CONNECT %s:443' % (self.host,)):
+ sock.send(('HTTP/1.1 403 Forbidden\r\n\r\n').encode('utf-8'))
+ sock.close()
+ return
+
+ sock.send(('HTTP/1.1 200 Connection Established\r\n\r\n').encode('utf-8'))
+ ssl_sock = ssl.wrap_socket(sock,
+ server_side=True,
+ keyfile=DEFAULT_CERTS['keyfile'],
+ certfile=DEFAULT_CERTS['certfile'],
+ ca_certs=DEFAULT_CA)
+
+ buf = b''
+ while not buf.endswith(b'\r\n\r\n'):
+ buf += ssl_sock.recv(65536)
+
+ ssl_sock.send(('HTTP/1.1 200 OK\r\n'
+ 'Content-Type: text/plain\r\n'
+ 'Content-Length: 2\r\n'
+ 'Connection: close\r\n'
+ '\r\n'
+ 'Hi').encode('utf-8'))
+ ssl_sock.close()
+ def echo_socket_handler(listener):
+ proxy_ssl_one(listener)
+ proxy_ssl_one(listener)
+
+ self._start_server(echo_socket_handler)
+ base_url = 'http://%s:%d' % (self.host, self.port)
+
+ proxy = proxy_from_url(base_url)
+
+ url = 'https://{0}'.format(self.host)
+ conn = proxy.connection_from_url(url)
+ r = conn.urlopen('GET', url, retries=0)
+ self.assertEqual(r.status, 200)
+ r = conn.urlopen('GET', url, retries=0)
+ self.assertEqual(r.status, 200)
+
+
+class TestSSL(SocketDummyServerTestCase):
+
+ def test_ssl_failure_midway_through_conn(self):
+ def socket_handler(listener):
+ sock = listener.accept()[0]
+ sock2 = sock.dup()
+ ssl_sock = ssl.wrap_socket(sock,
+ server_side=True,
+ keyfile=DEFAULT_CERTS['keyfile'],
+ certfile=DEFAULT_CERTS['certfile'],
+ ca_certs=DEFAULT_CA)
+
+ buf = b''
+ while not buf.endswith(b'\r\n\r\n'):
+ buf += ssl_sock.recv(65536)
+
+ # Deliberately send from the non-SSL socket.
+ sock2.send((
+ 'HTTP/1.1 200 OK\r\n'
+ 'Content-Type: text/plain\r\n'
+ 'Content-Length: 2\r\n'
+ '\r\n'
+ 'Hi').encode('utf-8'))
+ sock2.close()
+ ssl_sock.close()
+
+ self._start_server(socket_handler)
+ pool = HTTPSConnectionPool(self.host, self.port)
+
+ self.assertRaises(SSLError, pool.request, 'GET', '/', retries=0)
+
+ def test_ssl_read_timeout(self):
+ timed_out = Event()
+
+ def socket_handler(listener):
+ sock = listener.accept()[0]
+ ssl_sock = ssl.wrap_socket(sock,
+ server_side=True,
+ keyfile=DEFAULT_CERTS['keyfile'],
+ certfile=DEFAULT_CERTS['certfile'],
+ ca_certs=DEFAULT_CA)
+
+ buf = b''
+ while not buf.endswith(b'\r\n\r\n'):
+ buf += ssl_sock.recv(65536)
+
+ # Send incomplete message (note Content-Length)
+ ssl_sock.send((
+ 'HTTP/1.1 200 OK\r\n'
+ 'Content-Type: text/plain\r\n'
+ 'Content-Length: 10\r\n'
+ '\r\n'
+ 'Hi-').encode('utf-8'))
+ timed_out.wait()
+
+ sock.close()
+ ssl_sock.close()
+
+ self._start_server(socket_handler)
+ pool = HTTPSConnectionPool(self.host, self.port)
+
+ response = pool.urlopen('GET', '/', retries=0, preload_content=False,
+ timeout=Timeout(connect=1, read=0.001))
+ try:
+ self.assertRaises(ReadTimeoutError, response.read)
+ finally:
+ timed_out.set()
+
+
+def consume_socket(sock, chunks=65536):
+ while not sock.recv(chunks).endswith(b'\r\n\r\n'):
+ pass
+
+
+def create_response_handler(response, num=1):
+ def socket_handler(listener):
+ for _ in range(num):
+ sock = listener.accept()[0]
+ consume_socket(sock)
+
+ sock.send(response)
+ sock.close()
+
+ return socket_handler
+
+
+class TestErrorWrapping(SocketDummyServerTestCase):
+
+ def test_bad_statusline(self):
+ handler = create_response_handler(
+ b'HTTP/1.1 Omg What Is This?\r\n'
+ b'Content-Length: 0\r\n'
+ b'\r\n'
+ )
+ self._start_server(handler)
+ pool = HTTPConnectionPool(self.host, self.port, retries=False)
+ self.assertRaises(ProtocolError, pool.request, 'GET', '/')
+
+ def test_unknown_protocol(self):
+ handler = create_response_handler(
+ b'HTTP/1000 200 OK\r\n'
+ b'Content-Length: 0\r\n'
+ b'\r\n'
+ )
+ self._start_server(handler)
+ pool = HTTPConnectionPool(self.host, self.port, retries=False)
+ self.assertRaises(ProtocolError, pool.request, 'GET', '/')
diff --git a/test/with_dummyserver/test_socketlevel.pyc b/test/with_dummyserver/test_socketlevel.pyc
new file mode 100644
index 0000000..ba3b19e
--- /dev/null
+++ b/test/with_dummyserver/test_socketlevel.pyc
Binary files differ