aboutsummaryrefslogtreecommitdiff
path: root/test
diff options
context:
space:
mode:
Diffstat (limited to 'test')
-rw-r--r--test/__init__.py0
-rw-r--r--test/benchmark.py77
-rw-r--r--test/test_collections.py111
-rw-r--r--test/test_connectionpool.py136
-rw-r--r--test/test_poolmanager.py47
-rw-r--r--test/test_response.py68
6 files changed, 439 insertions, 0 deletions
diff --git a/test/__init__.py b/test/__init__.py
new file mode 100644
index 0000000..e69de29
--- /dev/null
+++ b/test/__init__.py
diff --git a/test/benchmark.py b/test/benchmark.py
new file mode 100644
index 0000000..e7049c4
--- /dev/null
+++ b/test/benchmark.py
@@ -0,0 +1,77 @@
+#!/usr/bin/env python
+
+"""
+Really simple rudimentary benchmark to compare ConnectionPool versus standard
+urllib to demonstrate the usefulness of connection re-using.
+"""
+from __future__ import print_function
+
+import sys
+import time
+import urllib
+
+sys.path.append('../')
+import urllib3
+
+
+# URLs to download. Doesn't matter as long as they're from the same host, so we
+# can take advantage of connection re-using.
+TO_DOWNLOAD = [
+ 'http://code.google.com/apis/apps/',
+ 'http://code.google.com/apis/base/',
+ 'http://code.google.com/apis/blogger/',
+ 'http://code.google.com/apis/calendar/',
+ 'http://code.google.com/apis/codesearch/',
+ 'http://code.google.com/apis/contact/',
+ 'http://code.google.com/apis/books/',
+ 'http://code.google.com/apis/documents/',
+ 'http://code.google.com/apis/finance/',
+ 'http://code.google.com/apis/health/',
+ 'http://code.google.com/apis/notebook/',
+ 'http://code.google.com/apis/picasaweb/',
+ 'http://code.google.com/apis/spreadsheets/',
+ 'http://code.google.com/apis/webmastertools/',
+ 'http://code.google.com/apis/youtube/',
+]
+
+
+def urllib_get(url_list):
+ assert url_list
+ for url in url_list:
+ now = time.time()
+ r = urllib.urlopen(url)
+ elapsed = time.time() - now
+ print("Got in %0.3fs: %s" % (elapsed, url))
+
+
+def pool_get(url_list):
+ assert url_list
+ pool = urllib3.connection_from_url(url_list[0])
+ for url in url_list:
+ now = time.time()
+ r = pool.get_url(url)
+ elapsed = time.time() - now
+ print("Got in %0.3fs: %s" % (elapsed, url))
+
+
+if __name__ == '__main__':
+ print("Running pool_get ...")
+ now = time.time()
+ pool_get(TO_DOWNLOAD)
+ pool_elapsed = time.time() - now
+
+ print("Running urllib_get ...")
+ now = time.time()
+ urllib_get(TO_DOWNLOAD)
+ urllib_elapsed = time.time() - now
+
+ print("Completed pool_get in %0.3fs" % pool_elapsed)
+ print("Completed urllib_get in %0.3fs" % urllib_elapsed)
+
+
+"""
+Example results:
+
+Completed pool_get in 1.163s
+Completed urllib_get in 2.318s
+"""
diff --git a/test/test_collections.py b/test/test_collections.py
new file mode 100644
index 0000000..f8275e0
--- /dev/null
+++ b/test/test_collections.py
@@ -0,0 +1,111 @@
+import unittest
+
+from urllib3._collections import RecentlyUsedContainer as Container
+from urllib3.packages import six
+xrange = six.moves.xrange
+
+class TestLRUContainer(unittest.TestCase):
+ def test_maxsize(self):
+ d = Container(5)
+
+ for i in xrange(5):
+ d[i] = str(i)
+
+ self.assertEqual(len(d), 5)
+
+ for i in xrange(5):
+ self.assertEqual(d[i], str(i))
+
+ d[i+1] = str(i+1)
+
+ self.assertEqual(len(d), 5)
+ self.assertFalse(0 in d)
+ self.assertTrue(i+1 in d)
+
+ def test_expire(self):
+ d = Container(5)
+
+ for i in xrange(5):
+ d[i] = str(i)
+
+ for i in xrange(5):
+ d.get(0)
+
+ # Add one more entry
+ d[5] = '5'
+
+ # Check state
+ self.assertEqual(list(d.keys()), [0, 2, 3, 4, 5])
+
+ def test_pruning(self):
+ d = Container(5)
+
+ for i in xrange(5):
+ d[i] = str(i)
+
+ # Contend 2 entries for the most-used slot to balloon the heap
+ for i in xrange(100):
+ d.get(i % 2)
+
+ self.assertTrue(len(d.access_log) <= d.CLEANUP_FACTOR * d._maxsize)
+
+ def test_same_key(self):
+ d = Container(5)
+
+ for i in xrange(10):
+ d['foo'] = i
+
+ self.assertEqual(list(d.keys()), ['foo'])
+
+ d._prune_invalidated_entries()
+
+ self.assertEqual(len(d.access_log), 1)
+
+ def test_access_ordering(self):
+ d = Container(5)
+
+ for i in xrange(10):
+ d[i] = True
+
+ self.assertEqual(d._get_ordered_access_keys(), [9,8,7,6,5])
+
+ new_order = [7,8,6,9,5]
+ for k in reversed(new_order):
+ d[k]
+
+ self.assertEqual(d._get_ordered_access_keys(), new_order)
+
+ def test_delete(self):
+ d = Container(5)
+
+ for i in xrange(5):
+ d[i] = True
+
+ del d[0]
+ self.assertFalse(0 in d)
+
+ d.pop(1)
+ self.assertFalse(1 in d)
+
+ d.pop(1, None)
+
+ def test_get(self):
+ d = Container(5)
+
+ for i in xrange(5):
+ d[i] = True
+
+ r = d.get(4)
+ self.assertEqual(r, True)
+
+ r = d.get(5)
+ self.assertEqual(r, None)
+
+ r = d.get(5, 42)
+ self.assertEqual(r, 42)
+
+ self.assertRaises(KeyError, lambda: d[5])
+
+
+if __name__ == '__main__':
+ unittest.main()
diff --git a/test/test_connectionpool.py b/test/test_connectionpool.py
new file mode 100644
index 0000000..4281d42
--- /dev/null
+++ b/test/test_connectionpool.py
@@ -0,0 +1,136 @@
+import unittest
+
+from urllib3.connectionpool import (
+ connection_from_url,
+ get_host,
+ HTTPConnectionPool,
+ make_headers)
+
+from urllib3.exceptions import EmptyPoolError, LocationParseError
+
+
+class TestConnectionPool(unittest.TestCase):
+ def test_get_host(self):
+ url_host_map = {
+ 'http://google.com/mail': ('http', 'google.com', None),
+ 'http://google.com/mail/': ('http', 'google.com', None),
+ 'google.com/mail': ('http', 'google.com', None),
+ 'http://google.com/': ('http', 'google.com', None),
+ 'http://google.com': ('http', 'google.com', None),
+ 'http://www.google.com': ('http', 'www.google.com', None),
+ 'http://mail.google.com': ('http', 'mail.google.com', None),
+ 'http://google.com:8000/mail/': ('http', 'google.com', 8000),
+ 'http://google.com:8000': ('http', 'google.com', 8000),
+ 'https://google.com': ('https', 'google.com', None),
+ 'https://google.com:8000': ('https', 'google.com', 8000),
+ 'http://user:password@127.0.0.1:1234': ('http', '127.0.0.1', 1234),
+ }
+ for url, expected_host in url_host_map.items():
+ returned_host = get_host(url)
+ self.assertEqual(returned_host, expected_host)
+
+ def test_same_host(self):
+ same_host = [
+ ('http://google.com/', '/'),
+ ('http://google.com/', 'http://google.com/'),
+ ('http://google.com/', 'http://google.com'),
+ ('http://google.com/', 'http://google.com/abra/cadabra'),
+ ('http://google.com:42/', 'http://google.com:42/abracadabra'),
+ ]
+
+ for a, b in same_host:
+ c = connection_from_url(a)
+ self.assertTrue(c.is_same_host(b), "%s =? %s" % (a, b))
+
+ not_same_host = [
+ ('https://google.com/', 'http://google.com/'),
+ ('http://google.com/', 'https://google.com/'),
+ ('http://yahoo.com/', 'http://google.com/'),
+ ('http://google.com:42', 'https://google.com/abracadabra'),
+ ('http://google.com', 'https://google.net/'),
+ ]
+
+ for a, b in not_same_host:
+ c = connection_from_url(a)
+ self.assertFalse(c.is_same_host(b), "%s =? %s" % (a, b))
+
+ def test_invalid_host(self):
+ # TODO: Add more tests
+ invalid_host = [
+ 'http://google.com:foo',
+ ]
+
+ for location in invalid_host:
+ self.assertRaises(LocationParseError, get_host, location)
+
+
+ def test_make_headers(self):
+ self.assertEqual(
+ make_headers(accept_encoding=True),
+ {'accept-encoding': 'gzip,deflate'})
+
+ self.assertEqual(
+ make_headers(accept_encoding='foo,bar'),
+ {'accept-encoding': 'foo,bar'})
+
+ self.assertEqual(
+ make_headers(accept_encoding=['foo', 'bar']),
+ {'accept-encoding': 'foo,bar'})
+
+ self.assertEqual(
+ make_headers(accept_encoding=True, user_agent='banana'),
+ {'accept-encoding': 'gzip,deflate', 'user-agent': 'banana'})
+
+ self.assertEqual(
+ make_headers(user_agent='banana'),
+ {'user-agent': 'banana'})
+
+ self.assertEqual(
+ make_headers(keep_alive=True),
+ {'connection': 'keep-alive'})
+
+ self.assertEqual(
+ make_headers(basic_auth='foo:bar'),
+ {'authorization': 'Basic Zm9vOmJhcg=='})
+
+ def test_max_connections(self):
+ pool = HTTPConnectionPool(host='localhost', maxsize=1, block=True)
+
+ pool._get_conn(timeout=0.01)
+
+ try:
+ pool._get_conn(timeout=0.01)
+ self.fail("Managed to get a connection without EmptyPoolError")
+ except EmptyPoolError:
+ pass
+
+ try:
+ pool.get_url('/', pool_timeout=0.01)
+ self.fail("Managed to get a connection without EmptyPoolError")
+ except EmptyPoolError:
+ pass
+
+ self.assertEqual(pool.num_connections, 1)
+
+ def test_pool_edgecases(self):
+ pool = HTTPConnectionPool(host='localhost', maxsize=1, block=False)
+
+ conn1 = pool._get_conn()
+ conn2 = pool._get_conn() # New because block=False
+
+ pool._put_conn(conn1)
+ pool._put_conn(conn2) # Should be discarded
+
+ self.assertEqual(conn1, pool._get_conn())
+ self.assertNotEqual(conn2, pool._get_conn())
+
+ self.assertEqual(pool.num_connections, 3)
+
+ def test_exception_str(self):
+ self.assertEqual(
+ str(EmptyPoolError(HTTPConnectionPool(host='localhost'), "Test.")),
+ "HTTPConnectionPool(host='localhost', port=None): Test.")
+
+
+if __name__ == '__main__':
+ unittest.main()
diff --git a/test/test_poolmanager.py b/test/test_poolmanager.py
new file mode 100644
index 0000000..12722f7
--- /dev/null
+++ b/test/test_poolmanager.py
@@ -0,0 +1,47 @@
+import unittest
+
+from urllib3.poolmanager import PoolManager
+from urllib3 import connection_from_url
+
+
+class TestPoolManager(unittest.TestCase):
+ def test_same_url(self):
+ # Convince ourselves that normally we don't get the same object
+ conn1 = connection_from_url('http://localhost:8081/foo')
+ conn2 = connection_from_url('http://localhost:8081/bar')
+
+ self.assertNotEqual(conn1, conn2)
+
+ # Now try again using the PoolManager
+ p = PoolManager(1)
+
+ conn1 = p.connection_from_url('http://localhost:8081/foo')
+ conn2 = p.connection_from_url('http://localhost:8081/bar')
+
+ self.assertEqual(conn1, conn2)
+
+ def test_many_urls(self):
+ urls = [
+ "http://localhost:8081/foo",
+ "http://www.google.com/mail",
+ "http://localhost:8081/bar",
+ "https://www.google.com/",
+ "https://www.google.com/mail",
+ "http://yahoo.com",
+ "http://bing.com",
+ "http://yahoo.com/",
+ ]
+
+ connections = set()
+
+ p = PoolManager(10)
+
+ for url in urls:
+ conn = p.connection_from_url(url)
+ connections.add(conn)
+
+ self.assertEqual(len(connections), 5)
+
+
+if __name__ == '__main__':
+ unittest.main()
diff --git a/test/test_response.py b/test/test_response.py
new file mode 100644
index 0000000..0ef379c
--- /dev/null
+++ b/test/test_response.py
@@ -0,0 +1,68 @@
+import unittest
+import zlib
+
+from io import BytesIO
+
+from urllib3.response import HTTPResponse
+
+class TestLegacyResponse(unittest.TestCase):
+ def test_getheaders(self):
+ headers = {'host': 'example.com'}
+ r = HTTPResponse(headers=headers)
+ self.assertEqual(r.getheaders(), headers)
+
+ def test_getheader(self):
+ headers = {'host': 'example.com'}
+ r = HTTPResponse(headers=headers)
+ self.assertEqual(r.getheader('host'), 'example.com')
+
+
+class TestResponse(unittest.TestCase):
+ def test_cache_content(self):
+ r = HTTPResponse('foo')
+ self.assertEqual(r.data, 'foo')
+ self.assertEqual(r._body, 'foo')
+
+ def test_default(self):
+ r = HTTPResponse()
+ self.assertEqual(r.data, None)
+
+ def test_none(self):
+ r = HTTPResponse(None)
+ self.assertEqual(r.data, None)
+
+ def test_preload(self):
+ fp = BytesIO(b'foo')
+
+ r = HTTPResponse(fp, preload_content=True)
+
+ self.assertEqual(fp.tell(), len(b'foo'))
+ self.assertEqual(r.data, b'foo')
+
+ def test_no_preload(self):
+ fp = BytesIO(b'foo')
+
+ r = HTTPResponse(fp, preload_content=False)
+
+ self.assertEqual(fp.tell(), 0)
+ self.assertEqual(r.data, b'foo')
+ self.assertEqual(fp.tell(), len(b'foo'))
+
+ def test_decode_bad_data(self):
+ fp = BytesIO(b'\x00' * 10)
+ self.assertRaises(zlib.error, HTTPResponse, fp, headers={
+ 'content-encoding': 'deflate'
+ })
+
+ def test_decode_deflate(self):
+ import zlib
+ data = zlib.compress(b'foo')
+
+ fp = BytesIO(data)
+ r = HTTPResponse(fp, headers={'content-encoding': 'deflate'})
+
+ self.assertEqual(r.data, b'foo')
+
+
+if __name__ == '__main__':
+ unittest.main()