aboutsummaryrefslogtreecommitdiff
path: root/test_requests.py
diff options
context:
space:
mode:
authorSVN-Git Migration <python-modules-team@lists.alioth.debian.org>2015-10-08 13:41:31 -0700
committerSVN-Git Migration <python-modules-team@lists.alioth.debian.org>2015-10-08 13:41:31 -0700
commitca5cb993a3ce4fbdf50eebd31cb6b71eec9bc391 (patch)
tree0cb1e57a7e04660775dc9426c1e4476082f88d1d /test_requests.py
parent224200a9815f792f93632d03a38e4f0763ae69ef (diff)
downloadpython-requests-ca5cb993a3ce4fbdf50eebd31cb6b71eec9bc391.tar
python-requests-ca5cb993a3ce4fbdf50eebd31cb6b71eec9bc391.tar.gz
Imported Upstream version 2.2.1
Diffstat (limited to 'test_requests.py')
-rwxr-xr-xtest_requests.py653
1 files changed, 476 insertions, 177 deletions
diff --git a/test_requests.py b/test_requests.py
index d1e6ed0..63897bb 100755
--- a/test_requests.py
+++ b/test_requests.py
@@ -6,15 +6,16 @@
from __future__ import division
import json
import os
-import unittest
import pickle
+import unittest
import requests
import pytest
-from requests.auth import HTTPDigestAuth
from requests.adapters import HTTPAdapter
-from requests.compat import str, cookielib, getproxies, urljoin, urlparse
-from requests.cookies import cookiejar_from_dict
+from requests.auth import HTTPDigestAuth
+from requests.compat import (
+ Morsel, cookielib, getproxies, str, urljoin, urlparse)
+from requests.cookies import cookiejar_from_dict, morsel_to_cookie
from requests.exceptions import InvalidURL, MissingSchema
from requests.structures import CaseInsensitiveDict
@@ -57,8 +58,10 @@ class RequestsTestCase(unittest.TestCase):
requests.post
def test_invalid_url(self):
- self.assertRaises(MissingSchema, requests.get, 'hiwpefhipowhefopw')
- self.assertRaises(InvalidURL, requests.get, 'http://')
+ with pytest.raises(MissingSchema):
+ requests.get('hiwpefhipowhefopw')
+ with pytest.raises(InvalidURL):
+ requests.get('http://')
def test_basic_building(self):
req = requests.Request()
@@ -71,24 +74,22 @@ class RequestsTestCase(unittest.TestCase):
def test_no_content_length(self):
get_req = requests.Request('GET', httpbin('get')).prepare()
- self.assertTrue('Content-Length' not in get_req.headers)
+ assert 'Content-Length' not in get_req.headers
head_req = requests.Request('HEAD', httpbin('head')).prepare()
- self.assertTrue('Content-Length' not in head_req.headers)
+ assert 'Content-Length' not in head_req.headers
def test_path_is_not_double_encoded(self):
request = requests.Request('GET', "http://0.0.0.0/get/test case").prepare()
- self.assertEqual(request.path_url, "/get/test%20case")
+ assert request.path_url == '/get/test%20case'
def test_params_are_added_before_fragment(self):
request = requests.Request('GET',
"http://example.com/path#fragment", params={"a": "b"}).prepare()
- self.assertEqual(request.url,
- "http://example.com/path?a=b#fragment")
+ assert request.url == "http://example.com/path?a=b#fragment"
request = requests.Request('GET',
"http://example.com/path?key=value#fragment", params={"a": "b"}).prepare()
- self.assertEqual(request.url,
- "http://example.com/path?key=value&a=b#fragment")
+ assert request.url == "http://example.com/path?key=value&a=b#fragment"
def test_mixed_case_scheme_acceptable(self):
s = requests.Session()
@@ -100,8 +101,7 @@ class RequestsTestCase(unittest.TestCase):
url = scheme + parts.netloc + parts.path
r = requests.Request('GET', url)
r = s.send(r.prepare())
- self.assertEqual(r.status_code, 200,
- "failed for scheme %s" % scheme)
+ assert r.status_code == 200, 'failed for scheme {0}'.format(scheme)
def test_HTTP_200_OK_GET_ALTERNATIVE(self):
r = requests.Request('GET', httpbin('get'))
@@ -110,11 +110,11 @@ class RequestsTestCase(unittest.TestCase):
r = s.send(r.prepare())
- self.assertEqual(r.status_code, 200)
+ assert r.status_code == 200
def test_HTTP_302_ALLOW_REDIRECT_GET(self):
r = requests.get(httpbin('redirect', '1'))
- self.assertEqual(r.status_code, 200)
+ assert r.status_code == 200
# def test_HTTP_302_ALLOW_REDIRECT_POST(self):
# r = requests.post(httpbin('status', '302'), data={'some': 'data'})
@@ -125,31 +125,31 @@ class RequestsTestCase(unittest.TestCase):
r = requests.get(httpbin('user-agent'), headers=heads)
- self.assertTrue(heads['User-agent'] in r.text)
- self.assertEqual(r.status_code, 200)
+ assert heads['User-agent'] in r.text
+ assert r.status_code == 200
def test_HTTP_200_OK_GET_WITH_MIXED_PARAMS(self):
heads = {'User-agent': 'Mozilla/5.0'}
r = requests.get(httpbin('get') + '?test=true', params={'q': 'test'}, headers=heads)
- self.assertEqual(r.status_code, 200)
+ assert r.status_code == 200
def test_set_cookie_on_301(self):
s = requests.session()
url = httpbin('cookies/set?foo=bar')
r = s.get(url)
- self.assertTrue(s.cookies['foo'] == 'bar')
+ assert s.cookies['foo'] == 'bar'
def test_cookie_sent_on_redirect(self):
s = requests.session()
s.get(httpbin('cookies/set?foo=bar'))
r = s.get(httpbin('redirect/1')) # redirects to httpbin('get')
- self.assertTrue("Cookie" in r.json()["headers"])
+ assert 'Cookie' in r.json()['headers']
def test_cookie_removed_on_expire(self):
s = requests.session()
s.get(httpbin('cookies/set?foo=bar'))
- self.assertTrue(s.cookies['foo'] == 'bar')
+ assert s.cookies['foo'] == 'bar'
s.get(
httpbin('response-headers'),
params={
@@ -162,7 +162,13 @@ class RequestsTestCase(unittest.TestCase):
def test_cookie_quote_wrapped(self):
s = requests.session()
s.get(httpbin('cookies/set?foo="bar:baz"'))
- self.assertTrue(s.cookies['foo'] == '"bar:baz"')
+ assert s.cookies['foo'] == '"bar:baz"'
+
+ def test_cookie_persists_via_api(self):
+ s = requests.session()
+ r = s.get(httpbin('redirect/1'), cookies={'foo': 'bar'})
+ assert 'foo' in r.request.headers['Cookie']
+ assert 'foo' in r.history[0].request.headers['Cookie']
def test_request_cookie_overrides_session_cookie(self):
s = requests.session()
@@ -172,6 +178,12 @@ class RequestsTestCase(unittest.TestCase):
# Session cookie should not be modified
assert s.cookies['foo'] == 'bar'
+ def test_request_cookies_not_persisted(self):
+ s = requests.session()
+ s.get(httpbin('cookies'), cookies={'foo': 'baz'})
+ # Sending a request with cookies should not add cookies to the session
+ assert not s.cookies
+
def test_generic_cookiejar_works(self):
cj = cookielib.CookieJar()
cookiejar_from_dict({'foo': 'bar'}, cj)
@@ -183,11 +195,19 @@ class RequestsTestCase(unittest.TestCase):
# Make sure the session cj is still the custom one
assert s.cookies is cj
+ def test_param_cookiejar_works(self):
+ cj = cookielib.CookieJar()
+ cookiejar_from_dict({'foo' : 'bar'}, cj)
+ s = requests.session()
+ r = s.get(httpbin('cookies'), cookies=cj)
+ # Make sure the cookie was sent
+ assert r.json()['cookies']['foo'] == 'bar'
+
def test_requests_in_history_are_not_overridden(self):
resp = requests.get(httpbin('redirect/3'))
urls = [r.url for r in resp.history]
req_urls = [r.request.url for r in resp.history]
- self.assertEquals(urls, req_urls)
+ assert urls == req_urls
def test_user_agent_transfers(self):
@@ -196,37 +216,37 @@ class RequestsTestCase(unittest.TestCase):
}
r = requests.get(httpbin('user-agent'), headers=heads)
- self.assertTrue(heads['User-agent'] in r.text)
+ assert heads['User-agent'] in r.text
heads = {
'user-agent': 'Mozilla/5.0 (github.com/kennethreitz/requests)'
}
r = requests.get(httpbin('user-agent'), headers=heads)
- self.assertTrue(heads['user-agent'] in r.text)
+ assert heads['user-agent'] in r.text
def test_HTTP_200_OK_HEAD(self):
r = requests.head(httpbin('get'))
- self.assertEqual(r.status_code, 200)
+ assert r.status_code == 200
def test_HTTP_200_OK_PUT(self):
r = requests.put(httpbin('put'))
- self.assertEqual(r.status_code, 200)
+ assert r.status_code == 200
def test_BASICAUTH_TUPLE_HTTP_200_OK_GET(self):
auth = ('user', 'pass')
url = httpbin('basic-auth', 'user', 'pass')
r = requests.get(url, auth=auth)
- self.assertEqual(r.status_code, 200)
+ assert r.status_code == 200
r = requests.get(url)
- self.assertEqual(r.status_code, 401)
+ assert r.status_code == 401
s = requests.session()
s.auth = auth
r = s.get(url)
- self.assertEqual(r.status_code, 200)
+ assert r.status_code == 200
def test_basicauth_with_netrc(self):
auth = ('user', 'pass')
@@ -239,22 +259,22 @@ class RequestsTestCase(unittest.TestCase):
# Should use netrc and work.
r = requests.get(url)
- self.assertEqual(r.status_code, 200)
+ assert r.status_code == 200
# Given auth should override and fail.
r = requests.get(url, auth=wrong_auth)
- self.assertEqual(r.status_code, 401)
+ assert r.status_code == 401
s = requests.session()
# Should use netrc and work.
r = s.get(url)
- self.assertEqual(r.status_code, 200)
+ assert r.status_code == 200
# Given auth should override and fail.
s.auth = wrong_auth
r = s.get(url)
- self.assertEqual(r.status_code, 401)
+ assert r.status_code == 401
def test_DIGEST_HTTP_200_OK_GET(self):
@@ -262,15 +282,15 @@ class RequestsTestCase(unittest.TestCase):
url = httpbin('digest-auth', 'auth', 'user', 'pass')
r = requests.get(url, auth=auth)
- self.assertEqual(r.status_code, 200)
+ assert r.status_code == 200
r = requests.get(url)
- self.assertEqual(r.status_code, 401)
+ assert r.status_code == 401
s = requests.session()
s.auth = HTTPDigestAuth('user', 'pass')
r = s.get(url)
- self.assertEqual(r.status_code, 200)
+ assert r.status_code == 200
def test_DIGEST_AUTH_RETURNS_COOKIE(self):
url = httpbin('digest-auth', 'auth', 'user', 'pass')
@@ -294,11 +314,10 @@ class RequestsTestCase(unittest.TestCase):
url = httpbin('digest-auth', 'auth', 'user', 'pass')
r = requests.get(url, auth=auth, stream=True)
- self.assertNotEqual(r.raw.read(), b'')
+ assert r.raw.read() != b''
r = requests.get(url, auth=auth, stream=False)
- self.assertEqual(r.raw.read(), b'')
-
+ assert r.raw.read() == b''
def test_DIGESTAUTH_WRONG_HTTP_401_GET(self):
@@ -306,15 +325,23 @@ class RequestsTestCase(unittest.TestCase):
url = httpbin('digest-auth', 'auth', 'user', 'pass')
r = requests.get(url, auth=auth)
- self.assertEqual(r.status_code, 401)
+ assert r.status_code == 401
r = requests.get(url)
- self.assertEqual(r.status_code, 401)
+ assert r.status_code == 401
s = requests.session()
s.auth = auth
r = s.get(url)
- self.assertEqual(r.status_code, 401)
+ assert r.status_code == 401
+
+ def test_DIGESTAUTH_QUOTES_QOP_VALUE(self):
+
+ auth = HTTPDigestAuth('user', 'pass')
+ url = httpbin('digest-auth', 'auth', 'user', 'pass')
+
+ r = requests.get(url, auth=auth)
+ assert '"auth"' in r.request.headers['Authorization']
def test_POSTBIN_GET_POST_FILES(self):
@@ -322,19 +349,17 @@ class RequestsTestCase(unittest.TestCase):
post1 = requests.post(url).raise_for_status()
post1 = requests.post(url, data={'some': 'data'})
- self.assertEqual(post1.status_code, 200)
+ assert post1.status_code == 200
with open('requirements.txt') as f:
post2 = requests.post(url, files={'some': f})
- self.assertEqual(post2.status_code, 200)
+ assert post2.status_code == 200
post4 = requests.post(url, data='[{"some": "json"}]')
- self.assertEqual(post4.status_code, 200)
+ assert post4.status_code == 200
- try:
- requests.post(url, files=['bad file data'])
- except ValueError:
- pass
+ with pytest.raises(ValueError):
+ requests.post(url, files = ['bad file data'])
def test_POSTBIN_GET_POST_FILES_WITH_DATA(self):
@@ -342,19 +367,17 @@ class RequestsTestCase(unittest.TestCase):
post1 = requests.post(url).raise_for_status()
post1 = requests.post(url, data={'some': 'data'})
- self.assertEqual(post1.status_code, 200)
+ assert post1.status_code == 200
with open('requirements.txt') as f:
post2 = requests.post(url, data={'some': 'data'}, files={'some': f})
- self.assertEqual(post2.status_code, 200)
+ assert post2.status_code == 200
post4 = requests.post(url, data='[{"some": "json"}]')
- self.assertEqual(post4.status_code, 200)
+ assert post4.status_code == 200
- try:
- requests.post(url, files=['bad file data'])
- except ValueError:
- pass
+ with pytest.raises(ValueError):
+ requests.post(url, files = ['bad file data'])
def test_conflicting_post_params(self):
url = httpbin('post')
@@ -364,14 +387,15 @@ class RequestsTestCase(unittest.TestCase):
def test_request_ok_set(self):
r = requests.get(httpbin('status', '404'))
- self.assertEqual(r.ok, False)
+ assert not r.ok
def test_status_raising(self):
r = requests.get(httpbin('status', '404'))
- self.assertRaises(requests.exceptions.HTTPError, r.raise_for_status)
+ with pytest.raises(requests.exceptions.HTTPError):
+ r.raise_for_status()
r = requests.get(httpbin('status', '500'))
- self.assertFalse(r.ok)
+ assert not r.ok
def test_decompress_gzip(self):
r = requests.get(httpbin('gzip'))
@@ -391,36 +415,36 @@ class RequestsTestCase(unittest.TestCase):
def test_urlencoded_get_query_multivalued_param(self):
r = requests.get(httpbin('get'), params=dict(test=['foo', 'baz']))
- self.assertEqual(r.status_code, 200)
- self.assertEqual(r.url, httpbin('get?test=foo&test=baz'))
+ assert r.status_code == 200
+ assert r.url == httpbin('get?test=foo&test=baz')
def test_different_encodings_dont_break_post(self):
r = requests.post(httpbin('post'),
data={'stuff': json.dumps({'a': 123})},
params={'blah': 'asdf1234'},
files={'file': ('test_requests.py', open(__file__, 'rb'))})
- self.assertEqual(r.status_code, 200)
+ assert r.status_code == 200
def test_unicode_multipart_post(self):
r = requests.post(httpbin('post'),
data={'stuff': u'ëlïxr'},
files={'file': ('test_requests.py', open(__file__, 'rb'))})
- self.assertEqual(r.status_code, 200)
+ assert r.status_code == 200
r = requests.post(httpbin('post'),
data={'stuff': u'ëlïxr'.encode('utf-8')},
files={'file': ('test_requests.py', open(__file__, 'rb'))})
- self.assertEqual(r.status_code, 200)
+ assert r.status_code == 200
r = requests.post(httpbin('post'),
data={'stuff': 'elixr'},
files={'file': ('test_requests.py', open(__file__, 'rb'))})
- self.assertEqual(r.status_code, 200)
+ assert r.status_code == 200
r = requests.post(httpbin('post'),
data={'stuff': 'elixr'.encode('utf-8')},
files={'file': ('test_requests.py', open(__file__, 'rb'))})
- self.assertEqual(r.status_code, 200)
+ assert r.status_code == 200
def test_unicode_multipart_post_fieldnames(self):
filename = os.path.splitext(__file__)[0] + '.py'
@@ -430,8 +454,13 @@ class RequestsTestCase(unittest.TestCase):
files={'file': ('test_requests.py',
open(filename, 'rb'))})
prep = r.prepare()
- self.assertTrue(b'name="stuff"' in prep.body)
- self.assertFalse(b'name="b\'stuff\'"' in prep.body)
+ assert b'name="stuff"' in prep.body
+ assert b'name="b\'stuff\'"' not in prep.body
+
+ def test_unicode_method_name(self):
+ files = {'file': open('test_requests.py', 'rb')}
+ r = requests.request(method=u'POST', url=httpbin('post'), files=files)
+ assert r.status_code == 200
def test_custom_content_type(self):
r = requests.post(httpbin('post'),
@@ -439,8 +468,8 @@ class RequestsTestCase(unittest.TestCase):
files={'file1': ('test_requests.py', open(__file__, 'rb')),
'file2': ('test_requests', open(__file__, 'rb'),
'text/py-content-type')})
- self.assertEqual(r.status_code, 200)
- self.assertTrue(b"text/py-content-type" in r.request.body)
+ assert r.status_code == 200
+ assert b"text/py-content-type" in r.request.body
def test_hook_receives_request_arguments(self):
def hook(resp, **kwargs):
@@ -449,6 +478,25 @@ class RequestsTestCase(unittest.TestCase):
requests.Request('GET', HTTPBIN, hooks={'response': hook})
+ def test_session_hooks_are_used_with_no_request_hooks(self):
+ hook = lambda x, *args, **kwargs: x
+ s = requests.Session()
+ s.hooks['response'].append(hook)
+ r = requests.Request('GET', HTTPBIN)
+ prep = s.prepare_request(r)
+ assert prep.hooks['response'] != []
+ assert prep.hooks['response'] == [hook]
+
+ def test_session_hooks_are_overriden_by_request_hooks(self):
+ hook1 = lambda x, *args, **kwargs: x
+ hook2 = lambda x, *args, **kwargs: x
+ assert hook1 is not hook2
+ s = requests.Session()
+ s.hooks['response'].append(hook2)
+ r = requests.Request('GET', HTTPBIN, hooks={'response': [hook1]})
+ prep = s.prepare_request(r)
+ assert prep.hooks['response'] == [hook1]
+
def test_prepared_request_hook(self):
def hook(resp, **kwargs):
resp.hook_working = True
@@ -461,7 +509,7 @@ class RequestsTestCase(unittest.TestCase):
s.proxies = getproxies()
resp = s.send(prep)
- self.assertTrue(hasattr(resp, 'hook_working'))
+ assert hasattr(resp, 'hook_working')
def test_prepared_from_session(self):
class DummyAuth(requests.auth.AuthBase):
@@ -470,7 +518,7 @@ class RequestsTestCase(unittest.TestCase):
return r
req = requests.Request('GET', httpbin('headers'))
- self.assertEqual(req.auth, None)
+ assert not req.auth
s = requests.Session()
s.auth = DummyAuth()
@@ -478,7 +526,7 @@ class RequestsTestCase(unittest.TestCase):
prep = s.prepare_request(req)
resp = s.send(prep)
- self.assertTrue(resp.json()['headers']['Dummy-Auth-Test'], 'dummy-auth-test-ok')
+ assert resp.json()['headers']['Dummy-Auth-Test'] == 'dummy-auth-test-ok'
def test_links(self):
r = requests.Response()
@@ -502,7 +550,7 @@ class RequestsTestCase(unittest.TestCase):
'x-ratelimit-limit': '60',
'x-ratelimit-remaining': '57'
}
- self.assertEqual(r.links['next']['rel'], 'next')
+ assert r.links['next']['rel'] == 'next'
def test_cookie_parameters(self):
key = 'some_cookie'
@@ -514,20 +562,108 @@ class RequestsTestCase(unittest.TestCase):
jar = requests.cookies.RequestsCookieJar()
jar.set(key, value, secure=secure, domain=domain, rest=rest)
- self.assertEqual(len(jar), 1)
- self.assertTrue('some_cookie' in jar)
+ assert len(jar) == 1
+ assert 'some_cookie' in jar
cookie = list(jar)[0]
- self.assertEqual(cookie.secure, secure)
- self.assertEqual(cookie.domain, domain)
- self.assertEqual(cookie._rest['HttpOnly'], rest['HttpOnly'])
+ assert cookie.secure == secure
+ assert cookie.domain == domain
+ assert cookie._rest['HttpOnly'] == rest['HttpOnly']
+
+ def test_cookie_as_dict_keeps_len(self):
+ key = 'some_cookie'
+ value = 'some_value'
+
+ key1 = 'some_cookie1'
+ value1 = 'some_value1'
+
+ jar = requests.cookies.RequestsCookieJar()
+ jar.set(key, value)
+ jar.set(key1, value1)
+
+ d1 = dict(jar)
+ d2 = dict(jar.iteritems())
+ d3 = dict(jar.items())
+
+ assert len(jar) == 2
+ assert len(d1) == 2
+ assert len(d2) == 2
+ assert len(d3) == 2
+
+ def test_cookie_as_dict_keeps_items(self):
+ key = 'some_cookie'
+ value = 'some_value'
+
+ key1 = 'some_cookie1'
+ value1 = 'some_value1'
+
+ jar = requests.cookies.RequestsCookieJar()
+ jar.set(key, value)
+ jar.set(key1, value1)
+
+ d1 = dict(jar)
+ d2 = dict(jar.iteritems())
+ d3 = dict(jar.items())
+
+ assert d1['some_cookie'] == 'some_value'
+ assert d2['some_cookie'] == 'some_value'
+ assert d3['some_cookie1'] == 'some_value1'
+
+ def test_cookie_as_dict_keys(self):
+ key = 'some_cookie'
+ value = 'some_value'
+
+ key1 = 'some_cookie1'
+ value1 = 'some_value1'
+
+ jar = requests.cookies.RequestsCookieJar()
+ jar.set(key, value)
+ jar.set(key1, value1)
+
+ keys = jar.keys()
+ assert keys == list(keys)
+ # make sure one can use keys multiple times
+ assert list(keys) == list(keys)
+
+ def test_cookie_as_dict_values(self):
+ key = 'some_cookie'
+ value = 'some_value'
+
+ key1 = 'some_cookie1'
+ value1 = 'some_value1'
+
+ jar = requests.cookies.RequestsCookieJar()
+ jar.set(key, value)
+ jar.set(key1, value1)
+
+ values = jar.values()
+ assert values == list(values)
+ # make sure one can use values multiple times
+ assert list(values) == list(values)
+
+ def test_cookie_as_dict_items(self):
+ key = 'some_cookie'
+ value = 'some_value'
+
+ key1 = 'some_cookie1'
+ value1 = 'some_value1'
+
+ jar = requests.cookies.RequestsCookieJar()
+ jar.set(key, value)
+ jar.set(key1, value1)
+
+ items = jar.items()
+ assert items == list(items)
+ # make sure one can use items multiple times
+ assert list(items) == list(items)
+
def test_time_elapsed_blank(self):
r = requests.get(httpbin('get'))
td = r.elapsed
total_seconds = ((td.microseconds + (td.seconds + td.days * 24 * 3600)
* 10**6) / 10**6)
- self.assertTrue(total_seconds > 0.0)
+ assert total_seconds > 0.0
def test_response_is_iterable(self):
r = requests.Response()
@@ -538,27 +674,55 @@ class RequestsTestCase(unittest.TestCase):
return read_(amt)
setattr(io, 'read', read_mock)
r.raw = io
- self.assertTrue(next(iter(r)))
+ assert next(iter(r))
io.close()
+ def test_request_and_response_are_pickleable(self):
+ r = requests.get(httpbin('get'))
+
+ # verify we can pickle the original request
+ assert pickle.loads(pickle.dumps(r.request))
+
+ # verify we can pickle the response and that we have access to
+ # the original request.
+ pr = pickle.loads(pickle.dumps(r))
+ assert r.request.url == pr.request.url
+ assert r.request.headers == pr.request.headers
+
def test_get_auth_from_url(self):
url = 'http://user:pass@complex.url.com/path?query=yes'
- self.assertEqual(('user', 'pass'),
- requests.utils.get_auth_from_url(url))
+ assert ('user', 'pass') == requests.utils.get_auth_from_url(url)
+
+ def test_get_auth_from_url_encoded_spaces(self):
+ url = 'http://user:pass%20pass@complex.url.com/path?query=yes'
+ assert ('user', 'pass pass') == requests.utils.get_auth_from_url(url)
+
+ def test_get_auth_from_url_not_encoded_spaces(self):
+ url = 'http://user:pass pass@complex.url.com/path?query=yes'
+ assert ('user', 'pass pass') == requests.utils.get_auth_from_url(url)
+
+ def test_get_auth_from_url_percent_chars(self):
+ url = 'http://user%25user:pass@complex.url.com/path?query=yes'
+ assert ('user%user', 'pass') == requests.utils.get_auth_from_url(url)
+
+ def test_get_auth_from_url_encoded_hashes(self):
+ url = 'http://user:pass%23pass@complex.url.com/path?query=yes'
+ assert ('user', 'pass#pass') == requests.utils.get_auth_from_url(url)
def test_cannot_send_unprepared_requests(self):
r = requests.Request(url=HTTPBIN)
- self.assertRaises(ValueError, requests.Session().send, r)
+ with pytest.raises(ValueError):
+ requests.Session().send(r)
def test_http_error(self):
error = requests.exceptions.HTTPError()
- self.assertEqual(error.response, None)
+ assert not error.response
response = requests.Response()
error = requests.exceptions.HTTPError(response=response)
- self.assertEqual(error.response, response)
+ assert error.response == response
error = requests.exceptions.HTTPError('message', response=response)
- self.assertEqual(str(error), 'message')
- self.assertEqual(error.response, response)
+ assert str(error) == 'message'
+ assert error.response == response
def test_session_pickling(self):
r = requests.Request('GET', httpbin('get'))
@@ -568,7 +732,7 @@ class RequestsTestCase(unittest.TestCase):
s.proxies = getproxies()
r = s.send(r.prepare())
- self.assertEqual(r.status_code, 200)
+ assert r.status_code == 200
def test_fixes_1329(self):
"""
@@ -579,30 +743,21 @@ class RequestsTestCase(unittest.TestCase):
s.headers.update({'accept': 'application/json'})
r = s.get(httpbin('get'))
headers = r.request.headers
- self.assertEqual(
- headers['accept'],
- 'application/json'
- )
- self.assertEqual(
- headers['Accept'],
- 'application/json'
- )
- self.assertEqual(
- headers['ACCEPT'],
- 'application/json'
- )
+ assert headers['accept'] == 'application/json'
+ assert headers['Accept'] == 'application/json'
+ assert headers['ACCEPT'] == 'application/json'
def test_uppercase_scheme_redirect(self):
parts = urlparse(httpbin('html'))
url = "HTTP://" + parts.netloc + parts.path
r = requests.get(httpbin('redirect-to'), params={'url': url})
- self.assertEqual(r.status_code, 200)
- self.assertEqual(r.url.lower(), url.lower())
+ assert r.status_code == 200
+ assert r.url.lower() == url.lower()
def test_transport_adapter_ordering(self):
s = requests.Session()
order = ['https://', 'http://']
- self.assertEqual(order, list(s.adapters))
+ assert order == list(s.adapters)
s.mount('http://git', HTTPAdapter())
s.mount('http://github', HTTPAdapter())
s.mount('http://github.com', HTTPAdapter())
@@ -615,7 +770,7 @@ class RequestsTestCase(unittest.TestCase):
'https://',
'http://',
]
- self.assertEqual(order, list(s.adapters))
+ assert order == list(s.adapters)
s.mount('http://gittip', HTTPAdapter())
s.mount('http://gittip.com', HTTPAdapter())
s.mount('http://gittip.com/about/', HTTPAdapter())
@@ -630,12 +785,12 @@ class RequestsTestCase(unittest.TestCase):
'https://',
'http://',
]
- self.assertEqual(order, list(s.adapters))
+ assert order == list(s.adapters)
s2 = requests.Session()
s2.adapters = {'http://': HTTPAdapter()}
s2.mount('https://', HTTPAdapter())
- self.assertTrue('http://' in s2.adapters)
- self.assertTrue('https://' in s2.adapters)
+ assert 'http://' in s2.adapters
+ assert 'https://' in s2.adapters
def test_header_remove_is_case_insensitive(self):
# From issue #1321
@@ -658,7 +813,7 @@ class RequestsTestCase(unittest.TestCase):
'exactly-------------sixty-----------three------------characters',
)
r = requests.Request('GET', url).prepare()
- self.assertEqual(r.url, url)
+ assert r.url == url
def test_header_keys_are_native(self):
headers = {u'unicode': 'blah', 'byte'.encode('ascii'): 'blah'}
@@ -667,8 +822,8 @@ class RequestsTestCase(unittest.TestCase):
# This is testing that they are builtin strings. A bit weird, but there
# we go.
- self.assertTrue('unicode' in p.headers.keys())
- self.assertTrue('byte' in p.headers.keys())
+ assert 'unicode' in p.headers.keys()
+ assert 'byte' in p.headers.keys()
def test_can_send_nonstring_objects_with_files(self):
data = {'a': 0.0}
@@ -676,42 +831,61 @@ class RequestsTestCase(unittest.TestCase):
r = requests.Request('POST', httpbin('post'), data=data, files=files)
p = r.prepare()
- self.assertTrue('multipart/form-data' in p.headers['Content-Type'])
+ assert 'multipart/form-data' in p.headers['Content-Type']
+
+ def test_autoset_header_values_are_native(self):
+ data = 'this is a string'
+ length = '16'
+ req = requests.Request('POST', httpbin('post'), data=data)
+ p = req.prepare()
+
+ assert p.headers['Content-Length'] == length
+
+ def test_oddball_schemes_dont_check_URLs(self):
+ test_urls = (
+ 'data:image/gif;base64,R0lGODlhAQABAHAAACH5BAUAAAAALAAAAAABAAEAAAICRAEAOw==',
+ 'file:///etc/passwd',
+ 'magnet:?xt=urn:btih:be08f00302bc2d1d3cfa3af02024fa647a271431',
+ )
+ for test_url in test_urls:
+ req = requests.Request('GET', test_url)
+ preq = req.prepare()
+ assert test_url == preq.url
class TestContentEncodingDetection(unittest.TestCase):
def test_none(self):
encodings = requests.utils.get_encodings_from_content('')
- self.assertEqual(len(encodings), 0)
+ assert not len(encodings)
def test_html_charset(self):
"""HTML5 meta charset attribute"""
content = '<meta charset="UTF-8">'
encodings = requests.utils.get_encodings_from_content(content)
- self.assertEqual(len(encodings), 1)
- self.assertEqual(encodings[0], 'UTF-8')
+ assert len(encodings) == 1
+ assert encodings[0] == 'UTF-8'
def test_html4_pragma(self):
"""HTML4 pragma directive"""
content = '<meta http-equiv="Content-type" content="text/html;charset=UTF-8">'
encodings = requests.utils.get_encodings_from_content(content)
- self.assertEqual(len(encodings), 1)
- self.assertEqual(encodings[0], 'UTF-8')
+ assert len(encodings) == 1
+ assert encodings[0] == 'UTF-8'
def test_xhtml_pragma(self):
"""XHTML 1.x served with text/html MIME type"""
content = '<meta http-equiv="Content-type" content="text/html;charset=UTF-8" />'
encodings = requests.utils.get_encodings_from_content(content)
- self.assertEqual(len(encodings), 1)
- self.assertEqual(encodings[0], 'UTF-8')
+ assert len(encodings) == 1
+ assert encodings[0] == 'UTF-8'
def test_xml(self):
"""XHTML 1.x served as XML"""
content = '<?xml version="1.0" encoding="UTF-8"?>'
encodings = requests.utils.get_encodings_from_content(content)
- self.assertEqual(len(encodings), 1)
- self.assertEqual(encodings[0], 'UTF-8')
+ assert len(encodings) == 1
+ assert encodings[0] == 'UTF-8'
def test_precedence(self):
content = '''
@@ -720,44 +894,44 @@ class TestContentEncodingDetection(unittest.TestCase):
<meta http-equiv="Content-type" content="text/html;charset=HTML4" />
'''.strip()
encodings = requests.utils.get_encodings_from_content(content)
- self.assertEqual(encodings, ['HTML5', 'HTML4', 'XML'])
+ assert encodings == ['HTML5', 'HTML4', 'XML']
class TestCaseInsensitiveDict(unittest.TestCase):
def test_mapping_init(self):
cid = CaseInsensitiveDict({'Foo': 'foo','BAr': 'bar'})
- self.assertEqual(len(cid), 2)
- self.assertTrue('foo' in cid)
- self.assertTrue('bar' in cid)
+ assert len(cid) == 2
+ assert 'foo' in cid
+ assert 'bar' in cid
def test_iterable_init(self):
cid = CaseInsensitiveDict([('Foo', 'foo'), ('BAr', 'bar')])
- self.assertEqual(len(cid), 2)
- self.assertTrue('foo' in cid)
- self.assertTrue('bar' in cid)
+ assert len(cid) == 2
+ assert 'foo' in cid
+ assert 'bar' in cid
def test_kwargs_init(self):
cid = CaseInsensitiveDict(FOO='foo', BAr='bar')
- self.assertEqual(len(cid), 2)
- self.assertTrue('foo' in cid)
- self.assertTrue('bar' in cid)
+ assert len(cid) == 2
+ assert 'foo' in cid
+ assert 'bar' in cid
def test_docstring_example(self):
cid = CaseInsensitiveDict()
cid['Accept'] = 'application/json'
- self.assertEqual(cid['aCCEPT'], 'application/json')
- self.assertEqual(list(cid), ['Accept'])
+ assert cid['aCCEPT'] == 'application/json'
+ assert list(cid) == ['Accept']
def test_len(self):
cid = CaseInsensitiveDict({'a': 'a', 'b': 'b'})
cid['A'] = 'a'
- self.assertEqual(len(cid), 2)
+ assert len(cid) == 2
def test_getitem(self):
cid = CaseInsensitiveDict({'Spam': 'blueval'})
- self.assertEqual(cid['spam'], 'blueval')
- self.assertEqual(cid['SPAM'], 'blueval')
+ assert cid['spam'] == 'blueval'
+ assert cid['SPAM'] == 'blueval'
def test_fixes_649(self):
"""__setitem__ should behave case-insensitively."""
@@ -766,74 +940,68 @@ class TestCaseInsensitiveDict(unittest.TestCase):
cid['Spam'] = 'twoval'
cid['sPAM'] = 'redval'
cid['SPAM'] = 'blueval'
- self.assertEqual(cid['spam'], 'blueval')
- self.assertEqual(cid['SPAM'], 'blueval')
- self.assertEqual(list(cid.keys()), ['SPAM'])
+ assert cid['spam'] == 'blueval'
+ assert cid['SPAM'] == 'blueval'
+ assert list(cid.keys()) == ['SPAM']
def test_delitem(self):
cid = CaseInsensitiveDict()
cid['Spam'] = 'someval'
del cid['sPam']
- self.assertFalse('spam' in cid)
- self.assertEqual(len(cid), 0)
+ assert 'spam' not in cid
+ assert len(cid) == 0
def test_contains(self):
cid = CaseInsensitiveDict()
cid['Spam'] = 'someval'
- self.assertTrue('Spam' in cid)
- self.assertTrue('spam' in cid)
- self.assertTrue('SPAM' in cid)
- self.assertTrue('sPam' in cid)
- self.assertFalse('notspam' in cid)
+ assert 'Spam' in cid
+ assert 'spam' in cid
+ assert 'SPAM' in cid
+ assert 'sPam' in cid
+ assert 'notspam' not in cid
def test_get(self):
cid = CaseInsensitiveDict()
cid['spam'] = 'oneval'
cid['SPAM'] = 'blueval'
- self.assertEqual(cid.get('spam'), 'blueval')
- self.assertEqual(cid.get('SPAM'), 'blueval')
- self.assertEqual(cid.get('sPam'), 'blueval')
- self.assertEqual(cid.get('notspam', 'default'), 'default')
+ assert cid.get('spam') == 'blueval'
+ assert cid.get('SPAM') == 'blueval'
+ assert cid.get('sPam') == 'blueval'
+ assert cid.get('notspam', 'default') == 'default'
def test_update(self):
cid = CaseInsensitiveDict()
cid['spam'] = 'blueval'
cid.update({'sPam': 'notblueval'})
- self.assertEqual(cid['spam'], 'notblueval')
+ assert cid['spam'] == 'notblueval'
cid = CaseInsensitiveDict({'Foo': 'foo','BAr': 'bar'})
cid.update({'fOO': 'anotherfoo', 'bAR': 'anotherbar'})
- self.assertEqual(len(cid), 2)
- self.assertEqual(cid['foo'], 'anotherfoo')
- self.assertEqual(cid['bar'], 'anotherbar')
+ assert len(cid) == 2
+ assert cid['foo'] == 'anotherfoo'
+ assert cid['bar'] == 'anotherbar'
def test_update_retains_unchanged(self):
cid = CaseInsensitiveDict({'foo': 'foo', 'bar': 'bar'})
cid.update({'foo': 'newfoo'})
- self.assertEquals(cid['bar'], 'bar')
+ assert cid['bar'] == 'bar'
def test_iter(self):
cid = CaseInsensitiveDict({'Spam': 'spam', 'Eggs': 'eggs'})
keys = frozenset(['Spam', 'Eggs'])
- self.assertEqual(frozenset(iter(cid)), keys)
+ assert frozenset(iter(cid)) == keys
def test_equality(self):
cid = CaseInsensitiveDict({'SPAM': 'blueval', 'Eggs': 'redval'})
othercid = CaseInsensitiveDict({'spam': 'blueval', 'eggs': 'redval'})
- self.assertEqual(cid, othercid)
+ assert cid == othercid
del othercid['spam']
- self.assertNotEqual(cid, othercid)
- self.assertEqual(cid, {'spam': 'blueval', 'eggs': 'redval'})
+ assert cid != othercid
+ assert cid == {'spam': 'blueval', 'eggs': 'redval'}
def test_setdefault(self):
cid = CaseInsensitiveDict({'Spam': 'blueval'})
- self.assertEqual(
- cid.setdefault('spam', 'notblueval'),
- 'blueval'
- )
- self.assertEqual(
- cid.setdefault('notspam', 'notblueval'),
- 'notblueval'
- )
+ assert cid.setdefault('spam', 'notblueval') == 'blueval'
+ assert cid.setdefault('notspam', 'notblueval') == 'notblueval'
def test_lower_items(self):
cid = CaseInsensitiveDict({
@@ -842,7 +1010,7 @@ class TestCaseInsensitiveDict(unittest.TestCase):
})
keyset = frozenset(lowerkey for lowerkey, v in cid.lower_items())
lowerkeyset = frozenset(['accept', 'user-agent'])
- self.assertEqual(keyset, lowerkeyset)
+ assert keyset == lowerkeyset
def test_preserve_key_case(self):
cid = CaseInsensitiveDict({
@@ -850,9 +1018,9 @@ class TestCaseInsensitiveDict(unittest.TestCase):
'user-Agent': 'requests',
})
keyset = frozenset(['Accept', 'user-Agent'])
- self.assertEqual(frozenset(i[0] for i in cid.items()), keyset)
- self.assertEqual(frozenset(cid.keys()), keyset)
- self.assertEqual(frozenset(cid), keyset)
+ assert frozenset(i[0] for i in cid.items()) == keyset
+ assert frozenset(cid.keys()) == keyset
+ assert frozenset(cid) == keyset
def test_preserve_last_key_case(self):
cid = CaseInsensitiveDict({
@@ -862,9 +1030,140 @@ class TestCaseInsensitiveDict(unittest.TestCase):
cid.update({'ACCEPT': 'application/json'})
cid['USER-AGENT'] = 'requests'
keyset = frozenset(['ACCEPT', 'USER-AGENT'])
- self.assertEqual(frozenset(i[0] for i in cid.items()), keyset)
- self.assertEqual(frozenset(cid.keys()), keyset)
- self.assertEqual(frozenset(cid), keyset)
+ assert frozenset(i[0] for i in cid.items()) == keyset
+ assert frozenset(cid.keys()) == keyset
+ assert frozenset(cid) == keyset
+
+
+class UtilsTestCase(unittest.TestCase):
+
+ def test_super_len_io_streams(self):
+ """ Ensures that we properly deal with different kinds of IO streams. """
+ # uses StringIO or io.StringIO (see import above)
+ from io import BytesIO
+ from requests.utils import super_len
+
+ assert super_len(StringIO.StringIO()) == 0
+ assert super_len(StringIO.StringIO('with so much drama in the LBC')) == 29
+
+ assert super_len(BytesIO()) == 0
+ assert super_len(BytesIO(b"it's kinda hard bein' snoop d-o-double-g")) == 40
+
+ try:
+ import cStringIO
+ except ImportError:
+ pass
+ else:
+ assert super_len(cStringIO.StringIO('but some how, some way...')) == 25
+
+ def test_get_environ_proxies_ip_ranges(self):
+ """ Ensures that IP addresses are correctly matches with ranges in no_proxy variable """
+ from requests.utils import get_environ_proxies
+ os.environ['no_proxy'] = "192.168.0.0/24,127.0.0.1,localhost.localdomain,172.16.1.1"
+ assert get_environ_proxies('http://192.168.0.1:5000/') == {}
+ assert get_environ_proxies('http://192.168.0.1/') == {}
+ assert get_environ_proxies('http://172.16.1.1/') == {}
+ assert get_environ_proxies('http://172.16.1.1:5000/') == {}
+ assert get_environ_proxies('http://192.168.1.1:5000/') != {}
+ assert get_environ_proxies('http://192.168.1.1/') != {}
+
+ def test_get_environ_proxies(self):
+ """ Ensures that IP addresses are correctly matches with ranges in no_proxy variable """
+ from requests.utils import get_environ_proxies
+ os.environ['no_proxy'] = "127.0.0.1,localhost.localdomain,192.168.0.0/24,172.16.1.1"
+ assert get_environ_proxies('http://localhost.localdomain:5000/v1.0/') == {}
+ assert get_environ_proxies('http://www.requests.com/') != {}
+
+ def test_is_ipv4_address(self):
+ from requests.utils import is_ipv4_address
+ assert is_ipv4_address('8.8.8.8')
+ assert not is_ipv4_address('8.8.8.8.8')
+ assert not is_ipv4_address('localhost.localdomain')
+
+ def test_is_valid_cidr(self):
+ from requests.utils import is_valid_cidr
+ assert not is_valid_cidr('8.8.8.8')
+ assert is_valid_cidr('192.168.1.0/24')
+
+ def test_dotted_netmask(self):
+ from requests.utils import dotted_netmask
+ assert dotted_netmask(8) == '255.0.0.0'
+ assert dotted_netmask(24) == '255.255.255.0'
+ assert dotted_netmask(25) == '255.255.255.128'
+
+ def test_address_in_network(self):
+ from requests.utils import address_in_network
+ assert address_in_network('192.168.1.1', '192.168.1.0/24')
+ assert not address_in_network('172.16.0.1', '192.168.1.0/24')
+
+ def test_get_auth_from_url(self):
+ """ Ensures that username and password in well-encoded URI as per RFC 3986 are correclty extracted """
+ from requests.utils import get_auth_from_url
+ from requests.compat import quote
+ percent_encoding_test_chars = "%!*'();:@&=+$,/?#[] "
+ url_address = "request.com/url.html#test"
+ url = "http://" + quote(percent_encoding_test_chars, '') + ':' + quote(percent_encoding_test_chars, '') + '@' + url_address
+ (username, password) = get_auth_from_url(url)
+ assert username == percent_encoding_test_chars
+ assert password == percent_encoding_test_chars
+
+
+class TestMorselToCookieExpires(unittest.TestCase):
+
+ """Tests for morsel_to_cookie when morsel contains expires."""
+
+ def test_expires_valid_str(self):
+ """Test case where we convert expires from string time."""
+
+ morsel = Morsel()
+ morsel['expires'] = 'Thu, 01-Jan-1970 00:00:01 GMT'
+ cookie = morsel_to_cookie(morsel)
+ assert cookie.expires == 1
+
+ def test_expires_invalid_int(self):
+ """Test case where an invalid type is passed for expires."""
+
+ morsel = Morsel()
+ morsel['expires'] = 100
+ with pytest.raises(TypeError):
+ morsel_to_cookie(morsel)
+
+ def test_expires_invalid_str(self):
+ """Test case where an invalid string is input."""
+
+ morsel = Morsel()
+ morsel['expires'] = 'woops'
+ with pytest.raises(ValueError):
+ morsel_to_cookie(morsel)
+
+ def test_expires_none(self):
+ """Test case where expires is None."""
+
+ morsel = Morsel()
+ morsel['expires'] = None
+ cookie = morsel_to_cookie(morsel)
+ assert cookie.expires is None
+
+
+class TestMorselToCookieMaxAge(unittest.TestCase):
+
+ """Tests for morsel_to_cookie when morsel contains max-age."""
+
+ def test_max_age_valid_int(self):
+ """Test case where a valid max age in seconds is passed."""
+
+ morsel = Morsel()
+ morsel['max-age'] = 60
+ cookie = morsel_to_cookie(morsel)
+ assert isinstance(cookie.expires, int)
+
+ def test_max_age_invalid_str(self):
+ """Test case where a invalid max age is passed."""
+
+ morsel = Morsel()
+ morsel['max-age'] = 'woops'
+ with pytest.raises(TypeError):
+ morsel_to_cookie(morsel)
if __name__ == '__main__':