#!/usr/bin/env python # -*- coding: utf-8 -*- """Tests for Requests.""" from __future__ import division import json import os import unittest import pickle import requests from requests.auth import HTTPDigestAuth from requests.adapters import HTTPAdapter from requests.compat import str, cookielib from requests.cookies import cookiejar_from_dict from requests.exceptions import InvalidURL, MissingSchema from requests.structures import CaseInsensitiveDict try: import StringIO except ImportError: import io as StringIO HTTPBIN = os.environ.get('HTTPBIN_URL', 'http://httpbin.org/') def httpbin(*suffix): """Returns url for HTTPBIN resource.""" return HTTPBIN + '/'.join(suffix) class RequestsTestCase(unittest.TestCase): _multiprocess_can_split_ = True def setUp(self): """Create simple data set with headers.""" pass def tearDown(self): """Teardown.""" pass def test_entry_points(self): requests.session requests.session().get requests.session().head requests.get requests.head requests.put requests.patch requests.post def test_invalid_url(self): self.assertRaises(MissingSchema, requests.get, 'hiwpefhipowhefopw') self.assertRaises(InvalidURL, requests.get, 'http://') def test_basic_building(self): req = requests.Request() req.url = 'http://kennethreitz.org/' req.data = {'life': '42'} pr = req.prepare() assert pr.url == req.url assert pr.body == 'life=42' def test_no_content_length(self): get_req = requests.Request('GET', httpbin('get')).prepare() self.assertTrue('Content-Length' not in get_req.headers) head_req = requests.Request('HEAD', httpbin('head')).prepare() self.assertTrue('Content-Length' not in head_req.headers) def test_path_is_not_double_encoded(self): request = requests.Request('GET', "http://0.0.0.0/get/test case").prepare() self.assertEqual(request.path_url, "/get/test%20case") def test_params_are_added_before_fragment(self): request = requests.Request('GET', "http://example.com/path#fragment", params={"a": "b"}).prepare() self.assertEqual(request.url, "http://example.com/path?a=b#fragment") request = requests.Request('GET', "http://example.com/path?key=value#fragment", params={"a": "b"}).prepare() self.assertEqual(request.url, "http://example.com/path?key=value&a=b#fragment") def test_HTTP_200_OK_GET_ALTERNATIVE(self): r = requests.Request('GET', httpbin('get')) s = requests.Session() r = s.send(r.prepare()) self.assertEqual(r.status_code, 200) def test_HTTP_302_ALLOW_REDIRECT_GET(self): r = requests.get(httpbin('redirect', '1')) self.assertEqual(r.status_code, 200) # def test_HTTP_302_ALLOW_REDIRECT_POST(self): # r = requests.post(httpbin('status', '302'), data={'some': 'data'}) # self.assertEqual(r.status_code, 200) def test_HTTP_200_OK_GET_WITH_PARAMS(self): heads = {'User-agent': 'Mozilla/5.0'} r = requests.get(httpbin('user-agent'), headers=heads) self.assertTrue(heads['User-agent'] in r.text) self.assertEqual(r.status_code, 200) def test_HTTP_200_OK_GET_WITH_MIXED_PARAMS(self): heads = {'User-agent': 'Mozilla/5.0'} r = requests.get(httpbin('get') + '?test=true', params={'q': 'test'}, headers=heads) self.assertEqual(r.status_code, 200) def test_set_cookie_on_301(self): s = requests.session() url = httpbin('cookies/set?foo=bar') r = s.get(url) self.assertTrue(s.cookies['foo'] == 'bar') def test_cookie_sent_on_redirect(self): s = requests.session() s.get(httpbin('cookies/set?foo=bar')) r = s.get(httpbin('redirect/1')) # redirects to httpbin('get') self.assertTrue("Cookie" in r.json()["headers"]) def test_cookie_removed_on_expire(self): s = requests.session() s.get(httpbin('cookies/set?foo=bar')) self.assertTrue(s.cookies['foo'] == 'bar') s.get( httpbin('response-headers'), params={ 'Set-Cookie': 'foo=deleted; expires=Thu, 01-Jan-1970 00:00:01 GMT' } ) assert 'foo' not in s.cookies def test_request_cookie_overrides_session_cookie(self): s = requests.session() s.cookies['foo'] = 'bar' r = s.get(httpbin('cookies'), cookies={'foo': 'baz'}) assert r.json()['cookies']['foo'] == 'baz' # Session cookie should not be modified assert s.cookies['foo'] == 'bar' def test_generic_cookiejar_works(self): cj = cookielib.CookieJar() cookiejar_from_dict({'foo': 'bar'}, cj) s = requests.session() s.cookies = cj r = s.get(httpbin('cookies')) # Make sure the cookie was sent assert r.json()['cookies']['foo'] == 'bar' # Make sure the session cj is still the custom one assert s.cookies is cj def test_user_agent_transfers(self): heads = { 'User-agent': 'Mozilla/5.0 (github.com/kennethreitz/requests)' } r = requests.get(httpbin('user-agent'), headers=heads) self.assertTrue(heads['User-agent'] in r.text) heads = { 'user-agent': 'Mozilla/5.0 (github.com/kennethreitz/requests)' } r = requests.get(httpbin('user-agent'), headers=heads) self.assertTrue(heads['user-agent'] in r.text) def test_HTTP_200_OK_HEAD(self): r = requests.head(httpbin('get')) self.assertEqual(r.status_code, 200) def test_HTTP_200_OK_PUT(self): r = requests.put(httpbin('put')) self.assertEqual(r.status_code, 200) def test_BASICAUTH_TUPLE_HTTP_200_OK_GET(self): auth = ('user', 'pass') url = httpbin('basic-auth', 'user', 'pass') r = requests.get(url, auth=auth) self.assertEqual(r.status_code, 200) r = requests.get(url) self.assertEqual(r.status_code, 401) s = requests.session() s.auth = auth r = s.get(url) self.assertEqual(r.status_code, 200) def test_DIGEST_HTTP_200_OK_GET(self): auth = HTTPDigestAuth('user', 'pass') url = httpbin('digest-auth', 'auth', 'user', 'pass') r = requests.get(url, auth=auth) self.assertEqual(r.status_code, 200) r = requests.get(url) self.assertEqual(r.status_code, 401) s = requests.session() s.auth = auth r = s.get(url) self.assertEqual(r.status_code, 200) def test_DIGEST_STREAM(self): auth = HTTPDigestAuth('user', 'pass') url = httpbin('digest-auth', 'auth', 'user', 'pass') r = requests.get(url, auth=auth, stream=True) self.assertNotEqual(r.raw.read(), b'') r = requests.get(url, auth=auth, stream=False) self.assertEqual(r.raw.read(), b'') def test_DIGESTAUTH_WRONG_HTTP_401_GET(self): auth = HTTPDigestAuth('user', 'wrongpass') url = httpbin('digest-auth', 'auth', 'user', 'pass') r = requests.get(url, auth=auth) self.assertEqual(r.status_code, 401) r = requests.get(url) self.assertEqual(r.status_code, 401) s = requests.session() s.auth = auth r = s.get(url) self.assertEqual(r.status_code, 401) def test_POSTBIN_GET_POST_FILES(self): url = httpbin('post') post1 = requests.post(url).raise_for_status() post1 = requests.post(url, data={'some': 'data'}) self.assertEqual(post1.status_code, 200) with open('requirements.txt') as f: post2 = requests.post(url, files={'some': f}) self.assertEqual(post2.status_code, 200) post4 = requests.post(url, data='[{"some": "json"}]') self.assertEqual(post4.status_code, 200) try: requests.post(url, files=['bad file data']) except ValueError: pass def test_POSTBIN_GET_POST_FILES_WITH_DATA(self): url = httpbin('post') post1 = requests.post(url).raise_for_status() post1 = requests.post(url, data={'some': 'data'}) self.assertEqual(post1.status_code, 200) with open('requirements.txt') as f: post2 = requests.post(url, data={'some': 'data'}, files={'some': f}) self.assertEqual(post2.status_code, 200) post4 = requests.post(url, data='[{"some": "json"}]') self.assertEqual(post4.status_code, 200) try: requests.post(url, files=['bad file data']) except ValueError: pass def test_request_ok_set(self): r = requests.get(httpbin('status', '404')) self.assertEqual(r.ok, False) def test_status_raising(self): r = requests.get(httpbin('status', '404')) self.assertRaises(requests.exceptions.HTTPError, r.raise_for_status) r = requests.get(httpbin('status', '500')) self.assertFalse(r.ok) def test_decompress_gzip(self): r = requests.get(httpbin('gzip')) r.content.decode('ascii') def test_unicode_get(self): url = httpbin('/get') requests.get(url, params={'foo': 'føø'}) requests.get(url, params={'føø': 'føø'}) requests.get(url, params={'føø': 'føø'}) requests.get(url, params={'foo': 'foo'}) requests.get(httpbin('ø'), params={'foo': 'foo'}) def test_unicode_header_name(self): requests.put(httpbin('put'), headers={str('Content-Type'): 'application/octet-stream'}, data='\xff') # compat.str is unicode. def test_urlencoded_get_query_multivalued_param(self): r = requests.get(httpbin('get'), params=dict(test=['foo', 'baz'])) self.assertEqual(r.status_code, 200) self.assertEqual(r.url, httpbin('get?test=foo&test=baz')) def test_different_encodings_dont_break_post(self): r = requests.post(httpbin('post'), data={'stuff': json.dumps({'a': 123})}, params={'blah': 'asdf1234'}, files={'file': ('test_requests.py', open(__file__, 'rb'))}) self.assertEqual(r.status_code, 200) def test_unicode_multipart_post(self): r = requests.post(httpbin('post'), data={'stuff': u'ëlïxr'}, files={'file': ('test_requests.py', open(__file__, 'rb'))}) self.assertEqual(r.status_code, 200) r = requests.post(httpbin('post'), data={'stuff': u'ëlïxr'.encode('utf-8')}, files={'file': ('test_requests.py', open(__file__, 'rb'))}) self.assertEqual(r.status_code, 200) r = requests.post(httpbin('post'), data={'stuff': 'elixr'}, files={'file': ('test_requests.py', open(__file__, 'rb'))}) self.assertEqual(r.status_code, 200) r = requests.post(httpbin('post'), data={'stuff': 'elixr'.encode('utf-8')}, files={'file': ('test_requests.py', open(__file__, 'rb'))}) self.assertEqual(r.status_code, 200) def test_unicode_multipart_post_fieldnames(self): filename = os.path.splitext(__file__)[0] + '.py' r = requests.Request(method='POST', url=httpbin('post'), data={'stuff'.encode('utf-8'): 'elixr'}, files={'file': ('test_requests.py', open(filename, 'rb'))}) prep = r.prepare() self.assertTrue(b'name="stuff"' in prep.body) self.assertFalse(b'name="b\'stuff\'"' in prep.body) def test_custom_content_type(self): r = requests.post(httpbin('post'), data={'stuff': json.dumps({'a': 123})}, files={'file1': ('test_requests.py', open(__file__, 'rb')), 'file2': ('test_requests', open(__file__, 'rb'), 'text/py-content-type')}) self.assertEqual(r.status_code, 200) self.assertTrue(b"text/py-content-type" in r.request.body) def test_hook_receives_request_arguments(self): def hook(resp, **kwargs): assert resp is not None assert kwargs != {} requests.Request('GET', HTTPBIN, hooks={'response': hook}) def test_prepared_request_hook(self): def hook(resp, **kwargs): resp.hook_working = True return resp req = requests.Request('GET', HTTPBIN, hooks={'response': hook}) prep = req.prepare() s = requests.Session() resp = s.send(prep) self.assertTrue(hasattr(resp, 'hook_working')) def test_links(self): r = requests.Response() r.headers = { 'cache-control': 'public, max-age=60, s-maxage=60', 'connection': 'keep-alive', 'content-encoding': 'gzip', 'content-type': 'application/json; charset=utf-8', 'date': 'Sat, 26 Jan 2013 16:47:56 GMT', 'etag': '"6ff6a73c0e446c1f61614769e3ceb778"', 'last-modified': 'Sat, 26 Jan 2013 16:22:39 GMT', 'link': ('; rel="next", ; ' ' rel="last"'), 'server': 'GitHub.com', 'status': '200 OK', 'vary': 'Accept', 'x-content-type-options': 'nosniff', 'x-github-media-type': 'github.beta', 'x-ratelimit-limit': '60', 'x-ratelimit-remaining': '57' } self.assertEqual(r.links['next']['rel'], 'next') def test_cookie_parameters(self): key = 'some_cookie' value = 'some_value' secure = True domain = 'test.com' rest = {'HttpOnly': True} jar = requests.cookies.RequestsCookieJar() jar.set(key, value, secure=secure, domain=domain, rest=rest) self.assertEqual(len(jar), 1) self.assertTrue('some_cookie' in jar) cookie = list(jar)[0] self.assertEqual(cookie.secure, secure) self.assertEqual(cookie.domain, domain) self.assertEqual(cookie._rest['HttpOnly'], rest['HttpOnly']) def test_time_elapsed_blank(self): r = requests.get(httpbin('get')) td = r.elapsed total_seconds = ((td.microseconds + (td.seconds + td.days * 24 * 3600) * 10**6) / 10**6) self.assertTrue(total_seconds > 0.0) def test_response_is_iterable(self): r = requests.Response() io = StringIO.StringIO('abc') read_ = io.read def read_mock(amt, decode_content=None): return read_(amt) setattr(io, 'read', read_mock) r.raw = io self.assertTrue(next(iter(r))) io.close() def test_get_auth_from_url(self): url = 'http://user:pass@complex.url.com/path?query=yes' self.assertEqual(('user', 'pass'), requests.utils.get_auth_from_url(url)) def test_cannot_send_unprepared_requests(self): r = requests.Request(url=HTTPBIN) self.assertRaises(ValueError, requests.Session().send, r) def test_http_error(self): error = requests.exceptions.HTTPError() self.assertEqual(error.response, None) response = requests.Response() error = requests.exceptions.HTTPError(response=response) self.assertEqual(error.response, response) error = requests.exceptions.HTTPError('message', response=response) self.assertEqual(str(error), 'message') self.assertEqual(error.response, response) def test_session_pickling(self): r = requests.Request('GET', httpbin('get')) s = requests.Session() s = pickle.loads(pickle.dumps(s)) r = s.send(r.prepare()) self.assertEqual(r.status_code, 200) def test_fixes_1329(self): """ Ensure that header updates are done case-insensitively. """ s = requests.Session() s.headers.update({'ACCEPT': 'BOGUS'}) s.headers.update({'accept': 'application/json'}) r = s.get(httpbin('get')) headers = r.request.headers # ASCII encode because of key comparison changes in py3 self.assertEqual( headers['accept'.encode('ascii')], 'application/json' ) self.assertEqual( headers['Accept'.encode('ascii')], 'application/json' ) self.assertEqual( headers['ACCEPT'.encode('ascii')], 'application/json' ) def test_transport_adapter_ordering(self): s = requests.Session() order = ['https://', 'http://'] self.assertEqual(order, list(s.adapters)) s.mount('http://git', HTTPAdapter()) s.mount('http://github', HTTPAdapter()) s.mount('http://github.com', HTTPAdapter()) s.mount('http://github.com/about/', HTTPAdapter()) order = [ 'http://github.com/about/', 'http://github.com', 'http://github', 'http://git', 'https://', 'http://', ] self.assertEqual(order, list(s.adapters)) s.mount('http://gittip', HTTPAdapter()) s.mount('http://gittip.com', HTTPAdapter()) s.mount('http://gittip.com/about/', HTTPAdapter()) order = [ 'http://github.com/about/', 'http://gittip.com/about/', 'http://github.com', 'http://gittip.com', 'http://github', 'http://gittip', 'http://git', 'https://', 'http://', ] self.assertEqual(order, list(s.adapters)) s2 = requests.Session() s2.adapters = {'http://': HTTPAdapter()} s2.mount('https://', HTTPAdapter()) self.assertTrue('http://' in s2.adapters) self.assertTrue('https://' in s2.adapters) def test_header_remove_is_case_insensitive(self): # From issue #1321 s = requests.Session() s.headers['foo'] = 'bar' r = s.get(httpbin('get'), headers={'FOO': None}) assert 'foo' not in r.request.headers def test_params_are_merged_case_sensitive(self): s = requests.Session() s.params['foo'] = 'bar' r = s.get(httpbin('get'), params={'FOO': 'bar'}) assert r.json()['args'] == {'foo': 'bar', 'FOO': 'bar'} def test_long_authinfo_in_url(self): url = 'http://{0}:{1}@{2}:9000/path?query#frag'.format( 'E8A3BE87-9E3F-4620-8858-95478E385B5B', 'EA770032-DA4D-4D84-8CE9-29C6D910BF1E', 'exactly-------------sixty-----------three------------characters', ) r = requests.Request('GET', url).prepare() self.assertEqual(r.url, url) class TestCaseInsensitiveDict(unittest.TestCase): def test_mapping_init(self): cid = CaseInsensitiveDict({'Foo': 'foo','BAr': 'bar'}) self.assertEqual(len(cid), 2) self.assertTrue('foo' in cid) self.assertTrue('bar' in cid) def test_iterable_init(self): cid = CaseInsensitiveDict([('Foo', 'foo'), ('BAr', 'bar')]) self.assertEqual(len(cid), 2) self.assertTrue('foo' in cid) self.assertTrue('bar' in cid) def test_kwargs_init(self): cid = CaseInsensitiveDict(FOO='foo', BAr='bar') self.assertEqual(len(cid), 2) self.assertTrue('foo' in cid) self.assertTrue('bar' in cid) def test_docstring_example(self): cid = CaseInsensitiveDict() cid['Accept'] = 'application/json' self.assertEqual(cid['aCCEPT'], 'application/json') self.assertEqual(list(cid), ['Accept']) def test_len(self): cid = CaseInsensitiveDict({'a': 'a', 'b': 'b'}) cid['A'] = 'a' self.assertEqual(len(cid), 2) def test_getitem(self): cid = CaseInsensitiveDict({'Spam': 'blueval'}) self.assertEqual(cid['spam'], 'blueval') self.assertEqual(cid['SPAM'], 'blueval') def test_fixes_649(self): """__setitem__ should behave case-insensitively.""" cid = CaseInsensitiveDict() cid['spam'] = 'oneval' cid['Spam'] = 'twoval' cid['sPAM'] = 'redval' cid['SPAM'] = 'blueval' self.assertEqual(cid['spam'], 'blueval') self.assertEqual(cid['SPAM'], 'blueval') self.assertEqual(list(cid.keys()), ['SPAM']) def test_delitem(self): cid = CaseInsensitiveDict() cid['Spam'] = 'someval' del cid['sPam'] self.assertFalse('spam' in cid) self.assertEqual(len(cid), 0) def test_contains(self): cid = CaseInsensitiveDict() cid['Spam'] = 'someval' self.assertTrue('Spam' in cid) self.assertTrue('spam' in cid) self.assertTrue('SPAM' in cid) self.assertTrue('sPam' in cid) self.assertFalse('notspam' in cid) def test_get(self): cid = CaseInsensitiveDict() cid['spam'] = 'oneval' cid['SPAM'] = 'blueval' self.assertEqual(cid.get('spam'), 'blueval') self.assertEqual(cid.get('SPAM'), 'blueval') self.assertEqual(cid.get('sPam'), 'blueval') self.assertEqual(cid.get('notspam', 'default'), 'default') def test_update(self): cid = CaseInsensitiveDict() cid['spam'] = 'blueval' cid.update({'sPam': 'notblueval'}) self.assertEqual(cid['spam'], 'notblueval') cid = CaseInsensitiveDict({'Foo': 'foo','BAr': 'bar'}) cid.update({'fOO': 'anotherfoo', 'bAR': 'anotherbar'}) self.assertEqual(len(cid), 2) self.assertEqual(cid['foo'], 'anotherfoo') self.assertEqual(cid['bar'], 'anotherbar') def test_update_retains_unchanged(self): cid = CaseInsensitiveDict({'foo': 'foo', 'bar': 'bar'}) cid.update({'foo': 'newfoo'}) self.assertEquals(cid['bar'], 'bar') def test_iter(self): cid = CaseInsensitiveDict({'Spam': 'spam', 'Eggs': 'eggs'}) keys = frozenset(['Spam', 'Eggs']) self.assertEqual(frozenset(iter(cid)), keys) def test_equality(self): cid = CaseInsensitiveDict({'SPAM': 'blueval', 'Eggs': 'redval'}) othercid = CaseInsensitiveDict({'spam': 'blueval', 'eggs': 'redval'}) self.assertEqual(cid, othercid) del othercid['spam'] self.assertNotEqual(cid, othercid) self.assertEqual(cid, {'spam': 'blueval', 'eggs': 'redval'}) def test_setdefault(self): cid = CaseInsensitiveDict({'Spam': 'blueval'}) self.assertEqual( cid.setdefault('spam', 'notblueval'), 'blueval' ) self.assertEqual( cid.setdefault('notspam', 'notblueval'), 'notblueval' ) def test_lower_items(self): cid = CaseInsensitiveDict({ 'Accept': 'application/json', 'user-Agent': 'requests', }) keyset = frozenset(lowerkey for lowerkey, v in cid.lower_items()) lowerkeyset = frozenset(['accept', 'user-agent']) self.assertEqual(keyset, lowerkeyset) def test_preserve_key_case(self): cid = CaseInsensitiveDict({ 'Accept': 'application/json', 'user-Agent': 'requests', }) keyset = frozenset(['Accept', 'user-Agent']) self.assertEqual(frozenset(i[0] for i in cid.items()), keyset) self.assertEqual(frozenset(cid.keys()), keyset) self.assertEqual(frozenset(cid), keyset) def test_preserve_last_key_case(self): cid = CaseInsensitiveDict({ 'Accept': 'application/json', 'user-Agent': 'requests', }) cid.update({'ACCEPT': 'application/json'}) cid['USER-AGENT'] = 'requests' keyset = frozenset(['ACCEPT', 'USER-AGENT']) self.assertEqual(frozenset(i[0] for i in cid.items()), keyset) self.assertEqual(frozenset(cid.keys()), keyset) self.assertEqual(frozenset(cid), keyset) if __name__ == '__main__': unittest.main()