diff options
author | SVN-Git Migration <python-modules-team@lists.alioth.debian.org> | 2015-10-08 13:41:31 -0700 |
---|---|---|
committer | SVN-Git Migration <python-modules-team@lists.alioth.debian.org> | 2015-10-08 13:41:31 -0700 |
commit | ca5cb993a3ce4fbdf50eebd31cb6b71eec9bc391 (patch) | |
tree | 0cb1e57a7e04660775dc9426c1e4476082f88d1d /test_requests.py | |
parent | 224200a9815f792f93632d03a38e4f0763ae69ef (diff) | |
download | python-requests-ca5cb993a3ce4fbdf50eebd31cb6b71eec9bc391.tar python-requests-ca5cb993a3ce4fbdf50eebd31cb6b71eec9bc391.tar.gz |
Imported Upstream version 2.2.1
Diffstat (limited to 'test_requests.py')
-rwxr-xr-x | test_requests.py | 653 |
1 files changed, 476 insertions, 177 deletions
diff --git a/test_requests.py b/test_requests.py index d1e6ed0..63897bb 100755 --- a/test_requests.py +++ b/test_requests.py @@ -6,15 +6,16 @@ from __future__ import division import json import os -import unittest import pickle +import unittest import requests import pytest -from requests.auth import HTTPDigestAuth from requests.adapters import HTTPAdapter -from requests.compat import str, cookielib, getproxies, urljoin, urlparse -from requests.cookies import cookiejar_from_dict +from requests.auth import HTTPDigestAuth +from requests.compat import ( + Morsel, cookielib, getproxies, str, urljoin, urlparse) +from requests.cookies import cookiejar_from_dict, morsel_to_cookie from requests.exceptions import InvalidURL, MissingSchema from requests.structures import CaseInsensitiveDict @@ -57,8 +58,10 @@ class RequestsTestCase(unittest.TestCase): requests.post def test_invalid_url(self): - self.assertRaises(MissingSchema, requests.get, 'hiwpefhipowhefopw') - self.assertRaises(InvalidURL, requests.get, 'http://') + with pytest.raises(MissingSchema): + requests.get('hiwpefhipowhefopw') + with pytest.raises(InvalidURL): + requests.get('http://') def test_basic_building(self): req = requests.Request() @@ -71,24 +74,22 @@ class RequestsTestCase(unittest.TestCase): def test_no_content_length(self): get_req = requests.Request('GET', httpbin('get')).prepare() - self.assertTrue('Content-Length' not in get_req.headers) + assert 'Content-Length' not in get_req.headers head_req = requests.Request('HEAD', httpbin('head')).prepare() - self.assertTrue('Content-Length' not in head_req.headers) + assert 'Content-Length' not in head_req.headers def test_path_is_not_double_encoded(self): request = requests.Request('GET', "http://0.0.0.0/get/test case").prepare() - self.assertEqual(request.path_url, "/get/test%20case") + assert request.path_url == '/get/test%20case' def test_params_are_added_before_fragment(self): request = requests.Request('GET', "http://example.com/path#fragment", params={"a": "b"}).prepare() - self.assertEqual(request.url, - "http://example.com/path?a=b#fragment") + assert request.url == "http://example.com/path?a=b#fragment" request = requests.Request('GET', "http://example.com/path?key=value#fragment", params={"a": "b"}).prepare() - self.assertEqual(request.url, - "http://example.com/path?key=value&a=b#fragment") + assert request.url == "http://example.com/path?key=value&a=b#fragment" def test_mixed_case_scheme_acceptable(self): s = requests.Session() @@ -100,8 +101,7 @@ class RequestsTestCase(unittest.TestCase): url = scheme + parts.netloc + parts.path r = requests.Request('GET', url) r = s.send(r.prepare()) - self.assertEqual(r.status_code, 200, - "failed for scheme %s" % scheme) + assert r.status_code == 200, 'failed for scheme {0}'.format(scheme) def test_HTTP_200_OK_GET_ALTERNATIVE(self): r = requests.Request('GET', httpbin('get')) @@ -110,11 +110,11 @@ class RequestsTestCase(unittest.TestCase): r = s.send(r.prepare()) - self.assertEqual(r.status_code, 200) + assert r.status_code == 200 def test_HTTP_302_ALLOW_REDIRECT_GET(self): r = requests.get(httpbin('redirect', '1')) - self.assertEqual(r.status_code, 200) + assert r.status_code == 200 # def test_HTTP_302_ALLOW_REDIRECT_POST(self): # r = requests.post(httpbin('status', '302'), data={'some': 'data'}) @@ -125,31 +125,31 @@ class RequestsTestCase(unittest.TestCase): r = requests.get(httpbin('user-agent'), headers=heads) - self.assertTrue(heads['User-agent'] in r.text) - self.assertEqual(r.status_code, 200) + assert heads['User-agent'] in r.text + assert r.status_code == 200 def test_HTTP_200_OK_GET_WITH_MIXED_PARAMS(self): heads = {'User-agent': 'Mozilla/5.0'} r = requests.get(httpbin('get') + '?test=true', params={'q': 'test'}, headers=heads) - self.assertEqual(r.status_code, 200) + assert r.status_code == 200 def test_set_cookie_on_301(self): s = requests.session() url = httpbin('cookies/set?foo=bar') r = s.get(url) - self.assertTrue(s.cookies['foo'] == 'bar') + assert s.cookies['foo'] == 'bar' def test_cookie_sent_on_redirect(self): s = requests.session() s.get(httpbin('cookies/set?foo=bar')) r = s.get(httpbin('redirect/1')) # redirects to httpbin('get') - self.assertTrue("Cookie" in r.json()["headers"]) + assert 'Cookie' in r.json()['headers'] def test_cookie_removed_on_expire(self): s = requests.session() s.get(httpbin('cookies/set?foo=bar')) - self.assertTrue(s.cookies['foo'] == 'bar') + assert s.cookies['foo'] == 'bar' s.get( httpbin('response-headers'), params={ @@ -162,7 +162,13 @@ class RequestsTestCase(unittest.TestCase): def test_cookie_quote_wrapped(self): s = requests.session() s.get(httpbin('cookies/set?foo="bar:baz"')) - self.assertTrue(s.cookies['foo'] == '"bar:baz"') + assert s.cookies['foo'] == '"bar:baz"' + + def test_cookie_persists_via_api(self): + s = requests.session() + r = s.get(httpbin('redirect/1'), cookies={'foo': 'bar'}) + assert 'foo' in r.request.headers['Cookie'] + assert 'foo' in r.history[0].request.headers['Cookie'] def test_request_cookie_overrides_session_cookie(self): s = requests.session() @@ -172,6 +178,12 @@ class RequestsTestCase(unittest.TestCase): # Session cookie should not be modified assert s.cookies['foo'] == 'bar' + def test_request_cookies_not_persisted(self): + s = requests.session() + s.get(httpbin('cookies'), cookies={'foo': 'baz'}) + # Sending a request with cookies should not add cookies to the session + assert not s.cookies + def test_generic_cookiejar_works(self): cj = cookielib.CookieJar() cookiejar_from_dict({'foo': 'bar'}, cj) @@ -183,11 +195,19 @@ class RequestsTestCase(unittest.TestCase): # Make sure the session cj is still the custom one assert s.cookies is cj + def test_param_cookiejar_works(self): + cj = cookielib.CookieJar() + cookiejar_from_dict({'foo' : 'bar'}, cj) + s = requests.session() + r = s.get(httpbin('cookies'), cookies=cj) + # Make sure the cookie was sent + assert r.json()['cookies']['foo'] == 'bar' + def test_requests_in_history_are_not_overridden(self): resp = requests.get(httpbin('redirect/3')) urls = [r.url for r in resp.history] req_urls = [r.request.url for r in resp.history] - self.assertEquals(urls, req_urls) + assert urls == req_urls def test_user_agent_transfers(self): @@ -196,37 +216,37 @@ class RequestsTestCase(unittest.TestCase): } r = requests.get(httpbin('user-agent'), headers=heads) - self.assertTrue(heads['User-agent'] in r.text) + assert heads['User-agent'] in r.text heads = { 'user-agent': 'Mozilla/5.0 (github.com/kennethreitz/requests)' } r = requests.get(httpbin('user-agent'), headers=heads) - self.assertTrue(heads['user-agent'] in r.text) + assert heads['user-agent'] in r.text def test_HTTP_200_OK_HEAD(self): r = requests.head(httpbin('get')) - self.assertEqual(r.status_code, 200) + assert r.status_code == 200 def test_HTTP_200_OK_PUT(self): r = requests.put(httpbin('put')) - self.assertEqual(r.status_code, 200) + assert r.status_code == 200 def test_BASICAUTH_TUPLE_HTTP_200_OK_GET(self): auth = ('user', 'pass') url = httpbin('basic-auth', 'user', 'pass') r = requests.get(url, auth=auth) - self.assertEqual(r.status_code, 200) + assert r.status_code == 200 r = requests.get(url) - self.assertEqual(r.status_code, 401) + assert r.status_code == 401 s = requests.session() s.auth = auth r = s.get(url) - self.assertEqual(r.status_code, 200) + assert r.status_code == 200 def test_basicauth_with_netrc(self): auth = ('user', 'pass') @@ -239,22 +259,22 @@ class RequestsTestCase(unittest.TestCase): # Should use netrc and work. r = requests.get(url) - self.assertEqual(r.status_code, 200) + assert r.status_code == 200 # Given auth should override and fail. r = requests.get(url, auth=wrong_auth) - self.assertEqual(r.status_code, 401) + assert r.status_code == 401 s = requests.session() # Should use netrc and work. r = s.get(url) - self.assertEqual(r.status_code, 200) + assert r.status_code == 200 # Given auth should override and fail. s.auth = wrong_auth r = s.get(url) - self.assertEqual(r.status_code, 401) + assert r.status_code == 401 def test_DIGEST_HTTP_200_OK_GET(self): @@ -262,15 +282,15 @@ class RequestsTestCase(unittest.TestCase): url = httpbin('digest-auth', 'auth', 'user', 'pass') r = requests.get(url, auth=auth) - self.assertEqual(r.status_code, 200) + assert r.status_code == 200 r = requests.get(url) - self.assertEqual(r.status_code, 401) + assert r.status_code == 401 s = requests.session() s.auth = HTTPDigestAuth('user', 'pass') r = s.get(url) - self.assertEqual(r.status_code, 200) + assert r.status_code == 200 def test_DIGEST_AUTH_RETURNS_COOKIE(self): url = httpbin('digest-auth', 'auth', 'user', 'pass') @@ -294,11 +314,10 @@ class RequestsTestCase(unittest.TestCase): url = httpbin('digest-auth', 'auth', 'user', 'pass') r = requests.get(url, auth=auth, stream=True) - self.assertNotEqual(r.raw.read(), b'') + assert r.raw.read() != b'' r = requests.get(url, auth=auth, stream=False) - self.assertEqual(r.raw.read(), b'') - + assert r.raw.read() == b'' def test_DIGESTAUTH_WRONG_HTTP_401_GET(self): @@ -306,15 +325,23 @@ class RequestsTestCase(unittest.TestCase): url = httpbin('digest-auth', 'auth', 'user', 'pass') r = requests.get(url, auth=auth) - self.assertEqual(r.status_code, 401) + assert r.status_code == 401 r = requests.get(url) - self.assertEqual(r.status_code, 401) + assert r.status_code == 401 s = requests.session() s.auth = auth r = s.get(url) - self.assertEqual(r.status_code, 401) + assert r.status_code == 401 + + def test_DIGESTAUTH_QUOTES_QOP_VALUE(self): + + auth = HTTPDigestAuth('user', 'pass') + url = httpbin('digest-auth', 'auth', 'user', 'pass') + + r = requests.get(url, auth=auth) + assert '"auth"' in r.request.headers['Authorization'] def test_POSTBIN_GET_POST_FILES(self): @@ -322,19 +349,17 @@ class RequestsTestCase(unittest.TestCase): post1 = requests.post(url).raise_for_status() post1 = requests.post(url, data={'some': 'data'}) - self.assertEqual(post1.status_code, 200) + assert post1.status_code == 200 with open('requirements.txt') as f: post2 = requests.post(url, files={'some': f}) - self.assertEqual(post2.status_code, 200) + assert post2.status_code == 200 post4 = requests.post(url, data='[{"some": "json"}]') - self.assertEqual(post4.status_code, 200) + assert post4.status_code == 200 - try: - requests.post(url, files=['bad file data']) - except ValueError: - pass + with pytest.raises(ValueError): + requests.post(url, files = ['bad file data']) def test_POSTBIN_GET_POST_FILES_WITH_DATA(self): @@ -342,19 +367,17 @@ class RequestsTestCase(unittest.TestCase): post1 = requests.post(url).raise_for_status() post1 = requests.post(url, data={'some': 'data'}) - self.assertEqual(post1.status_code, 200) + assert post1.status_code == 200 with open('requirements.txt') as f: post2 = requests.post(url, data={'some': 'data'}, files={'some': f}) - self.assertEqual(post2.status_code, 200) + assert post2.status_code == 200 post4 = requests.post(url, data='[{"some": "json"}]') - self.assertEqual(post4.status_code, 200) + assert post4.status_code == 200 - try: - requests.post(url, files=['bad file data']) - except ValueError: - pass + with pytest.raises(ValueError): + requests.post(url, files = ['bad file data']) def test_conflicting_post_params(self): url = httpbin('post') @@ -364,14 +387,15 @@ class RequestsTestCase(unittest.TestCase): def test_request_ok_set(self): r = requests.get(httpbin('status', '404')) - self.assertEqual(r.ok, False) + assert not r.ok def test_status_raising(self): r = requests.get(httpbin('status', '404')) - self.assertRaises(requests.exceptions.HTTPError, r.raise_for_status) + with pytest.raises(requests.exceptions.HTTPError): + r.raise_for_status() r = requests.get(httpbin('status', '500')) - self.assertFalse(r.ok) + assert not r.ok def test_decompress_gzip(self): r = requests.get(httpbin('gzip')) @@ -391,36 +415,36 @@ class RequestsTestCase(unittest.TestCase): def test_urlencoded_get_query_multivalued_param(self): r = requests.get(httpbin('get'), params=dict(test=['foo', 'baz'])) - self.assertEqual(r.status_code, 200) - self.assertEqual(r.url, httpbin('get?test=foo&test=baz')) + assert r.status_code == 200 + assert r.url == httpbin('get?test=foo&test=baz') def test_different_encodings_dont_break_post(self): r = requests.post(httpbin('post'), data={'stuff': json.dumps({'a': 123})}, params={'blah': 'asdf1234'}, files={'file': ('test_requests.py', open(__file__, 'rb'))}) - self.assertEqual(r.status_code, 200) + assert r.status_code == 200 def test_unicode_multipart_post(self): r = requests.post(httpbin('post'), data={'stuff': u'ëlïxr'}, files={'file': ('test_requests.py', open(__file__, 'rb'))}) - self.assertEqual(r.status_code, 200) + assert r.status_code == 200 r = requests.post(httpbin('post'), data={'stuff': u'ëlïxr'.encode('utf-8')}, files={'file': ('test_requests.py', open(__file__, 'rb'))}) - self.assertEqual(r.status_code, 200) + assert r.status_code == 200 r = requests.post(httpbin('post'), data={'stuff': 'elixr'}, files={'file': ('test_requests.py', open(__file__, 'rb'))}) - self.assertEqual(r.status_code, 200) + assert r.status_code == 200 r = requests.post(httpbin('post'), data={'stuff': 'elixr'.encode('utf-8')}, files={'file': ('test_requests.py', open(__file__, 'rb'))}) - self.assertEqual(r.status_code, 200) + assert r.status_code == 200 def test_unicode_multipart_post_fieldnames(self): filename = os.path.splitext(__file__)[0] + '.py' @@ -430,8 +454,13 @@ class RequestsTestCase(unittest.TestCase): files={'file': ('test_requests.py', open(filename, 'rb'))}) prep = r.prepare() - self.assertTrue(b'name="stuff"' in prep.body) - self.assertFalse(b'name="b\'stuff\'"' in prep.body) + assert b'name="stuff"' in prep.body + assert b'name="b\'stuff\'"' not in prep.body + + def test_unicode_method_name(self): + files = {'file': open('test_requests.py', 'rb')} + r = requests.request(method=u'POST', url=httpbin('post'), files=files) + assert r.status_code == 200 def test_custom_content_type(self): r = requests.post(httpbin('post'), @@ -439,8 +468,8 @@ class RequestsTestCase(unittest.TestCase): files={'file1': ('test_requests.py', open(__file__, 'rb')), 'file2': ('test_requests', open(__file__, 'rb'), 'text/py-content-type')}) - self.assertEqual(r.status_code, 200) - self.assertTrue(b"text/py-content-type" in r.request.body) + assert r.status_code == 200 + assert b"text/py-content-type" in r.request.body def test_hook_receives_request_arguments(self): def hook(resp, **kwargs): @@ -449,6 +478,25 @@ class RequestsTestCase(unittest.TestCase): requests.Request('GET', HTTPBIN, hooks={'response': hook}) + def test_session_hooks_are_used_with_no_request_hooks(self): + hook = lambda x, *args, **kwargs: x + s = requests.Session() + s.hooks['response'].append(hook) + r = requests.Request('GET', HTTPBIN) + prep = s.prepare_request(r) + assert prep.hooks['response'] != [] + assert prep.hooks['response'] == [hook] + + def test_session_hooks_are_overriden_by_request_hooks(self): + hook1 = lambda x, *args, **kwargs: x + hook2 = lambda x, *args, **kwargs: x + assert hook1 is not hook2 + s = requests.Session() + s.hooks['response'].append(hook2) + r = requests.Request('GET', HTTPBIN, hooks={'response': [hook1]}) + prep = s.prepare_request(r) + assert prep.hooks['response'] == [hook1] + def test_prepared_request_hook(self): def hook(resp, **kwargs): resp.hook_working = True @@ -461,7 +509,7 @@ class RequestsTestCase(unittest.TestCase): s.proxies = getproxies() resp = s.send(prep) - self.assertTrue(hasattr(resp, 'hook_working')) + assert hasattr(resp, 'hook_working') def test_prepared_from_session(self): class DummyAuth(requests.auth.AuthBase): @@ -470,7 +518,7 @@ class RequestsTestCase(unittest.TestCase): return r req = requests.Request('GET', httpbin('headers')) - self.assertEqual(req.auth, None) + assert not req.auth s = requests.Session() s.auth = DummyAuth() @@ -478,7 +526,7 @@ class RequestsTestCase(unittest.TestCase): prep = s.prepare_request(req) resp = s.send(prep) - self.assertTrue(resp.json()['headers']['Dummy-Auth-Test'], 'dummy-auth-test-ok') + assert resp.json()['headers']['Dummy-Auth-Test'] == 'dummy-auth-test-ok' def test_links(self): r = requests.Response() @@ -502,7 +550,7 @@ class RequestsTestCase(unittest.TestCase): 'x-ratelimit-limit': '60', 'x-ratelimit-remaining': '57' } - self.assertEqual(r.links['next']['rel'], 'next') + assert r.links['next']['rel'] == 'next' def test_cookie_parameters(self): key = 'some_cookie' @@ -514,20 +562,108 @@ class RequestsTestCase(unittest.TestCase): jar = requests.cookies.RequestsCookieJar() jar.set(key, value, secure=secure, domain=domain, rest=rest) - self.assertEqual(len(jar), 1) - self.assertTrue('some_cookie' in jar) + assert len(jar) == 1 + assert 'some_cookie' in jar cookie = list(jar)[0] - self.assertEqual(cookie.secure, secure) - self.assertEqual(cookie.domain, domain) - self.assertEqual(cookie._rest['HttpOnly'], rest['HttpOnly']) + assert cookie.secure == secure + assert cookie.domain == domain + assert cookie._rest['HttpOnly'] == rest['HttpOnly'] + + def test_cookie_as_dict_keeps_len(self): + key = 'some_cookie' + value = 'some_value' + + key1 = 'some_cookie1' + value1 = 'some_value1' + + jar = requests.cookies.RequestsCookieJar() + jar.set(key, value) + jar.set(key1, value1) + + d1 = dict(jar) + d2 = dict(jar.iteritems()) + d3 = dict(jar.items()) + + assert len(jar) == 2 + assert len(d1) == 2 + assert len(d2) == 2 + assert len(d3) == 2 + + def test_cookie_as_dict_keeps_items(self): + key = 'some_cookie' + value = 'some_value' + + key1 = 'some_cookie1' + value1 = 'some_value1' + + jar = requests.cookies.RequestsCookieJar() + jar.set(key, value) + jar.set(key1, value1) + + d1 = dict(jar) + d2 = dict(jar.iteritems()) + d3 = dict(jar.items()) + + assert d1['some_cookie'] == 'some_value' + assert d2['some_cookie'] == 'some_value' + assert d3['some_cookie1'] == 'some_value1' + + def test_cookie_as_dict_keys(self): + key = 'some_cookie' + value = 'some_value' + + key1 = 'some_cookie1' + value1 = 'some_value1' + + jar = requests.cookies.RequestsCookieJar() + jar.set(key, value) + jar.set(key1, value1) + + keys = jar.keys() + assert keys == list(keys) + # make sure one can use keys multiple times + assert list(keys) == list(keys) + + def test_cookie_as_dict_values(self): + key = 'some_cookie' + value = 'some_value' + + key1 = 'some_cookie1' + value1 = 'some_value1' + + jar = requests.cookies.RequestsCookieJar() + jar.set(key, value) + jar.set(key1, value1) + + values = jar.values() + assert values == list(values) + # make sure one can use values multiple times + assert list(values) == list(values) + + def test_cookie_as_dict_items(self): + key = 'some_cookie' + value = 'some_value' + + key1 = 'some_cookie1' + value1 = 'some_value1' + + jar = requests.cookies.RequestsCookieJar() + jar.set(key, value) + jar.set(key1, value1) + + items = jar.items() + assert items == list(items) + # make sure one can use items multiple times + assert list(items) == list(items) + def test_time_elapsed_blank(self): r = requests.get(httpbin('get')) td = r.elapsed total_seconds = ((td.microseconds + (td.seconds + td.days * 24 * 3600) * 10**6) / 10**6) - self.assertTrue(total_seconds > 0.0) + assert total_seconds > 0.0 def test_response_is_iterable(self): r = requests.Response() @@ -538,27 +674,55 @@ class RequestsTestCase(unittest.TestCase): return read_(amt) setattr(io, 'read', read_mock) r.raw = io - self.assertTrue(next(iter(r))) + assert next(iter(r)) io.close() + def test_request_and_response_are_pickleable(self): + r = requests.get(httpbin('get')) + + # verify we can pickle the original request + assert pickle.loads(pickle.dumps(r.request)) + + # verify we can pickle the response and that we have access to + # the original request. + pr = pickle.loads(pickle.dumps(r)) + assert r.request.url == pr.request.url + assert r.request.headers == pr.request.headers + def test_get_auth_from_url(self): url = 'http://user:pass@complex.url.com/path?query=yes' - self.assertEqual(('user', 'pass'), - requests.utils.get_auth_from_url(url)) + assert ('user', 'pass') == requests.utils.get_auth_from_url(url) + + def test_get_auth_from_url_encoded_spaces(self): + url = 'http://user:pass%20pass@complex.url.com/path?query=yes' + assert ('user', 'pass pass') == requests.utils.get_auth_from_url(url) + + def test_get_auth_from_url_not_encoded_spaces(self): + url = 'http://user:pass pass@complex.url.com/path?query=yes' + assert ('user', 'pass pass') == requests.utils.get_auth_from_url(url) + + def test_get_auth_from_url_percent_chars(self): + url = 'http://user%25user:pass@complex.url.com/path?query=yes' + assert ('user%user', 'pass') == requests.utils.get_auth_from_url(url) + + def test_get_auth_from_url_encoded_hashes(self): + url = 'http://user:pass%23pass@complex.url.com/path?query=yes' + assert ('user', 'pass#pass') == requests.utils.get_auth_from_url(url) def test_cannot_send_unprepared_requests(self): r = requests.Request(url=HTTPBIN) - self.assertRaises(ValueError, requests.Session().send, r) + with pytest.raises(ValueError): + requests.Session().send(r) def test_http_error(self): error = requests.exceptions.HTTPError() - self.assertEqual(error.response, None) + assert not error.response response = requests.Response() error = requests.exceptions.HTTPError(response=response) - self.assertEqual(error.response, response) + assert error.response == response error = requests.exceptions.HTTPError('message', response=response) - self.assertEqual(str(error), 'message') - self.assertEqual(error.response, response) + assert str(error) == 'message' + assert error.response == response def test_session_pickling(self): r = requests.Request('GET', httpbin('get')) @@ -568,7 +732,7 @@ class RequestsTestCase(unittest.TestCase): s.proxies = getproxies() r = s.send(r.prepare()) - self.assertEqual(r.status_code, 200) + assert r.status_code == 200 def test_fixes_1329(self): """ @@ -579,30 +743,21 @@ class RequestsTestCase(unittest.TestCase): s.headers.update({'accept': 'application/json'}) r = s.get(httpbin('get')) headers = r.request.headers - self.assertEqual( - headers['accept'], - 'application/json' - ) - self.assertEqual( - headers['Accept'], - 'application/json' - ) - self.assertEqual( - headers['ACCEPT'], - 'application/json' - ) + assert headers['accept'] == 'application/json' + assert headers['Accept'] == 'application/json' + assert headers['ACCEPT'] == 'application/json' def test_uppercase_scheme_redirect(self): parts = urlparse(httpbin('html')) url = "HTTP://" + parts.netloc + parts.path r = requests.get(httpbin('redirect-to'), params={'url': url}) - self.assertEqual(r.status_code, 200) - self.assertEqual(r.url.lower(), url.lower()) + assert r.status_code == 200 + assert r.url.lower() == url.lower() def test_transport_adapter_ordering(self): s = requests.Session() order = ['https://', 'http://'] - self.assertEqual(order, list(s.adapters)) + assert order == list(s.adapters) s.mount('http://git', HTTPAdapter()) s.mount('http://github', HTTPAdapter()) s.mount('http://github.com', HTTPAdapter()) @@ -615,7 +770,7 @@ class RequestsTestCase(unittest.TestCase): 'https://', 'http://', ] - self.assertEqual(order, list(s.adapters)) + assert order == list(s.adapters) s.mount('http://gittip', HTTPAdapter()) s.mount('http://gittip.com', HTTPAdapter()) s.mount('http://gittip.com/about/', HTTPAdapter()) @@ -630,12 +785,12 @@ class RequestsTestCase(unittest.TestCase): 'https://', 'http://', ] - self.assertEqual(order, list(s.adapters)) + assert order == list(s.adapters) s2 = requests.Session() s2.adapters = {'http://': HTTPAdapter()} s2.mount('https://', HTTPAdapter()) - self.assertTrue('http://' in s2.adapters) - self.assertTrue('https://' in s2.adapters) + assert 'http://' in s2.adapters + assert 'https://' in s2.adapters def test_header_remove_is_case_insensitive(self): # From issue #1321 @@ -658,7 +813,7 @@ class RequestsTestCase(unittest.TestCase): 'exactly-------------sixty-----------three------------characters', ) r = requests.Request('GET', url).prepare() - self.assertEqual(r.url, url) + assert r.url == url def test_header_keys_are_native(self): headers = {u'unicode': 'blah', 'byte'.encode('ascii'): 'blah'} @@ -667,8 +822,8 @@ class RequestsTestCase(unittest.TestCase): # This is testing that they are builtin strings. A bit weird, but there # we go. - self.assertTrue('unicode' in p.headers.keys()) - self.assertTrue('byte' in p.headers.keys()) + assert 'unicode' in p.headers.keys() + assert 'byte' in p.headers.keys() def test_can_send_nonstring_objects_with_files(self): data = {'a': 0.0} @@ -676,42 +831,61 @@ class RequestsTestCase(unittest.TestCase): r = requests.Request('POST', httpbin('post'), data=data, files=files) p = r.prepare() - self.assertTrue('multipart/form-data' in p.headers['Content-Type']) + assert 'multipart/form-data' in p.headers['Content-Type'] + + def test_autoset_header_values_are_native(self): + data = 'this is a string' + length = '16' + req = requests.Request('POST', httpbin('post'), data=data) + p = req.prepare() + + assert p.headers['Content-Length'] == length + + def test_oddball_schemes_dont_check_URLs(self): + test_urls = ( + 'data:image/gif;base64,R0lGODlhAQABAHAAACH5BAUAAAAALAAAAAABAAEAAAICRAEAOw==', + 'file:///etc/passwd', + 'magnet:?xt=urn:btih:be08f00302bc2d1d3cfa3af02024fa647a271431', + ) + for test_url in test_urls: + req = requests.Request('GET', test_url) + preq = req.prepare() + assert test_url == preq.url class TestContentEncodingDetection(unittest.TestCase): def test_none(self): encodings = requests.utils.get_encodings_from_content('') - self.assertEqual(len(encodings), 0) + assert not len(encodings) def test_html_charset(self): """HTML5 meta charset attribute""" content = '<meta charset="UTF-8">' encodings = requests.utils.get_encodings_from_content(content) - self.assertEqual(len(encodings), 1) - self.assertEqual(encodings[0], 'UTF-8') + assert len(encodings) == 1 + assert encodings[0] == 'UTF-8' def test_html4_pragma(self): """HTML4 pragma directive""" content = '<meta http-equiv="Content-type" content="text/html;charset=UTF-8">' encodings = requests.utils.get_encodings_from_content(content) - self.assertEqual(len(encodings), 1) - self.assertEqual(encodings[0], 'UTF-8') + assert len(encodings) == 1 + assert encodings[0] == 'UTF-8' def test_xhtml_pragma(self): """XHTML 1.x served with text/html MIME type""" content = '<meta http-equiv="Content-type" content="text/html;charset=UTF-8" />' encodings = requests.utils.get_encodings_from_content(content) - self.assertEqual(len(encodings), 1) - self.assertEqual(encodings[0], 'UTF-8') + assert len(encodings) == 1 + assert encodings[0] == 'UTF-8' def test_xml(self): """XHTML 1.x served as XML""" content = '<?xml version="1.0" encoding="UTF-8"?>' encodings = requests.utils.get_encodings_from_content(content) - self.assertEqual(len(encodings), 1) - self.assertEqual(encodings[0], 'UTF-8') + assert len(encodings) == 1 + assert encodings[0] == 'UTF-8' def test_precedence(self): content = ''' @@ -720,44 +894,44 @@ class TestContentEncodingDetection(unittest.TestCase): <meta http-equiv="Content-type" content="text/html;charset=HTML4" /> '''.strip() encodings = requests.utils.get_encodings_from_content(content) - self.assertEqual(encodings, ['HTML5', 'HTML4', 'XML']) + assert encodings == ['HTML5', 'HTML4', 'XML'] class TestCaseInsensitiveDict(unittest.TestCase): def test_mapping_init(self): cid = CaseInsensitiveDict({'Foo': 'foo','BAr': 'bar'}) - self.assertEqual(len(cid), 2) - self.assertTrue('foo' in cid) - self.assertTrue('bar' in cid) + assert len(cid) == 2 + assert 'foo' in cid + assert 'bar' in cid def test_iterable_init(self): cid = CaseInsensitiveDict([('Foo', 'foo'), ('BAr', 'bar')]) - self.assertEqual(len(cid), 2) - self.assertTrue('foo' in cid) - self.assertTrue('bar' in cid) + assert len(cid) == 2 + assert 'foo' in cid + assert 'bar' in cid def test_kwargs_init(self): cid = CaseInsensitiveDict(FOO='foo', BAr='bar') - self.assertEqual(len(cid), 2) - self.assertTrue('foo' in cid) - self.assertTrue('bar' in cid) + assert len(cid) == 2 + assert 'foo' in cid + assert 'bar' in cid def test_docstring_example(self): cid = CaseInsensitiveDict() cid['Accept'] = 'application/json' - self.assertEqual(cid['aCCEPT'], 'application/json') - self.assertEqual(list(cid), ['Accept']) + assert cid['aCCEPT'] == 'application/json' + assert list(cid) == ['Accept'] def test_len(self): cid = CaseInsensitiveDict({'a': 'a', 'b': 'b'}) cid['A'] = 'a' - self.assertEqual(len(cid), 2) + assert len(cid) == 2 def test_getitem(self): cid = CaseInsensitiveDict({'Spam': 'blueval'}) - self.assertEqual(cid['spam'], 'blueval') - self.assertEqual(cid['SPAM'], 'blueval') + assert cid['spam'] == 'blueval' + assert cid['SPAM'] == 'blueval' def test_fixes_649(self): """__setitem__ should behave case-insensitively.""" @@ -766,74 +940,68 @@ class TestCaseInsensitiveDict(unittest.TestCase): cid['Spam'] = 'twoval' cid['sPAM'] = 'redval' cid['SPAM'] = 'blueval' - self.assertEqual(cid['spam'], 'blueval') - self.assertEqual(cid['SPAM'], 'blueval') - self.assertEqual(list(cid.keys()), ['SPAM']) + assert cid['spam'] == 'blueval' + assert cid['SPAM'] == 'blueval' + assert list(cid.keys()) == ['SPAM'] def test_delitem(self): cid = CaseInsensitiveDict() cid['Spam'] = 'someval' del cid['sPam'] - self.assertFalse('spam' in cid) - self.assertEqual(len(cid), 0) + assert 'spam' not in cid + assert len(cid) == 0 def test_contains(self): cid = CaseInsensitiveDict() cid['Spam'] = 'someval' - self.assertTrue('Spam' in cid) - self.assertTrue('spam' in cid) - self.assertTrue('SPAM' in cid) - self.assertTrue('sPam' in cid) - self.assertFalse('notspam' in cid) + assert 'Spam' in cid + assert 'spam' in cid + assert 'SPAM' in cid + assert 'sPam' in cid + assert 'notspam' not in cid def test_get(self): cid = CaseInsensitiveDict() cid['spam'] = 'oneval' cid['SPAM'] = 'blueval' - self.assertEqual(cid.get('spam'), 'blueval') - self.assertEqual(cid.get('SPAM'), 'blueval') - self.assertEqual(cid.get('sPam'), 'blueval') - self.assertEqual(cid.get('notspam', 'default'), 'default') + assert cid.get('spam') == 'blueval' + assert cid.get('SPAM') == 'blueval' + assert cid.get('sPam') == 'blueval' + assert cid.get('notspam', 'default') == 'default' def test_update(self): cid = CaseInsensitiveDict() cid['spam'] = 'blueval' cid.update({'sPam': 'notblueval'}) - self.assertEqual(cid['spam'], 'notblueval') + assert cid['spam'] == 'notblueval' cid = CaseInsensitiveDict({'Foo': 'foo','BAr': 'bar'}) cid.update({'fOO': 'anotherfoo', 'bAR': 'anotherbar'}) - self.assertEqual(len(cid), 2) - self.assertEqual(cid['foo'], 'anotherfoo') - self.assertEqual(cid['bar'], 'anotherbar') + assert len(cid) == 2 + assert cid['foo'] == 'anotherfoo' + assert cid['bar'] == 'anotherbar' def test_update_retains_unchanged(self): cid = CaseInsensitiveDict({'foo': 'foo', 'bar': 'bar'}) cid.update({'foo': 'newfoo'}) - self.assertEquals(cid['bar'], 'bar') + assert cid['bar'] == 'bar' def test_iter(self): cid = CaseInsensitiveDict({'Spam': 'spam', 'Eggs': 'eggs'}) keys = frozenset(['Spam', 'Eggs']) - self.assertEqual(frozenset(iter(cid)), keys) + assert frozenset(iter(cid)) == keys def test_equality(self): cid = CaseInsensitiveDict({'SPAM': 'blueval', 'Eggs': 'redval'}) othercid = CaseInsensitiveDict({'spam': 'blueval', 'eggs': 'redval'}) - self.assertEqual(cid, othercid) + assert cid == othercid del othercid['spam'] - self.assertNotEqual(cid, othercid) - self.assertEqual(cid, {'spam': 'blueval', 'eggs': 'redval'}) + assert cid != othercid + assert cid == {'spam': 'blueval', 'eggs': 'redval'} def test_setdefault(self): cid = CaseInsensitiveDict({'Spam': 'blueval'}) - self.assertEqual( - cid.setdefault('spam', 'notblueval'), - 'blueval' - ) - self.assertEqual( - cid.setdefault('notspam', 'notblueval'), - 'notblueval' - ) + assert cid.setdefault('spam', 'notblueval') == 'blueval' + assert cid.setdefault('notspam', 'notblueval') == 'notblueval' def test_lower_items(self): cid = CaseInsensitiveDict({ @@ -842,7 +1010,7 @@ class TestCaseInsensitiveDict(unittest.TestCase): }) keyset = frozenset(lowerkey for lowerkey, v in cid.lower_items()) lowerkeyset = frozenset(['accept', 'user-agent']) - self.assertEqual(keyset, lowerkeyset) + assert keyset == lowerkeyset def test_preserve_key_case(self): cid = CaseInsensitiveDict({ @@ -850,9 +1018,9 @@ class TestCaseInsensitiveDict(unittest.TestCase): 'user-Agent': 'requests', }) keyset = frozenset(['Accept', 'user-Agent']) - self.assertEqual(frozenset(i[0] for i in cid.items()), keyset) - self.assertEqual(frozenset(cid.keys()), keyset) - self.assertEqual(frozenset(cid), keyset) + assert frozenset(i[0] for i in cid.items()) == keyset + assert frozenset(cid.keys()) == keyset + assert frozenset(cid) == keyset def test_preserve_last_key_case(self): cid = CaseInsensitiveDict({ @@ -862,9 +1030,140 @@ class TestCaseInsensitiveDict(unittest.TestCase): cid.update({'ACCEPT': 'application/json'}) cid['USER-AGENT'] = 'requests' keyset = frozenset(['ACCEPT', 'USER-AGENT']) - self.assertEqual(frozenset(i[0] for i in cid.items()), keyset) - self.assertEqual(frozenset(cid.keys()), keyset) - self.assertEqual(frozenset(cid), keyset) + assert frozenset(i[0] for i in cid.items()) == keyset + assert frozenset(cid.keys()) == keyset + assert frozenset(cid) == keyset + + +class UtilsTestCase(unittest.TestCase): + + def test_super_len_io_streams(self): + """ Ensures that we properly deal with different kinds of IO streams. """ + # uses StringIO or io.StringIO (see import above) + from io import BytesIO + from requests.utils import super_len + + assert super_len(StringIO.StringIO()) == 0 + assert super_len(StringIO.StringIO('with so much drama in the LBC')) == 29 + + assert super_len(BytesIO()) == 0 + assert super_len(BytesIO(b"it's kinda hard bein' snoop d-o-double-g")) == 40 + + try: + import cStringIO + except ImportError: + pass + else: + assert super_len(cStringIO.StringIO('but some how, some way...')) == 25 + + def test_get_environ_proxies_ip_ranges(self): + """ Ensures that IP addresses are correctly matches with ranges in no_proxy variable """ + from requests.utils import get_environ_proxies + os.environ['no_proxy'] = "192.168.0.0/24,127.0.0.1,localhost.localdomain,172.16.1.1" + assert get_environ_proxies('http://192.168.0.1:5000/') == {} + assert get_environ_proxies('http://192.168.0.1/') == {} + assert get_environ_proxies('http://172.16.1.1/') == {} + assert get_environ_proxies('http://172.16.1.1:5000/') == {} + assert get_environ_proxies('http://192.168.1.1:5000/') != {} + assert get_environ_proxies('http://192.168.1.1/') != {} + + def test_get_environ_proxies(self): + """ Ensures that IP addresses are correctly matches with ranges in no_proxy variable """ + from requests.utils import get_environ_proxies + os.environ['no_proxy'] = "127.0.0.1,localhost.localdomain,192.168.0.0/24,172.16.1.1" + assert get_environ_proxies('http://localhost.localdomain:5000/v1.0/') == {} + assert get_environ_proxies('http://www.requests.com/') != {} + + def test_is_ipv4_address(self): + from requests.utils import is_ipv4_address + assert is_ipv4_address('8.8.8.8') + assert not is_ipv4_address('8.8.8.8.8') + assert not is_ipv4_address('localhost.localdomain') + + def test_is_valid_cidr(self): + from requests.utils import is_valid_cidr + assert not is_valid_cidr('8.8.8.8') + assert is_valid_cidr('192.168.1.0/24') + + def test_dotted_netmask(self): + from requests.utils import dotted_netmask + assert dotted_netmask(8) == '255.0.0.0' + assert dotted_netmask(24) == '255.255.255.0' + assert dotted_netmask(25) == '255.255.255.128' + + def test_address_in_network(self): + from requests.utils import address_in_network + assert address_in_network('192.168.1.1', '192.168.1.0/24') + assert not address_in_network('172.16.0.1', '192.168.1.0/24') + + def test_get_auth_from_url(self): + """ Ensures that username and password in well-encoded URI as per RFC 3986 are correclty extracted """ + from requests.utils import get_auth_from_url + from requests.compat import quote + percent_encoding_test_chars = "%!*'();:@&=+$,/?#[] " + url_address = "request.com/url.html#test" + url = "http://" + quote(percent_encoding_test_chars, '') + ':' + quote(percent_encoding_test_chars, '') + '@' + url_address + (username, password) = get_auth_from_url(url) + assert username == percent_encoding_test_chars + assert password == percent_encoding_test_chars + + +class TestMorselToCookieExpires(unittest.TestCase): + + """Tests for morsel_to_cookie when morsel contains expires.""" + + def test_expires_valid_str(self): + """Test case where we convert expires from string time.""" + + morsel = Morsel() + morsel['expires'] = 'Thu, 01-Jan-1970 00:00:01 GMT' + cookie = morsel_to_cookie(morsel) + assert cookie.expires == 1 + + def test_expires_invalid_int(self): + """Test case where an invalid type is passed for expires.""" + + morsel = Morsel() + morsel['expires'] = 100 + with pytest.raises(TypeError): + morsel_to_cookie(morsel) + + def test_expires_invalid_str(self): + """Test case where an invalid string is input.""" + + morsel = Morsel() + morsel['expires'] = 'woops' + with pytest.raises(ValueError): + morsel_to_cookie(morsel) + + def test_expires_none(self): + """Test case where expires is None.""" + + morsel = Morsel() + morsel['expires'] = None + cookie = morsel_to_cookie(morsel) + assert cookie.expires is None + + +class TestMorselToCookieMaxAge(unittest.TestCase): + + """Tests for morsel_to_cookie when morsel contains max-age.""" + + def test_max_age_valid_int(self): + """Test case where a valid max age in seconds is passed.""" + + morsel = Morsel() + morsel['max-age'] = 60 + cookie = morsel_to_cookie(morsel) + assert isinstance(cookie.expires, int) + + def test_max_age_invalid_str(self): + """Test case where a invalid max age is passed.""" + + morsel = Morsel() + morsel['max-age'] = 'woops' + with pytest.raises(TypeError): + morsel_to_cookie(morsel) if __name__ == '__main__': |