aboutsummaryrefslogtreecommitdiff
path: root/test/with_dummyserver/test_https.py
diff options
context:
space:
mode:
Diffstat (limited to 'test/with_dummyserver/test_https.py')
-rw-r--r--test/with_dummyserver/test_https.py374
1 files changed, 374 insertions, 0 deletions
diff --git a/test/with_dummyserver/test_https.py b/test/with_dummyserver/test_https.py
new file mode 100644
index 0000000..cf3eee7
--- /dev/null
+++ b/test/with_dummyserver/test_https.py
@@ -0,0 +1,374 @@
+import datetime
+import logging
+import ssl
+import sys
+import unittest
+import warnings
+
+import mock
+from nose.plugins.skip import SkipTest
+
+from dummyserver.testcase import HTTPSDummyServerTestCase
+from dummyserver.server import DEFAULT_CA, DEFAULT_CA_BAD, DEFAULT_CERTS
+
+from test import (
+ onlyPy26OrOlder,
+ requires_network,
+ TARPIT_HOST,
+ clear_warnings,
+)
+from urllib3 import HTTPSConnectionPool
+from urllib3.connection import (
+ VerifiedHTTPSConnection,
+ UnverifiedHTTPSConnection,
+ RECENT_DATE,
+)
+from urllib3.exceptions import (
+ SSLError,
+ ReadTimeoutError,
+ ConnectTimeoutError,
+ InsecureRequestWarning,
+ SystemTimeWarning,
+)
+from urllib3.util.timeout import Timeout
+
+
+log = logging.getLogger('urllib3.connectionpool')
+log.setLevel(logging.NOTSET)
+log.addHandler(logging.StreamHandler(sys.stdout))
+
+
+
+class TestHTTPS(HTTPSDummyServerTestCase):
+ def setUp(self):
+ self._pool = HTTPSConnectionPool(self.host, self.port)
+
+ def test_simple(self):
+ r = self._pool.request('GET', '/')
+ self.assertEqual(r.status, 200, r.data)
+
+ def test_set_ssl_version_to_tlsv1(self):
+ self._pool.ssl_version = ssl.PROTOCOL_TLSv1
+ r = self._pool.request('GET', '/')
+ self.assertEqual(r.status, 200, r.data)
+
+ def test_verified(self):
+ https_pool = HTTPSConnectionPool(self.host, self.port,
+ cert_reqs='CERT_REQUIRED',
+ ca_certs=DEFAULT_CA)
+
+ conn = https_pool._new_conn()
+ self.assertEqual(conn.__class__, VerifiedHTTPSConnection)
+
+ with mock.patch('warnings.warn') as warn:
+ r = https_pool.request('GET', '/')
+ self.assertEqual(r.status, 200)
+ self.assertFalse(warn.called, warn.call_args_list)
+
+ def test_invalid_common_name(self):
+ https_pool = HTTPSConnectionPool('127.0.0.1', self.port,
+ cert_reqs='CERT_REQUIRED',
+ ca_certs=DEFAULT_CA)
+ try:
+ https_pool.request('GET', '/')
+ self.fail("Didn't raise SSL invalid common name")
+ except SSLError as e:
+ self.assertTrue("doesn't match" in str(e))
+
+ def test_verified_with_bad_ca_certs(self):
+ https_pool = HTTPSConnectionPool(self.host, self.port,
+ cert_reqs='CERT_REQUIRED',
+ ca_certs=DEFAULT_CA_BAD)
+
+ try:
+ https_pool.request('GET', '/')
+ self.fail("Didn't raise SSL error with bad CA certs")
+ except SSLError as e:
+ self.assertTrue('certificate verify failed' in str(e),
+ "Expected 'certificate verify failed',"
+ "instead got: %r" % e)
+
+ def test_verified_without_ca_certs(self):
+ # default is cert_reqs=None which is ssl.CERT_NONE
+ https_pool = HTTPSConnectionPool(self.host, self.port,
+ cert_reqs='CERT_REQUIRED')
+
+ try:
+ https_pool.request('GET', '/')
+ self.fail("Didn't raise SSL error with no CA certs when"
+ "CERT_REQUIRED is set")
+ except SSLError as e:
+ # there is a different error message depending on whether or
+ # not pyopenssl is injected
+ self.assertTrue('No root certificates specified' in str(e) or
+ 'certificate verify failed' in str(e),
+ "Expected 'No root certificates specified' or "
+ "'certificate verify failed', "
+ "instead got: %r" % e)
+
+ def test_no_ssl(self):
+ pool = HTTPSConnectionPool(self.host, self.port)
+ pool.ConnectionCls = None
+ self.assertRaises(SSLError, pool._new_conn)
+ self.assertRaises(SSLError, pool.request, 'GET', '/')
+
+ def test_unverified_ssl(self):
+ """ Test that bare HTTPSConnection can connect, make requests """
+ pool = HTTPSConnectionPool(self.host, self.port)
+ pool.ConnectionCls = UnverifiedHTTPSConnection
+
+ with mock.patch('warnings.warn') as warn:
+ r = pool.request('GET', '/')
+ self.assertEqual(r.status, 200)
+ self.assertTrue(warn.called)
+
+ call, = warn.call_args_list
+ category = call[0][1]
+ self.assertEqual(category, InsecureRequestWarning)
+
+ def test_ssl_unverified_with_ca_certs(self):
+ pool = HTTPSConnectionPool(self.host, self.port,
+ cert_reqs='CERT_NONE',
+ ca_certs=DEFAULT_CA_BAD)
+
+ with mock.patch('warnings.warn') as warn:
+ r = pool.request('GET', '/')
+ self.assertEqual(r.status, 200)
+ self.assertTrue(warn.called)
+
+ call, = warn.call_args_list
+ category = call[0][1]
+ self.assertEqual(category, InsecureRequestWarning)
+
+ @requires_network
+ def test_ssl_verified_with_platform_ca_certs(self):
+ """
+ We should rely on the platform CA file to validate authenticity of SSL
+ certificates. Since this file is used by many components of the OS,
+ such as curl, apt-get, etc., we decided to not touch it, in order to
+ not compromise the security of the OS running the test suite (typically
+ urllib3 developer's OS).
+
+ This test assumes that httpbin.org uses a certificate signed by a well
+ known Certificate Authority.
+ """
+ try:
+ import urllib3.contrib.pyopenssl
+ except ImportError:
+ raise SkipTest('Test requires PyOpenSSL')
+ if (urllib3.connection.ssl_wrap_socket is
+ urllib3.contrib.pyopenssl.orig_connection_ssl_wrap_socket):
+ # Not patched
+ raise SkipTest('Test should only be run after PyOpenSSL '
+ 'monkey patching')
+
+ https_pool = HTTPSConnectionPool('httpbin.org', 443,
+ cert_reqs=ssl.CERT_REQUIRED)
+
+ https_pool.request('HEAD', '/')
+
+ def test_assert_hostname_false(self):
+ https_pool = HTTPSConnectionPool('127.0.0.1', self.port,
+ cert_reqs='CERT_REQUIRED',
+ ca_certs=DEFAULT_CA)
+
+ https_pool.assert_hostname = False
+ https_pool.request('GET', '/')
+
+ def test_assert_specific_hostname(self):
+ https_pool = HTTPSConnectionPool('127.0.0.1', self.port,
+ cert_reqs='CERT_REQUIRED',
+ ca_certs=DEFAULT_CA)
+
+ https_pool.assert_hostname = 'localhost'
+ https_pool.request('GET', '/')
+
+ def test_assert_fingerprint_md5(self):
+ https_pool = HTTPSConnectionPool('127.0.0.1', self.port,
+ cert_reqs='CERT_REQUIRED',
+ ca_certs=DEFAULT_CA)
+
+ https_pool.assert_fingerprint = 'CA:84:E1:AD0E5a:ef:2f:C3:09' \
+ ':E7:30:F8:CD:C8:5B'
+ https_pool.request('GET', '/')
+
+ def test_assert_fingerprint_sha1(self):
+ https_pool = HTTPSConnectionPool('127.0.0.1', self.port,
+ cert_reqs='CERT_REQUIRED',
+ ca_certs=DEFAULT_CA)
+
+ https_pool.assert_fingerprint = 'CC:45:6A:90:82:F7FF:C0:8218:8e:' \
+ '7A:F2:8A:D7:1E:07:33:67:DE'
+ https_pool.request('GET', '/')
+
+ def test_assert_invalid_fingerprint(self):
+ https_pool = HTTPSConnectionPool('127.0.0.1', self.port,
+ cert_reqs='CERT_REQUIRED',
+ ca_certs=DEFAULT_CA)
+
+ https_pool.assert_fingerprint = 'AA:AA:AA:AA:AA:AAAA:AA:AAAA:AA:' \
+ 'AA:AA:AA:AA:AA:AA:AA:AA:AA'
+
+ self.assertRaises(SSLError, https_pool.request, 'GET', '/')
+ https_pool._get_conn()
+
+ # Uneven length
+ https_pool.assert_fingerprint = 'AA:A'
+ self.assertRaises(SSLError, https_pool.request, 'GET', '/')
+ https_pool._get_conn()
+
+ # Invalid length
+ https_pool.assert_fingerprint = 'AA'
+ self.assertRaises(SSLError, https_pool.request, 'GET', '/')
+
+ def test_verify_none_and_bad_fingerprint(self):
+ https_pool = HTTPSConnectionPool('127.0.0.1', self.port,
+ cert_reqs='CERT_NONE',
+ ca_certs=DEFAULT_CA_BAD)
+
+ https_pool.assert_fingerprint = 'AA:AA:AA:AA:AA:AAAA:AA:AAAA:AA:' \
+ 'AA:AA:AA:AA:AA:AA:AA:AA:AA'
+ self.assertRaises(SSLError, https_pool.request, 'GET', '/')
+
+ def test_verify_none_and_good_fingerprint(self):
+ https_pool = HTTPSConnectionPool('127.0.0.1', self.port,
+ cert_reqs='CERT_NONE',
+ ca_certs=DEFAULT_CA_BAD)
+
+ https_pool.assert_fingerprint = 'CC:45:6A:90:82:F7FF:C0:8218:8e:' \
+ '7A:F2:8A:D7:1E:07:33:67:DE'
+ https_pool.request('GET', '/')
+
+ @requires_network
+ def test_https_timeout(self):
+ timeout = Timeout(connect=0.001)
+ https_pool = HTTPSConnectionPool(TARPIT_HOST, self.port,
+ timeout=timeout, retries=False,
+ cert_reqs='CERT_REQUIRED')
+
+ timeout = Timeout(total=None, connect=0.001)
+ https_pool = HTTPSConnectionPool(TARPIT_HOST, self.port,
+ timeout=timeout, retries=False,
+ cert_reqs='CERT_REQUIRED')
+ self.assertRaises(ConnectTimeoutError, https_pool.request, 'GET', '/')
+
+ timeout = Timeout(read=0.001)
+ https_pool = HTTPSConnectionPool(self.host, self.port,
+ timeout=timeout, retries=False,
+ cert_reqs='CERT_REQUIRED')
+ https_pool.ca_certs = DEFAULT_CA
+ https_pool.assert_fingerprint = 'CC:45:6A:90:82:F7FF:C0:8218:8e:' \
+ '7A:F2:8A:D7:1E:07:33:67:DE'
+ url = '/sleep?seconds=0.005'
+ self.assertRaises(ReadTimeoutError, https_pool.request, 'GET', url)
+
+ timeout = Timeout(total=None)
+ https_pool = HTTPSConnectionPool(self.host, self.port, timeout=timeout,
+ cert_reqs='CERT_NONE')
+ https_pool.request('GET', '/')
+
+ def test_tunnel(self):
+ """ test the _tunnel behavior """
+ timeout = Timeout(total=None)
+ https_pool = HTTPSConnectionPool(self.host, self.port, timeout=timeout,
+ cert_reqs='CERT_NONE')
+ conn = https_pool._new_conn()
+ try:
+ conn.set_tunnel(self.host, self.port)
+ except AttributeError: # python 2.6
+ conn._set_tunnel(self.host, self.port)
+ conn._tunnel = mock.Mock()
+ https_pool._make_request(conn, 'GET', '/')
+ conn._tunnel.assert_called_once_with()
+
+ @onlyPy26OrOlder
+ def test_tunnel_old_python(self):
+ """HTTPSConnection can still make connections if _tunnel_host isn't set
+
+ The _tunnel_host attribute was added in 2.6.3 - because our test runners
+ generally use the latest Python 2.6, we simulate the old version by
+ deleting the attribute from the HTTPSConnection.
+ """
+ conn = self._pool._new_conn()
+ del conn._tunnel_host
+ self._pool._make_request(conn, 'GET', '/')
+
+ @requires_network
+ def test_enhanced_timeout(self):
+ def new_pool(timeout, cert_reqs='CERT_REQUIRED'):
+ https_pool = HTTPSConnectionPool(TARPIT_HOST, self.port,
+ timeout=timeout,
+ retries=False,
+ cert_reqs=cert_reqs)
+ return https_pool
+
+ https_pool = new_pool(Timeout(connect=0.001))
+ conn = https_pool._new_conn()
+ self.assertRaises(ConnectTimeoutError, https_pool.request, 'GET', '/')
+ self.assertRaises(ConnectTimeoutError, https_pool._make_request, conn,
+ 'GET', '/')
+
+ https_pool = new_pool(Timeout(connect=5))
+ self.assertRaises(ConnectTimeoutError, https_pool.request, 'GET', '/',
+ timeout=Timeout(connect=0.001))
+
+ t = Timeout(total=None)
+ https_pool = new_pool(t)
+ conn = https_pool._new_conn()
+ self.assertRaises(ConnectTimeoutError, https_pool.request, 'GET', '/',
+ timeout=Timeout(total=None, connect=0.001))
+
+ def test_enhanced_ssl_connection(self):
+ fingerprint = 'CC:45:6A:90:82:F7FF:C0:8218:8e:7A:F2:8A:D7:1E:07:33:67:DE'
+
+ conn = VerifiedHTTPSConnection(self.host, self.port)
+ https_pool = HTTPSConnectionPool(self.host, self.port,
+ cert_reqs='CERT_REQUIRED', ca_certs=DEFAULT_CA,
+ assert_fingerprint=fingerprint)
+
+ https_pool._make_request(conn, 'GET', '/')
+
+ def test_ssl_correct_system_time(self):
+ with warnings.catch_warnings(record=True) as w:
+ warnings.simplefilter('always')
+ self._pool.request('GET', '/')
+
+ self.assertEqual([], w)
+
+ def test_ssl_wrong_system_time(self):
+ with mock.patch('urllib3.connection.datetime') as mock_date:
+ mock_date.date.today.return_value = datetime.date(1970, 1, 1)
+
+ with warnings.catch_warnings(record=True) as w:
+ warnings.simplefilter('always')
+ self._pool.request('GET', '/')
+
+ self.assertEqual(len(w), 1)
+ warning = w[0]
+
+ self.assertEqual(SystemTimeWarning, warning.category)
+ self.assertTrue(str(RECENT_DATE) in warning.message.args[0])
+
+
+class TestHTTPS_TLSv1(HTTPSDummyServerTestCase):
+ certs = DEFAULT_CERTS.copy()
+ certs['ssl_version'] = ssl.PROTOCOL_TLSv1
+
+ def setUp(self):
+ self._pool = HTTPSConnectionPool(self.host, self.port)
+
+ def test_set_ssl_version_to_sslv3(self):
+ self._pool.ssl_version = ssl.PROTOCOL_SSLv3
+ self.assertRaises(SSLError, self._pool.request, 'GET', '/')
+
+ def test_ssl_version_as_string(self):
+ self._pool.ssl_version = 'PROTOCOL_SSLv3'
+ self.assertRaises(SSLError, self._pool.request, 'GET', '/')
+
+ def test_ssl_version_as_short_string(self):
+ self._pool.ssl_version = 'SSLv3'
+ self.assertRaises(SSLError, self._pool.request, 'GET', '/')
+
+
+if __name__ == '__main__':
+ unittest.main()