Diffstat (limited to 'requests/utils.py')
-rw-r--r--  requests/utils.py  330
1 file changed, 225 insertions, 105 deletions
diff --git a/requests/utils.py b/requests/utils.py
index 8365cc3..f5f6b95 100644
--- a/requests/utils.py
+++ b/requests/utils.py
@@ -12,48 +12,25 @@ that are also useful for external consumption.
import cgi
import codecs
import os
-import random
+import platform
import re
+import sys
import zlib
from netrc import netrc, NetrcParseError
+from . import __version__
+from . import certs
from .compat import parse_http_list as _parse_list_header
-from .compat import quote, is_py2, urlparse
-from .compat import basestring, bytes, str
+from .compat import quote, urlparse, bytes, str, OrderedDict, urlunparse
from .cookies import RequestsCookieJar, cookiejar_from_dict
_hush_pyflakes = (RequestsCookieJar,)
-CERTIFI_BUNDLE_PATH = None
-try:
- # see if requests's own CA certificate bundle is installed
- import certifi
- CERTIFI_BUNDLE_PATH = certifi.where()
-except ImportError:
- pass
-
NETRC_FILES = ('.netrc', '_netrc')
-# common paths for the OS's CA certificate bundle
-POSSIBLE_CA_BUNDLE_PATHS = [
- # Red Hat, CentOS, Fedora and friends (provided by the ca-certificates package):
- '/etc/pki/tls/certs/ca-bundle.crt',
- # Ubuntu, Debian, and friends (provided by the ca-certificates package):
- '/etc/ssl/certs/ca-certificates.crt',
- # FreeBSD (provided by the ca_root_nss package):
- '/usr/local/share/certs/ca-root-nss.crt',
-]
-
-def get_os_ca_bundle_path():
- """Try to pick an available CA certificate bundle provided by the OS."""
- for path in POSSIBLE_CA_BUNDLE_PATHS:
- if os.path.exists(path):
- return path
- return None
-
# if certifi is installed, use its CA bundle;
# otherwise, try and use the OS bundle
-DEFAULT_CA_BUNDLE_PATH = CERTIFI_BUNDLE_PATH or get_os_ca_bundle_path()
+DEFAULT_CA_BUNDLE_PATH = certs.where()
def dict_to_sequence(d):
"""Returns an internal sequence dictionary update."""
@@ -63,6 +40,13 @@ def dict_to_sequence(d):
return d
+def super_len(o):
+ if hasattr(o, '__len__'):
+ return len(o)
+ if hasattr(o, 'len'):
+ return o.len
+ if hasattr(o, 'fileno'):
+ return os.fstat(o.fileno()).st_size
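
A minimal doctest-style sketch of super_len() above; it falls back from
__len__ to a .len attribute (e.g. cStringIO objects) and finally to fstat()
on real file objects. 'setup.py' is a hypothetical path:

    >>> from requests.utils import super_len
    >>> super_len('hello')                   # strings define __len__
    5
    >>> import os
    >>> f = open('setup.py', 'rb')           # sized via os.fstat()
    >>> super_len(f) == os.fstat(f.fileno()).st_size
    True
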
def get_netrc_auth(url):
"""Returns the Requests tuple auth for a given url from netrc."""
@@ -96,7 +80,7 @@ def get_netrc_auth(url):
pass
# AppEngine hackiness.
- except AttributeError:
+ except (ImportError, AttributeError):
pass
@@ -107,6 +91,54 @@ def guess_filename(obj):
return name
+def from_key_val_list(value):
+ """Take an object and test to see if it can be represented as a
+    dictionary. If it can be, return an OrderedDict, e.g.,
+
+ ::
+
+ >>> from_key_val_list([('key', 'val')])
+ OrderedDict([('key', 'val')])
+ >>> from_key_val_list('string')
+    ValueError: cannot encode objects that are not 2-tuples
+ >>> from_key_val_list({'key': 'val'})
+ OrderedDict([('key', 'val')])
+ """
+ if value is None:
+ return None
+
+ if isinstance(value, (str, bytes, bool, int)):
+ raise ValueError('cannot encode objects that are not 2-tuples')
+
+ return OrderedDict(value)
+
+
+def to_key_val_list(value):
+ """Take an object and test to see if it can be represented as a
+ dictionary. If it can be, return a list of tuples, e.g.,
+
+ ::
+
+ >>> to_key_val_list([('key', 'val')])
+ [('key', 'val')]
+ >>> to_key_val_list({'key': 'val'})
+ [('key', 'val')]
+ >>> to_key_val_list('string')
+    ValueError: cannot encode objects that are not 2-tuples
+ """
+ if value is None:
+ return None
+
+ if isinstance(value, (str, bytes, bool, int)):
+ raise ValueError('cannot encode objects that are not 2-tuples')
+
+ if isinstance(value, dict):
+ value = value.items()
+
+ return list(value)
+
+
# From mitsuhiko/werkzeug (used with permission).
def parse_list_header(value):
"""Parse lists as described by RFC 2068 Section 2.
@@ -197,66 +229,6 @@ def unquote_header_value(value, is_filename=False):
return value
-def header_expand(headers):
- """Returns an HTTP Header value string from a dictionary.
-
- Example expansion::
-
- {'text/x-dvi': {'q': '.8', 'mxb': '100000', 'mxt': '5.0'}, 'text/x-c': {}}
- # Accept: text/x-dvi; q=.8; mxb=100000; mxt=5.0, text/x-c
-
- (('text/x-dvi', {'q': '.8', 'mxb': '100000', 'mxt': '5.0'}), ('text/x-c', {}))
- # Accept: text/x-dvi; q=.8; mxb=100000; mxt=5.0, text/x-c
- """
-
- collector = []
-
- if isinstance(headers, dict):
- headers = list(headers.items())
- elif isinstance(headers, basestring):
- return headers
- elif isinstance(headers, str):
- # As discussed in https://github.com/kennethreitz/requests/issues/400
- # latin-1 is the most conservative encoding used on the web. Anyone
- # who needs more can encode to a byte-string before calling
- return headers.encode("latin-1")
- elif headers is None:
- return headers
-
- for i, (value, params) in enumerate(headers):
-
- _params = []
-
- for (p_k, p_v) in list(params.items()):
-
- _params.append('%s=%s' % (p_k, p_v))
-
- collector.append(value)
- collector.append('; ')
-
- if len(params):
-
- collector.append('; '.join(_params))
-
- if not len(headers) == i + 1:
- collector.append(', ')
-
- # Remove trailing separators.
- if collector[-1] in (', ', '; '):
- del collector[-1]
-
- return ''.join(collector)
-
-
-def randombytes(n):
- """Return n random bytes."""
- if is_py2:
- L = [chr(random.randrange(0, 256)) for i in range(n)]
- else:
- L = [chr(random.randrange(0, 256)).encode('utf-8') for i in range(n)]
- return b"".join(L)
-
-
def dict_from_cookiejar(cj):
"""Returns a key/value dictionary from a CookieJar.
@@ -265,11 +237,8 @@ def dict_from_cookiejar(cj):
cookie_dict = {}
- for _, cookies in list(cj._cookies.items()):
- for _, cookies in list(cookies.items()):
- for cookie in list(cookies.values()):
- # print cookie
- cookie_dict[cookie.name] = cookie.value
+ for cookie in cj:
+ cookie_dict[cookie.name] = cookie.value
return cookie_dict
@@ -336,6 +305,14 @@ def stream_decode_response_unicode(iterator, r):
yield rv
+def iter_slices(string, slice_length):
+ """Iterate over slices of a string."""
+ pos = 0
+ while pos < len(string):
+ yield string[pos:pos + slice_length]
+ pos += slice_length
+
+
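A quick doctest sketch of iter_slices() above (the generator's output,
collected into a list):

    >>> from requests.utils import iter_slices
    >>> list(iter_slices('abcdefg', 3))
    ['abc', 'def', 'g']
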
def get_unicode_from_response(r):
"""Returns the requested content back in unicode.
@@ -370,8 +347,7 @@ def get_unicode_from_response(r):
def stream_decompress(iterator, mode='gzip'):
- """
- Stream decodes an iterator over compressed data
+ """Stream decodes an iterator over compressed data
:param iterator: An iterator over compressed data
:param mode: 'gzip' or 'deflate'
@@ -403,9 +379,10 @@ def stream_decompress(iterator, mode='gzip'):
def stream_untransfer(gen, resp):
- if 'gzip' in resp.headers.get('content-encoding', ''):
+ ce = resp.headers.get('content-encoding', '').lower()
+ if 'gzip' in ce:
gen = stream_decompress(gen, mode='gzip')
- elif 'deflate' in resp.headers.get('content-encoding', ''):
+ elif 'deflate' in ce:
gen = stream_decompress(gen, mode='deflate')
return gen
@@ -419,13 +396,12 @@ UNRESERVED_SET = frozenset(
def unquote_unreserved(uri):
"""Un-escape any percent-escape sequences in a URI that are unreserved
- characters.
- This leaves all reserved, illegal and non-ASCII bytes encoded.
+ characters. This leaves all reserved, illegal and non-ASCII bytes encoded.
"""
parts = uri.split('%')
for i in range(1, len(parts)):
h = parts[i][0:2]
- if len(h) == 2:
+        if len(h) == 2 and re.match('^[0-9a-fA-F]{2}$', h):  # hex only; isalnum() would let int() crash on e.g. '%zz'
c = chr(int(h, 16))
if c in UNRESERVED_SET:
parts[i] = c + parts[i][2:]
@@ -447,7 +423,8 @@ def requote_uri(uri):
# or '%')
return quote(unquote_unreserved(uri), safe="!#$%&'()*+,/:;=?@[]~")
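
Illustrative doctests for the two helpers above (the URLs are made-up
examples): unquote_unreserved() unescapes only unreserved bytes, and
requote_uri() then re-quotes whatever remains illegal:

    >>> from requests.utils import unquote_unreserved, requote_uri
    >>> unquote_unreserved('%2Fpath%41')     # %41 ('A') is unreserved; %2F ('/') is not
    '%2FpathA'
    >>> requote_uri('http://example.com/over there')
    'http://example.com/over%20there'
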
-def get_environ_proxies():
+
+def get_environ_proxies(url):
"""Return a dict of environment proxies."""
proxy_keys = [
@@ -455,10 +432,153 @@ def get_environ_proxies():
'http',
'https',
'ftp',
- 'socks',
- 'no'
+ 'socks'
]
get_proxy = lambda k: os.environ.get(k) or os.environ.get(k.upper())
+
+ # First check whether no_proxy is defined. If it is, check that the URL
+ # we're getting isn't in the no_proxy list.
+ no_proxy = get_proxy('no_proxy')
+
+ if no_proxy:
+ # We need to check whether we match here. We need to see if we match
+ # the end of the netloc, both with and without the port.
+ no_proxy = no_proxy.split(',')
+ netloc = urlparse(url).netloc
+
+ for host in no_proxy:
+ if netloc.endswith(host) or netloc.split(':')[0].endswith(host):
+ # The URL does match something in no_proxy, so we don't want
+ # to apply the proxies on this URL.
+ return {}
+
+ # If we get here, we either didn't have no_proxy set or we're not going
+ # anywhere that no_proxy applies to.
proxies = [(key, get_proxy(key + '_proxy')) for key in proxy_keys]
return dict([(key, val) for (key, val) in proxies if val])
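
A sketch of the new no_proxy handling, using a hypothetical proxy host:
URLs whose netloc matches a no_proxy entry now short-circuit to an empty
dict:

    >>> import os
    >>> from requests.utils import get_environ_proxies
    >>> os.environ['http_proxy'] = 'http://proxy.internal:3128'
    >>> os.environ['no_proxy'] = 'localhost,.example.com'
    >>> get_environ_proxies('http://www.example.com/')    # matches '.example.com'
    {}
    >>> get_environ_proxies('http://pypi.python.org/')
    {'http': 'http://proxy.internal:3128'}
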
+
+
+def default_user_agent():
+ """Return a string representing the default user agent."""
+ _implementation = platform.python_implementation()
+
+ if _implementation == 'CPython':
+ _implementation_version = platform.python_version()
+ elif _implementation == 'PyPy':
+ _implementation_version = '%s.%s.%s' % (
+ sys.pypy_version_info.major,
+ sys.pypy_version_info.minor,
+ sys.pypy_version_info.micro
+ )
+ if sys.pypy_version_info.releaselevel != 'final':
+ _implementation_version = ''.join([_implementation_version, sys.pypy_version_info.releaselevel])
+ elif _implementation == 'Jython':
+ _implementation_version = platform.python_version() # Complete Guess
+ elif _implementation == 'IronPython':
+ _implementation_version = platform.python_version() # Complete Guess
+ else:
+ _implementation_version = 'Unknown'
+
+ try:
+ p_system = platform.system()
+ p_release = platform.release()
+ except IOError:
+ p_system = 'Unknown'
+ p_release = 'Unknown'
+
+ return " ".join([
+ 'python-requests/%s' % __version__,
+ '%s/%s' % (_implementation, _implementation_version),
+ '%s/%s' % (p_system, p_release),
+ ])
+
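The resulting User-Agent string has the form
'python-requests/<version> <implementation>/<version> <system>/<release>';
for example (all version numbers below are illustrative):

    >>> from requests.utils import default_user_agent
    >>> default_user_agent()                 # doctest: +SKIP
    'python-requests/1.0.3 CPython/2.7.3 Linux/3.2.0-35-generic'
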
+def default_headers():
+ return {
+ 'User-Agent': default_user_agent(),
+ 'Accept-Encoding': ', '.join(('gzip', 'deflate', 'compress')),
+ 'Accept': '*/*'
+ }
+
+
+def parse_header_links(value):
+ """Return a dict of parsed link headers proxies.
+
+ i.e. Link: <http:/.../front.jpeg>; rel=front; type="image/jpeg",<http://.../back.jpeg>; rel=back;type="image/jpeg"
+
+ """
+
+ links = []
+
+ replace_chars = " '\""
+
+ for val in value.split(","):
+ try:
+ url, params = val.split(";", 1)
+ except ValueError:
+ url, params = val, ''
+
+ link = {}
+
+ link["url"] = url.strip("<> '\"")
+
+ for param in params.split(";"):
+ try:
+                key, value = param.split("=")
+ except ValueError:
+ break
+
+ link[key.strip(replace_chars)] = value.strip(replace_chars)
+
+ links.append(link)
+
+ return links
+
+
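A doctest sketch of parse_header_links() on an RFC 5988-style value (the
URL is made up):

    >>> from requests.utils import parse_header_links
    >>> links = parse_header_links('<http://example.com/2>; rel="next"')
    >>> links[0]['url'], links[0]['rel']
    ('http://example.com/2', 'next')
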
+# Null bytes; no need to recreate these on each call to guess_json_utf
+_null = '\x00'.encode('ascii') # encoding to ASCII for Python 3
+_null2 = _null * 2
+_null3 = _null * 3
+
+
+def guess_json_utf(data):
+    # JSON always starts with two ASCII characters, so detection is as
+    # easy as counting the nulls: their location and count determine
+    # the encoding. Also detect a BOM, if present.
+ sample = data[:4]
+    if sample in (codecs.BOM_UTF32_LE, codecs.BOM_UTF32_BE):
+ return 'utf-32' # BOM included
+ if sample[:3] == codecs.BOM_UTF8:
+ return 'utf-8-sig' # BOM included, MS style (discouraged)
+ if sample[:2] in (codecs.BOM_UTF16_LE, codecs.BOM_UTF16_BE):
+ return 'utf-16' # BOM included
+ nullcount = sample.count(_null)
+ if nullcount == 0:
+ return 'utf-8'
+ if nullcount == 2:
+ if sample[::2] == _null2: # 1st and 3rd are null
+ return 'utf-16-be'
+ if sample[1::2] == _null2: # 2nd and 4th are null
+ return 'utf-16-le'
+ # Did not detect 2 valid UTF-16 ascii-range characters
+ if nullcount == 3:
+ if sample[:3] == _null3:
+ return 'utf-32-be'
+ if sample[1:] == _null3:
+ return 'utf-32-le'
+ # Did not detect a valid UTF-32 ascii-range character
+ return None
+
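Worked examples for the detection above; JSON's two leading ASCII
characters make the null pattern unambiguous:

    >>> from requests.utils import guess_json_utf
    >>> guess_json_utf(b'{"k": "v"}')
    'utf-8'
    >>> guess_json_utf('{"k": "v"}'.encode('utf-16-le'))
    'utf-16-le'
    >>> guess_json_utf('{"k": "v"}'.encode('utf-32-be'))
    'utf-32-be'
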
+
+def prepend_scheme_if_needed(url, new_scheme):
+    """Given a URL that may or may not have a scheme, prepend the given scheme.
+    Does not replace a present scheme with the one provided as an argument."""
+ scheme, netloc, path, params, query, fragment = urlparse(url, new_scheme)
+
+ # urlparse is a finicky beast, and sometimes decides that there isn't a
+ # netloc present. Assume that it's being over-cautious, and switch netloc
+ # and path if urlparse decided there was no netloc.
+ if not netloc:
+ netloc, path = path, netloc
+
+ return urlunparse((scheme, netloc, path, params, query, fragment))
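
A closing doctest for prepend_scheme_if_needed(); note the netloc/path swap
above, compensating for urlparse() treating a bare 'example.com' as a path:

    >>> from requests.utils import prepend_scheme_if_needed
    >>> prepend_scheme_if_needed('example.com/pub', 'http')
    'http://example.com/pub'
    >>> prepend_scheme_if_needed('https://example.com/pub', 'http')  # scheme kept
    'https://example.com/pub'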