'''SSL with SNI_-support for Python 2. Follow these instructions if you would like to verify SSL certificates in Python 2. Note, the default libraries do *not* do certificate checking; you need to do additional work to validate certificates yourself. This needs the following packages installed: * pyOpenSSL (tested with 0.13) * ndg-httpsclient (tested with 0.3.2) * pyasn1 (tested with 0.1.6) You can install them with the following command: pip install pyopenssl ndg-httpsclient pyasn1 To activate certificate checking, call :func:`~urllib3.contrib.pyopenssl.inject_into_urllib3` from your Python code before you begin making HTTP requests. This can be done in a ``sitecustomize`` module, or at any other time before your application begins using ``urllib3``, like this:: try: import urllib3.contrib.pyopenssl urllib3.contrib.pyopenssl.inject_into_urllib3() except ImportError: pass Now you can use :mod:`urllib3` as you normally would, and it will support SNI when the required modules are installed. Activating this module also has the positive side effect of disabling SSL/TLS compression in Python 2 (see `CRIME attack`_). If you want to configure the default list of supported cipher suites, you can set the ``urllib3.contrib.pyopenssl.DEFAULT_SSL_CIPHER_LIST`` variable. Module Variables ---------------- :var DEFAULT_SSL_CIPHER_LIST: The list of supported SSL/TLS cipher suites. .. _sni: https://en.wikipedia.org/wiki/Server_Name_Indication .. _crime attack: https://en.wikipedia.org/wiki/CRIME_(security_exploit) ''' try: from ndg.httpsclient.ssl_peer_verification import SUBJ_ALT_NAME_SUPPORT from ndg.httpsclient.subj_alt_name import SubjectAltName as BaseSubjectAltName except SyntaxError as e: raise ImportError(e) import OpenSSL.SSL from pyasn1.codec.der import decoder as der_decoder from pyasn1.type import univ, constraint from socket import _fileobject, timeout import ssl import select from .. import connection from .. import util __all__ = ['inject_into_urllib3', 'extract_from_urllib3'] # SNI only *really* works if we can read the subjectAltName of certificates. HAS_SNI = SUBJ_ALT_NAME_SUPPORT # Map from urllib3 to PyOpenSSL compatible parameter-values. _openssl_versions = { ssl.PROTOCOL_SSLv23: OpenSSL.SSL.SSLv23_METHOD, ssl.PROTOCOL_TLSv1: OpenSSL.SSL.TLSv1_METHOD, } try: _openssl_versions.update({ssl.PROTOCOL_SSLv3: OpenSSL.SSL.SSLv3_METHOD}) except AttributeError: pass _openssl_verify = { ssl.CERT_NONE: OpenSSL.SSL.VERIFY_NONE, ssl.CERT_OPTIONAL: OpenSSL.SSL.VERIFY_PEER, ssl.CERT_REQUIRED: OpenSSL.SSL.VERIFY_PEER + OpenSSL.SSL.VERIFY_FAIL_IF_NO_PEER_CERT, } DEFAULT_SSL_CIPHER_LIST = util.ssl_.DEFAULT_CIPHERS # OpenSSL will only write 16K at a time SSL_WRITE_BLOCKSIZE = 16384 try: _ = memoryview has_memoryview = True except NameError: has_memoryview = False orig_util_HAS_SNI = util.HAS_SNI orig_connection_ssl_wrap_socket = connection.ssl_wrap_socket def inject_into_urllib3(): 'Monkey-patch urllib3 with PyOpenSSL-backed SSL-support.' connection.ssl_wrap_socket = ssl_wrap_socket util.HAS_SNI = HAS_SNI def extract_from_urllib3(): 'Undo monkey-patching by :func:`inject_into_urllib3`.' connection.ssl_wrap_socket = orig_connection_ssl_wrap_socket util.HAS_SNI = orig_util_HAS_SNI ### Note: This is a slightly bug-fixed version of same from ndg-httpsclient. class SubjectAltName(BaseSubjectAltName): '''ASN.1 implementation for subjectAltNames support''' # There is no limit to how many SAN certificates a certificate may have, # however this needs to have some limit so we'll set an arbitrarily high # limit. sizeSpec = univ.SequenceOf.sizeSpec + \ constraint.ValueSizeConstraint(1, 1024) ### Note: This is a slightly bug-fixed version of same from ndg-httpsclient. def get_subj_alt_name(peer_cert): # Search through extensions dns_name = [] if not SUBJ_ALT_NAME_SUPPORT: return dns_name general_names = SubjectAltName() for i in range(peer_cert.get_extension_count()): ext = peer_cert.get_extension(i) ext_name = ext.get_short_name() if ext_name != 'subjectAltName': continue # PyOpenSSL returns extension data in ASN.1 encoded form ext_dat = ext.get_data() decoded_dat = der_decoder.decode(ext_dat, asn1Spec=general_names) for name in decoded_dat: if not isinstance(name, SubjectAltName): continue for entry in range(len(name)): component = name.getComponentByPosition(entry) if component.getName() != 'dNSName': continue dns_name.append(str(component.getComponent())) return dns_name class WrappedSocket(object): '''API-compatibility wrapper for Python OpenSSL's Connection-class. Note: _makefile_refs, _drop() and _reuse() are needed for the garbage collector of pypy. ''' def __init__(self, connection, socket, suppress_ragged_eofs=True): self.connection = connection self.socket = socket self.suppress_ragged_eofs = suppress_ragged_eofs self._makefile_refs = 0 def fileno(self): return self.socket.fileno() def makefile(self, mode, bufsize=-1): self._makefile_refs += 1 return _fileobject(self, mode, bufsize, close=True) def recv(self, *args, **kwargs): try: data = self.connection.recv(*args, **kwargs) except OpenSSL.SSL.SysCallError as e: if self.suppress_ragged_eofs and e.args == (-1, 'Unexpected EOF'): return b'' else: raise except OpenSSL.SSL.ZeroReturnError as e: if self.connection.get_shutdown() == OpenSSL.SSL.RECEIVED_SHUTDOWN: return b'' else: raise except OpenSSL.SSL.WantReadError: rd, wd, ed = select.select( [self.socket], [], [], self.socket.gettimeout()) if not rd: raise timeout('The read operation timed out') else: return self.recv(*args, **kwargs) else: return data def settimeout(self, timeout): return self.socket.settimeout(timeout) def _send_until_done(self, data): while True: try: return self.connection.send(data) except OpenSSL.SSL.WantWriteError: _, wlist, _ = select.select([], [self.socket], [], self.socket.gettimeout()) if not wlist: raise timeout() continue def sendall(self, data): if has_memoryview and not isinstance(data, memoryview): data = memoryview(data) total_sent = 0 while total_sent < len(data): sent = self._send_until_done(data[total_sent:total_sent+SSL_WRITE_BLOCKSIZE]) total_sent += sent def shutdown(self): # FIXME rethrow compatible exceptions should we ever use this self.connection.shutdown() def close(self): if self._makefile_refs < 1: return self.connection.close() else: self._makefile_refs -= 1 def getpeercert(self, binary_form=False): x509 = self.connection.get_peer_certificate() if not x509: return x509 if binary_form: return OpenSSL.crypto.dump_certificate( OpenSSL.crypto.FILETYPE_ASN1, x509) return { 'subject': ( (('commonName', x509.get_subject().CN),), ), 'subjectAltName': [ ('DNS', value) for value in get_subj_alt_name(x509) ] } def _reuse(self): self._makefile_refs += 1 def _drop(self): if self._makefile_refs < 1: self.close() else: self._makefile_refs -= 1 def _verify_callback(cnx, x509, err_no, err_depth, return_code): return err_no == 0 def ssl_wrap_socket(sock, keyfile=None, certfile=None, cert_reqs=None, ca_certs=None, server_hostname=None, ssl_version=None, ca_cert_dir=None): ctx = OpenSSL.SSL.Context(_openssl_versions[ssl_version]) if certfile: keyfile = keyfile or certfile # Match behaviour of the normal python ssl library ctx.use_certificate_file(certfile) if keyfile: ctx.use_privatekey_file(keyfile) if cert_reqs != ssl.CERT_NONE: ctx.set_verify(_openssl_verify[cert_reqs], _verify_callback) if ca_certs or ca_cert_dir: try: ctx.load_verify_locations(ca_certs, ca_cert_dir) except OpenSSL.SSL.Error as e: raise ssl.SSLError('bad ca_certs: %r' % ca_certs, e) else: ctx.set_default_verify_paths() # Disable TLS compression to migitate CRIME attack (issue #309) OP_NO_COMPRESSION = 0x20000 ctx.set_options(OP_NO_COMPRESSION) # Set list of supported ciphersuites. ctx.set_cipher_list(DEFAULT_SSL_CIPHER_LIST) cnx = OpenSSL.SSL.Connection(ctx, sock) cnx.set_tlsext_host_name(server_hostname) cnx.set_connect_state() while True: try: cnx.do_handshake() except OpenSSL.SSL.WantReadError: rd, _, _ = select.select([sock], [], [], sock.gettimeout()) if not rd: raise timeout('select timed out') continue except OpenSSL.SSL.Error as e: raise ssl.SSLError('bad handshake: %r' % e) break return WrappedSocket(cnx, sock)