Diffstat (limited to 'requests/utils.py')
-rw-r--r-- | requests/utils.py | 330 |
1 file changed, 225 insertions, 105 deletions
diff --git a/requests/utils.py b/requests/utils.py
index 8365cc3..f5f6b95 100644
--- a/requests/utils.py
+++ b/requests/utils.py
@@ -12,48 +12,25 @@ that are also useful for external consumption.
 import cgi
 import codecs
 import os
-import random
+import platform
 import re
+import sys
 import zlib
 
 from netrc import netrc, NetrcParseError
 
+from . import __version__
+from . import certs
 from .compat import parse_http_list as _parse_list_header
-from .compat import quote, is_py2, urlparse
-from .compat import basestring, bytes, str
+from .compat import quote, urlparse, bytes, str, OrderedDict, urlunparse
 from .cookies import RequestsCookieJar, cookiejar_from_dict
 
 _hush_pyflakes = (RequestsCookieJar,)
 
-CERTIFI_BUNDLE_PATH = None
-try:
-    # see if requests's own CA certificate bundle is installed
-    import certifi
-    CERTIFI_BUNDLE_PATH = certifi.where()
-except ImportError:
-    pass
-
 NETRC_FILES = ('.netrc', '_netrc')
 
-# common paths for the OS's CA certificate bundle
-POSSIBLE_CA_BUNDLE_PATHS = [
-    # Red Hat, CentOS, Fedora and friends (provided by the ca-certificates package):
-    '/etc/pki/tls/certs/ca-bundle.crt',
-    # Ubuntu, Debian, and friends (provided by the ca-certificates package):
-    '/etc/ssl/certs/ca-certificates.crt',
-    # FreeBSD (provided by the ca_root_nss package):
-    '/usr/local/share/certs/ca-root-nss.crt',
-]
-
-def get_os_ca_bundle_path():
-    """Try to pick an available CA certificate bundle provided by the OS."""
-    for path in POSSIBLE_CA_BUNDLE_PATHS:
-        if os.path.exists(path):
-            return path
-    return None
-
 # if certifi is installed, use its CA bundle;
 # otherwise, try and use the OS bundle
-DEFAULT_CA_BUNDLE_PATH = CERTIFI_BUNDLE_PATH or get_os_ca_bundle_path()
+DEFAULT_CA_BUNDLE_PATH = certs.where()
 
 
 def dict_to_sequence(d):
     """Returns an internal sequence dictionary update."""
@@ -63,6 +40,13 @@ def dict_to_sequence(d):
 
     return d
 
+
+def super_len(o):
+    if hasattr(o, '__len__'):
+        return len(o)
+    if hasattr(o, 'len'):
+        return o.len
+    if hasattr(o, 'fileno'):
+        return os.fstat(o.fileno()).st_size
 
 def get_netrc_auth(url):
     """Returns the Requests tuple auth for a given url from netrc."""
@@ -96,7 +80,7 @@ def get_netrc_auth(url):
         pass
 
     # AppEngine hackiness.
-    except AttributeError:
+    except (ImportError, AttributeError):
         pass
 
@@ -107,6 +91,54 @@ def guess_filename(obj):
         return name
 
 
+def from_key_val_list(value):
+    """Take an object and test to see if it can be represented as a
+    dictionary. Unless it can not be represented as such, return an
+    OrderedDict, e.g.,
+
+    ::
+
+        >>> from_key_val_list([('key', 'val')])
+        OrderedDict([('key', 'val')])
+        >>> from_key_val_list('string')
+        ValueError: need more than 1 value to unpack
+        >>> from_key_val_list({'key': 'val'})
+        OrderedDict([('key', 'val')])
+    """
+    if value is None:
+        return None
+
+    if isinstance(value, (str, bytes, bool, int)):
+        raise ValueError('cannot encode objects that are not 2-tuples')
+
+    return OrderedDict(value)
+
+
+def to_key_val_list(value):
+    """Take an object and test to see if it can be represented as a
+    dictionary. If it can be, return a list of tuples, e.g.,
+
+    ::
+
+        >>> to_key_val_list([('key', 'val')])
+        [('key', 'val')]
+        >>> to_key_val_list({'key': 'val'})
+        [('key', 'val')]
+        >>> to_key_val_list('string')
+        ValueError: cannot encode objects that are not 2-tuples.
+    """
+    if value is None:
+        return None
+
+    if isinstance(value, (str, bytes, bool, int)):
+        raise ValueError('cannot encode objects that are not 2-tuples')
+
+    if isinstance(value, dict):
+        value = value.items()
+
+    return list(value)
+
+
 # From mitsuhiko/werkzeug (used with permission).
 def parse_list_header(value):
     """Parse lists as described by RFC 2068 Section 2.
@@ -197,66 +229,6 @@ def unquote_header_value(value, is_filename=False):
     return value
 
 
-def header_expand(headers):
-    """Returns an HTTP Header value string from a dictionary.
-
-    Example expansion::
-
-        {'text/x-dvi': {'q': '.8', 'mxb': '100000', 'mxt': '5.0'}, 'text/x-c': {}}
-        # Accept: text/x-dvi; q=.8; mxb=100000; mxt=5.0, text/x-c
-
-        (('text/x-dvi', {'q': '.8', 'mxb': '100000', 'mxt': '5.0'}), ('text/x-c', {}))
-        # Accept: text/x-dvi; q=.8; mxb=100000; mxt=5.0, text/x-c
-    """
-
-    collector = []
-
-    if isinstance(headers, dict):
-        headers = list(headers.items())
-    elif isinstance(headers, basestring):
-        return headers
-    elif isinstance(headers, str):
-        # As discussed in https://github.com/kennethreitz/requests/issues/400
-        # latin-1 is the most conservative encoding used on the web. Anyone
-        # who needs more can encode to a byte-string before calling
-        return headers.encode("latin-1")
-    elif headers is None:
-        return headers
-
-    for i, (value, params) in enumerate(headers):
-
-        _params = []
-
-        for (p_k, p_v) in list(params.items()):
-
-            _params.append('%s=%s' % (p_k, p_v))
-
-        collector.append(value)
-        collector.append('; ')
-
-        if len(params):
-
-            collector.append('; '.join(_params))
-
-        if not len(headers) == i + 1:
-            collector.append(', ')
-
-    # Remove trailing separators.
-    if collector[-1] in (', ', '; '):
-        del collector[-1]
-
-    return ''.join(collector)
-
-
-def randombytes(n):
-    """Return n random bytes."""
-    if is_py2:
-        L = [chr(random.randrange(0, 256)) for i in range(n)]
-    else:
-        L = [chr(random.randrange(0, 256)).encode('utf-8') for i in range(n)]
-    return b"".join(L)
-
-
 def dict_from_cookiejar(cj):
     """Returns a key/value dictionary from a CookieJar.
@@ -265,11 +237,8 @@
 
     cookie_dict = {}
 
-    for _, cookies in list(cj._cookies.items()):
-        for _, cookies in list(cookies.items()):
-            for cookie in list(cookies.values()):
-                # print cookie
-                cookie_dict[cookie.name] = cookie.value
+    for cookie in cj:
+        cookie_dict[cookie.name] = cookie.value
 
     return cookie_dict
@@ -336,6 +305,14 @@ def stream_decode_response_unicode(iterator, r):
         yield rv
 
 
+def iter_slices(string, slice_length):
+    """Iterate over slices of a string."""
+    pos = 0
+    while pos < len(string):
+        yield string[pos:pos + slice_length]
+        pos += slice_length
+
+
 def get_unicode_from_response(r):
     """Returns the requested content back in unicode.
@@ -370,8 +347,7 @@ def get_unicode_from_response(r):
 
 
 def stream_decompress(iterator, mode='gzip'):
-    """
-    Stream decodes an iterator over compressed data
+    """Stream decodes an iterator over compressed data
 
     :param iterator: An iterator over compressed data
     :param mode: 'gzip' or 'deflate'
@@ -403,9 +379,10 @@ def stream_decompress(iterator, mode='gzip'):
 
 
 def stream_untransfer(gen, resp):
-    if 'gzip' in resp.headers.get('content-encoding', ''):
+    ce = resp.headers.get('content-encoding', '').lower()
+    if 'gzip' in ce:
         gen = stream_decompress(gen, mode='gzip')
-    elif 'deflate' in resp.headers.get('content-encoding', ''):
+    elif 'deflate' in ce:
         gen = stream_decompress(gen, mode='deflate')
 
     return gen
@@ -419,13 +396,12 @@ UNRESERVED_SET = frozenset(
 
 def unquote_unreserved(uri):
     """Un-escape any percent-escape sequences in a URI that are unreserved
-    characters.
-    This leaves all reserved, illegal and non-ASCII bytes encoded.
+    characters. This leaves all reserved, illegal and non-ASCII bytes encoded.
     """
     parts = uri.split('%')
    for i in range(1, len(parts)):
         h = parts[i][0:2]
-        if len(h) == 2:
+        if len(h) == 2 and h.isalnum():
             c = chr(int(h, 16))
             if c in UNRESERVED_SET:
                 parts[i] = c + parts[i][2:]
@@ -447,7 +423,8 @@ def requote_uri(uri):
     # or '%')
     return quote(unquote_unreserved(uri), safe="!#$%&'()*+,/:;=?@[]~")
 
-def get_environ_proxies():
+
+def get_environ_proxies(url):
     """Return a dict of environment proxies."""
 
@@ -455,10 +432,153 @@
         'http',
         'https',
         'ftp',
-        'socks',
-        'no'
+        'socks'
     ]
 
     get_proxy = lambda k: os.environ.get(k) or os.environ.get(k.upper())
+
+    # First check whether no_proxy is defined. If it is, check that the URL
+    # we're getting isn't in the no_proxy list.
+    no_proxy = get_proxy('no_proxy')
+
+    if no_proxy:
+        # We need to check whether we match here. We need to see if we match
+        # the end of the netloc, both with and without the port.
+        no_proxy = no_proxy.split(',')
+        netloc = urlparse(url).netloc
+
+        for host in no_proxy:
+            if netloc.endswith(host) or netloc.split(':')[0].endswith(host):
+                # The URL does match something in no_proxy, so we don't want
+                # to apply the proxies on this URL.
+                return {}
+
+    # If we get here, we either didn't have no_proxy set or we're not going
+    # anywhere that no_proxy applies to.
     proxies = [(key, get_proxy(key + '_proxy')) for key in proxy_keys]
     return dict([(key, val) for (key, val) in proxies if val])
+
+
+def default_user_agent():
+    """Return a string representing the default user agent."""
+    _implementation = platform.python_implementation()
+
+    if _implementation == 'CPython':
+        _implementation_version = platform.python_version()
+    elif _implementation == 'PyPy':
+        _implementation_version = '%s.%s.%s' % (
+            sys.pypy_version_info.major,
+            sys.pypy_version_info.minor,
+            sys.pypy_version_info.micro
+        )
+        if sys.pypy_version_info.releaselevel != 'final':
+            _implementation_version = ''.join([_implementation_version, sys.pypy_version_info.releaselevel])
+    elif _implementation == 'Jython':
+        _implementation_version = platform.python_version()  # Complete Guess
+    elif _implementation == 'IronPython':
+        _implementation_version = platform.python_version()  # Complete Guess
+    else:
+        _implementation_version = 'Unknown'
+
+    try:
+        p_system = platform.system()
+        p_release = platform.release()
+    except IOError:
+        p_system = 'Unknown'
+        p_release = 'Unknown'
+
+    return " ".join([
+        'python-requests/%s' % __version__,
+        '%s/%s' % (_implementation, _implementation_version),
+        '%s/%s' % (p_system, p_release),
+    ])
+
+
+def default_headers():
+    return {
+        'User-Agent': default_user_agent(),
+        'Accept-Encoding': ', '.join(('gzip', 'deflate', 'compress')),
+        'Accept': '*/*'
+    }
+
+
+def parse_header_links(value):
+    """Return a list of parsed link headers.
+
+    i.e. Link: <http:/.../front.jpeg>; rel=front; type="image/jpeg",<http://.../back.jpeg>; rel=back;type="image/jpeg"
+    """
+
+    links = []
+
+    replace_chars = " '\""
+
+    for val in value.split(","):
+        try:
+            url, params = val.split(";", 1)
+        except ValueError:
+            url, params = val, ''
+
+        link = {}
+
+        link["url"] = url.strip("<> '\"")
+
+        for param in params.split(";"):
+            try:
+                key, value = param.split("=")
+            except ValueError:
+                break
+
+            link[key.strip(replace_chars)] = value.strip(replace_chars)
+
+        links.append(link)
+
+    return links
+
+
+# Null bytes; no need to recreate these on each call to guess_json_utf
+_null = '\x00'.encode('ascii')  # encoding to ASCII for Python 3
+_null2 = _null * 2
+_null3 = _null * 3
+
+
+def guess_json_utf(data):
+    # JSON always starts with two ASCII characters, so detection is as
+    # easy as counting the nulls and from their location and count
+    # determine the encoding. Also detect a BOM, if present.
+    sample = data[:4]
+    if sample in (codecs.BOM_UTF32_LE, codecs.BOM_UTF32_BE):
+        return 'utf-32'     # BOM included
+    if sample[:3] == codecs.BOM_UTF8:
+        return 'utf-8-sig'  # BOM included, MS style (discouraged)
+    if sample[:2] in (codecs.BOM_UTF16_LE, codecs.BOM_UTF16_BE):
+        return 'utf-16'     # BOM included
+    nullcount = sample.count(_null)
+    if nullcount == 0:
+        return 'utf-8'
+    if nullcount == 2:
+        if sample[::2] == _null2:   # 1st and 3rd are null
+            return 'utf-16-be'
+        if sample[1::2] == _null2:  # 2nd and 4th are null
+            return 'utf-16-le'
+        # Did not detect 2 valid UTF-16 ascii-range characters
+    if nullcount == 3:
+        if sample[:3] == _null3:
+            return 'utf-32-be'
+        if sample[1:] == _null3:
+            return 'utf-32-le'
+        # Did not detect a valid UTF-32 ascii-range character
+    return None
+
+
+def prepend_scheme_if_needed(url, new_scheme):
+    '''Given a URL that may or may not have a scheme, prepend the given scheme.
+    Does not replace a present scheme with the one provided as an argument.'''
+    scheme, netloc, path, params, query, fragment = urlparse(url, new_scheme)
+
+    # urlparse is a finicky beast, and sometimes decides that there isn't a
+    # netloc present. Assume that it's being over-cautious, and switch netloc
+    # and path if urlparse decided there was no netloc.
+    if not netloc:
+        netloc, path = path, netloc
+
+    return urlunparse((scheme, netloc, path, params, query, fragment))
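
A few usage sketches for the helpers this patch adds follow; all are illustrative, and any file names, versions, or environment values in them are invented. The new super_len is presumably what lets a request body be sized without reading it: it tries __len__, then a bare .len attribute, then fstat on a file descriptor:

    from requests.utils import super_len

    print(super_len(b'abcd'))        # 4: bytes define __len__

    class Body(object):              # stand-in for a wrapper exposing a bare .len attribute
        len = 1024

    print(super_len(Body()))         # 1024, from the attribute

    with open('upload.bin', 'wb') as f:   # 'upload.bin' is a made-up file name
        f.write(b'\x00' * 10)

    print(super_len(open('upload.bin', 'rb')))  # 10, via os.fstat on the descriptor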
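
The from_key_val_list/to_key_val_list pair accepts anything OrderedDict or list can consume and rejects scalars outright, so the two directions behave like:

    >>> from requests.utils import from_key_val_list, to_key_val_list
    >>> from_key_val_list([('Accept', '*/*')])
    OrderedDict([('Accept', '*/*')])
    >>> to_key_val_list({'Accept': '*/*'})
    [('Accept', '*/*')]
    >>> to_key_val_list(True)
    Traceback (most recent call last):
      ...
    ValueError: cannot encode objects that are not 2-tuples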
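
The rewritten dict_from_cookiejar leans on CookieJar's own iteration instead of the private _cookies structure; a round trip looks like:

    from requests.cookies import cookiejar_from_dict
    from requests.utils import dict_from_cookiejar

    jar = cookiejar_from_dict({'session': 'abc123'})
    assert dict_from_cookiejar(jar) == {'session': 'abc123'}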
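
iter_slices simply walks a string in fixed-size steps, with the final slice allowed to run short:

    >>> from requests.utils import iter_slices
    >>> list(iter_slices('abcdefg', 3))
    ['abc', 'def', 'g']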
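
requote_uri first collapses escapes of unreserved characters via unquote_unreserved, then re-quotes whatever genuinely needs escaping; the new h.isalnum() guard also skips malformed sequences such as '%~1' instead of feeding them to int(). For example:

    >>> from requests.utils import requote_uri
    >>> requote_uri('http://example.com/a%7Eb c')
    'http://example.com/a~b%20c'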
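
get_environ_proxies now takes the target URL so no_proxy can veto the environment; the match is a bare suffix test on the netloc, tried with and without the port. With a hypothetical environment:

    import os
    from requests.utils import get_environ_proxies

    os.environ['HTTP_PROXY'] = 'http://proxy.internal:3128'   # made-up values
    os.environ['NO_PROXY'] = 'example.com,localhost'

    print(get_environ_proxies('http://www.example.com/'))  # {} -- netloc ends with example.com
    print(get_environ_proxies('http://example.org/'))      # {'http': 'http://proxy.internal:3128'}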
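
default_user_agent stitches together three slash-delimited pairs; the exact values depend on the requests version, interpreter, and OS, so the output below is only indicative:

    >>> from requests.utils import default_user_agent
    >>> default_user_agent()
    'python-requests/0.14.0 CPython/2.7.3 Linux/3.2.0'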
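
parse_header_links is a naive comma-then-semicolon split (a comma inside a quoted parameter would confuse it), which is enough for typical Link headers:

    from requests.utils import parse_header_links

    links = parse_header_links('<http://example.com/front.jpeg>; rel=front; type="image/jpeg"')
    # -> [{'url': 'http://example.com/front.jpeg', 'rel': 'front', 'type': 'image/jpeg'}]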
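
Because a JSON document must open with two ASCII characters, guess_json_utf can identify the UTF flavour from the null pattern in the first four bytes even without a BOM:

    >>> from requests.utils import guess_json_utf
    >>> guess_json_utf(b'{"k": 1}')
    'utf-8'
    >>> guess_json_utf('{}'.encode('utf-16-le'))
    'utf-16-le'
    >>> guess_json_utf('{}'.encode('utf-32-be'))
    'utf-32-be'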
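
prepend_scheme_if_needed's netloc/path swap covers urlparse's habit of filing a bare host under path when no scheme is present:

    >>> from requests.utils import prepend_scheme_if_needed
    >>> prepend_scheme_if_needed('example.com/pub', 'http')
    'http://example.com/pub'
    >>> prepend_scheme_if_needed('https://example.com', 'http')
    'https://example.com'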