diff options
Diffstat (limited to 'tests/test_requests.py')
-rwxr-xr-x | tests/test_requests.py | 846 |
1 files changed, 846 insertions, 0 deletions
diff --git a/tests/test_requests.py b/tests/test_requests.py new file mode 100755 index 0000000..ac8329e --- /dev/null +++ b/tests/test_requests.py @@ -0,0 +1,846 @@ +#!/usr/bin/env python +# -*- coding: utf-8 -*- + +# from __future__ import unicode_literals + +# Path hack. +import sys +import os +sys.path.insert(0, os.path.abspath('..')) + +import json +import os +import sys +import unittest +import pickle + +import requests +from requests.compat import str, StringIO +# import envoy +from requests import HTTPError +from requests import get, post, head, put +from requests.auth import HTTPBasicAuth, HTTPDigestAuth + +if 'HTTPBIN_URL' not in os.environ: + os.environ['HTTPBIN_URL'] = 'http://httpbin.org/' + +HTTPBIN_URL = os.environ.get('HTTPBIN_URL') + + +def httpbin(*suffix): + """Returns url for HTTPBIN resource.""" + return HTTPBIN_URL + '/'.join(suffix) + + +SERVICES = (httpbin, ) + +_httpbin = False + + +class TestSetup(object): + """Requests test cases.""" + + # It goes to eleven. + _multiprocess_can_split_ = True + + def setUp(self): + + global _httpbin + + if (not 'HTTPBIN_URL' in os.environ) and not _httpbin: + # c = envoy.connect('httpbin %s' % (PORT)) + # time.sleep(1) + _httpbin = True + + +class RequestsTestSuite(TestSetup, unittest.TestCase): + """Requests test cases.""" + + def test_entry_points(self): + + requests.session + requests.session().get + requests.session().head + requests.get + requests.head + requests.put + requests.patch + requests.post + + def test_invalid_url(self): + self.assertRaises(ValueError, get, 'hiwpefhipowhefopw') + + def test_path_is_not_double_encoded(self): + request = requests.Request("http://0.0.0.0/get/test case") + + self.assertEqual(request.path_url, "/get/test%20case") + + def test_HTTP_200_OK_GET(self): + r = get(httpbin('get')) + self.assertEqual(r.status_code, 200) + + def test_response_sent(self): + r = get(httpbin('get')) + + self.assertTrue(r.request.sent) + + def test_HTTP_302_ALLOW_REDIRECT_GET(self): + r = get(httpbin('redirect', '1')) + self.assertEqual(r.status_code, 200) + + def test_HTTP_302_GET(self): + r = get(httpbin('redirect', '1'), allow_redirects=False) + self.assertEqual(r.status_code, 302) + + def test_HTTP_200_OK_GET_WITH_PARAMS(self): + heads = {'User-agent': 'Mozilla/5.0'} + + r = get(httpbin('user-agent'), headers=heads) + + assert heads['User-agent'] in r.text + self.assertEqual(r.status_code, 200) + + def test_HTTP_200_OK_GET_WITH_MIXED_PARAMS(self): + heads = {'User-agent': 'Mozilla/5.0'} + + r = get(httpbin('get') + '?test=true', params={'q': 'test'}, headers=heads) + self.assertEqual(r.status_code, 200) + + # def test_unicode_headers(self): + # # Simply calling requests with a unicode instance should simply work + # # when the characters are all representable using latin-1: + # heads = { u'User-Agent': u'Requests Test Suite' } + # requests.get(url=httpbin('get'), headers=heads) + + # # Characters outside latin-1 should raise an exception: + # heads = { u'User-Agent': u'\u30cd\u30c3\u30c8\u30ef\u30fc\u30af' } + # self.assertRaises(UnicodeEncodeError, requests.get, + # url=httpbin('get'), headers=heads) + + # def test_session_with_escaped_url(self): + # # Test a URL that contains percent-escaped characters + # # This URL should not be modified (double-escaped) + # # Tests: + # # - Quoted illegal characters ("%20" (' '), "%3C" ('<'), "%3E" ('>')) + # # - Quoted reserved characters ("%25" ('%'), "%23" ('#'), "%2F" ('/')) + # # - Quoted non-ASCII characters ("%C3%98", "%C3%A5") + # path_fully_escaped = '%3Ca%25b%23c%2Fd%3E/%C3%98%20%C3%A5' + # url = httpbin('get/' + path_fully_escaped) + # response = get(url) + # self.assertEqual(response.url, httpbin('get/' + path_fully_escaped)) + + # # Test that illegal characters in a path get properly percent-escaped + # # Tests: + # # - Bare illegal characters (space, '<') + # # - Bare non-ASCII characters ('\u00d8') + # path = u'<a%25b%23c%2Fd%3E/\u00d8 %C3%A5' + # url = httpbin('get/' + path) + # response = get(url) + # self.assertEqual(response.url, httpbin('get/' + path_fully_escaped)) + + # # Test that reserved characters in a path do not get percent-escaped + # # Tests: + # # - All reserved characters (RFC 3986), except '?', '#', '[' and ']', + # # which are not allowed in the path, and ';' which delimits + # # parameters. + # # All such characters must be allowed bare in path, and must not be + # # encoded. + # # - Special unreserved characters (RFC 3986), which should not be + # # encoded (even though it wouldn't hurt). + # path_reserved = '!$&\'()*+,/:=@-._~' + # url = httpbin('get/' + path_reserved) + # response = get(url) + # self.assertEqual(response.url, httpbin('get/' + path_reserved)) + + # # Test that percent-encoded unreserved characters in a path get + # # normalised to their un-encoded forms. + # path_unreserved = 'ABCDwxyz1234-._~' + # path_unreserved_escaped = '%41%42%43%44%77%78%79%7A%31%32%33%34%2D%2E%5F%7E' + # url = httpbin('get/' + path_unreserved_escaped) + # response = get(url) + # self.assertEqual(response.url, httpbin('get/' + path_unreserved)) + + # # Re-run all of the same tests on the query part of the URI + # query_fully_escaped = '%3Ca%25b%23c%2Fd%3E=%C3%98%20%C3%A5' + # url = httpbin('get/?' + query_fully_escaped) + # response = get(url) + # self.assertEqual(response.url, httpbin('get/?' + query_fully_escaped)) + + # query = u'<a%25b%23c%2Fd%3E=\u00d8 %C3%A5' + # url = httpbin('get/?' + query) + # response = get(url) + # self.assertEqual(response.url, httpbin('get/?' + query_fully_escaped)) + + # # The legal characters in query happens to be the same as in path + # query_reserved = '!$&\'()*+,/:=@-._~' + # url = httpbin('get/?' + query_reserved) + # response = get(url) + # self.assertEqual(response.url, httpbin('get/?' + query_reserved)) + + # query_unreserved = 'ABCDwxyz=1234-._~' + # query_unreserved_escaped = '%41%42%43%44%77%78%79%7A=%31%32%33%34%2D%2E%5F%7E' + # url = httpbin('get/?' + query_unreserved_escaped) + # response = get(url) + # self.assertEqual(response.url, httpbin('get/?' + query_unreserved)) + + def test_user_agent_transfers(self): + """Issue XX""" + + heads = { + 'User-agent': + 'Mozilla/5.0 (github.com/kennethreitz/requests)' + } + + r = get(httpbin('user-agent'), headers=heads) + self.assertTrue(heads['User-agent'] in r.text) + + heads = { + 'user-agent': + 'Mozilla/5.0 (github.com/kennethreitz/requests)' + } + + r = get(httpbin('user-agent'), headers=heads) + self.assertTrue(heads['user-agent'] in r.text) + + def test_HTTP_200_OK_HEAD(self): + r = head(httpbin('get')) + self.assertEqual(r.status_code, 200) + + def test_HTTP_200_OK_PUT(self): + r = put(httpbin('put')) + self.assertEqual(r.status_code, 200) + + def test_BASICAUTH_TUPLE_HTTP_200_OK_GET(self): + + for service in SERVICES: + + auth = ('user', 'pass') + url = service('basic-auth', 'user', 'pass') + + r = get(url, auth=auth) + self.assertEqual(r.status_code, 200) + + r = get(url) + self.assertEqual(r.status_code, 401) + + s = requests.session(auth=auth) + r = get(url, session=s) + self.assertEqual(r.status_code, 200) + + def test_BASICAUTH_HTTP_200_OK_GET(self): + + for service in SERVICES: + + auth = HTTPBasicAuth('user', 'pass') + url = service('basic-auth', 'user', 'pass') + + r = get(url, auth=auth) + self.assertEqual(r.status_code, 200) + + auth = ('user', 'pass') + r = get(url, auth=auth) + self.assertEqual(r.status_code, 200) + + r = get(url) + self.assertEqual(r.status_code, 401) + + s = requests.session(auth=auth) + r = get(url, session=s) + self.assertEqual(r.status_code, 200) + + def test_DIGESTAUTH_HTTP_200_OK_GET(self): + + for service in SERVICES: + + auth = HTTPDigestAuth('user', 'pass') + url = service('digest-auth', 'auth', 'user', 'pass') + + r = get(url, auth=auth) + self.assertEqual(r.status_code, 200) + + r = get(url) + self.assertEqual(r.status_code, 401) + + s = requests.session(auth=auth) + r = get(url, session=s) + self.assertEqual(r.status_code, 200) + + def test_DIGESTAUTH_WRONG_HTTP_401_GET(self): + + for service in SERVICES: + + auth = HTTPDigestAuth('user', 'wrongpass') + url = service('digest-auth', 'auth', 'user', 'pass') + + r = get(url, auth=auth) + self.assertEqual(r.status_code, 401) + + s = requests.session(auth=auth) + r = get(url, session=s) + self.assertEqual(r.status_code, 401) + + def test_POSTBIN_GET_POST_FILES(self): + + for service in SERVICES: + + url = service('post') + post1 = post(url).raise_for_status() + + post1 = post(url, data={'some': 'data'}) + self.assertEqual(post1.status_code, 200) + + with open(__file__) as f: + post2 = post(url, files={'some': f}) + self.assertEqual(post2.status_code, 200) + + post3 = post(url, data='[{"some": "json"}]') + self.assertEqual(post3.status_code, 200) + + def test_POSTBIN_GET_POST_FILES_WITH_PARAMS(self): + + for service in SERVICES: + + with open(__file__) as f: + url = service('post') + post1 = post(url, + files={'some': f}, + data={'some': 'data'}) + + self.assertEqual(post1.status_code, 200) + + def test_POSTBIN_GET_POST_FILES_WITH_HEADERS(self): + + for service in SERVICES: + + url = service('post') + + with open(__file__) as f: + + post2 = post(url, + files={'some': f}, + headers={'User-Agent': 'requests-tests'}) + + self.assertEqual(post2.status_code, 200) + + def test_nonzero_evaluation(self): + + for service in SERVICES: + + r = get(service('status', '500')) + self.assertEqual(bool(r), False) + + r = get(service('/get')) + self.assertEqual(bool(r), True) + + def test_request_ok_set(self): + + for service in SERVICES: + + r = get(service('status', '404')) + # print r.status_code + # r.raise_for_status() + self.assertEqual(r.ok, False) + + def test_status_raising(self): + r = get(httpbin('status', '404')) + self.assertRaises(HTTPError, r.raise_for_status) + + r = get(httpbin('status', '200')) + self.assertFalse(r.error) + r.raise_for_status() + + def test_default_status_raising(self): + config = {'danger_mode': True} + args = [httpbin('status', '404')] + kwargs = dict(config=config) + self.assertRaises(HTTPError, get, *args, **kwargs) + + r = get(httpbin('status', '200')) + self.assertEqual(r.status_code, 200) + + def test_decompress_gzip(self): + + r = get(httpbin('gzip')) + r.content.decode('ascii') + + def test_response_has_unicode_url(self): + + for service in SERVICES: + + url = service('get') + + response = get(url) + + assert isinstance(response.url, str) + + def test_unicode_get(self): + + for service in SERVICES: + + url = service('/get') + + get(url, params={'foo': 'føø'}) + get(url, params={'føø': 'føø'}) + get(url, params={'føø': 'føø'}) + get(url, params={'foo': 'foo'}) + get(service('ø'), params={'foo': 'foo'}) + + def test_httpauth_recursion(self): + + http_auth = HTTPBasicAuth('user', 'BADpass') + + for service in SERVICES: + r = get(service('basic-auth', 'user', 'pass'), auth=http_auth) + self.assertEqual(r.status_code, 401) + + def test_urlencoded_post_data(self): + + for service in SERVICES: + + r = post(service('post'), data=dict(test='fooaowpeuf')) + + self.assertEqual(r.status_code, 200) + self.assertEqual(r.headers['content-type'], 'application/json') + self.assertEqual(r.url, service('post')) + + rbody = json.loads(r.text) + + self.assertEqual(rbody.get('form'), dict(test='fooaowpeuf')) + self.assertEqual(rbody.get('data'), '') + + def test_nonurlencoded_post_data(self): + + for service in SERVICES: + + r = post(service('post'), data='fooaowpeuf') + + self.assertEqual(r.status_code, 200) + self.assertEqual(r.headers['content-type'], 'application/json') + self.assertEqual(r.url, service('post')) + + rbody = json.loads(r.text) + # Body wasn't valid url encoded data, so the server returns None as + # "form" and the raw body as "data". + + assert rbody.get('form') in (None, {}) + self.assertEqual(rbody.get('data'), 'fooaowpeuf') + + def test_urlencoded_post_querystring(self): + + for service in SERVICES: + + r = post(service('post'), params=dict(test='fooaowpeuf')) + + self.assertEqual(r.status_code, 200) + self.assertEqual(r.headers['content-type'], 'application/json') + self.assertEqual(r.url, service('post?test=fooaowpeuf')) + + rbody = json.loads(r.text) + self.assertEqual(rbody.get('form'), {}) # No form supplied + self.assertEqual(rbody.get('data'), '') + + def test_urlencoded_post_query_and_data(self): + + for service in SERVICES: + + r = post( + service('post'), + params=dict(test='fooaowpeuf'), + data=dict(test2="foobar")) + + self.assertEqual(r.status_code, 200) + self.assertEqual(r.headers['content-type'], 'application/json') + self.assertEqual(r.url, service('post?test=fooaowpeuf')) + + rbody = json.loads(r.text) + self.assertEqual(rbody.get('form'), dict(test2='foobar')) + self.assertEqual(rbody.get('data'), '') + + def test_nonurlencoded_postdata(self): + + for service in SERVICES: + + r = post(service('post'), data="foobar") + + self.assertEqual(r.status_code, 200) + self.assertEqual(r.headers['content-type'], 'application/json') + + rbody = json.loads(r.text) + + assert rbody.get('form') in (None, {}) + self.assertEqual(rbody.get('data'), 'foobar') + + def test_urlencoded_get_query_multivalued_param(self): + + for service in SERVICES: + + r = get(service('get'), params=dict(test=['foo', 'baz'])) + self.assertEqual(r.status_code, 200) + self.assertEqual(r.url, service('get?test=foo&test=baz')) + + def test_urlencoded_post_querystring_multivalued(self): + + for service in SERVICES: + + r = post(service('post'), params=dict(test=['foo', 'baz'])) + self.assertEqual(r.status_code, 200) + self.assertEqual(r.headers['content-type'], 'application/json') + self.assertEqual(r.url, service('post?test=foo&test=baz')) + + rbody = json.loads(r.text) + self.assertEqual(rbody.get('form'), {}) # No form supplied + self.assertEqual(rbody.get('data'), '') + + def test_urlencoded_post_query_multivalued_and_data(self): + + for service in SERVICES: + + r = post( + service('post'), + params=dict(test=['foo', 'baz']), + data=dict(test2="foobar", test3=['foo', 'baz'])) + + self.assertEqual(r.status_code, 200) + self.assertEqual(r.headers['content-type'], 'application/json') + self.assertEqual(r.url, service('post?test=foo&test=baz')) + + # print(r.text) + # print('-----------------------') + + rbody = json.loads(r.text) + self.assertEqual(rbody.get('form'), dict(test2='foobar', test3=['foo', 'baz'])) + self.assertEqual(rbody.get('data'), '') + + def test_GET_no_redirect(self): + + for service in SERVICES: + + r = get(service('redirect', '3'), allow_redirects=False) + self.assertEqual(r.status_code, 302) + self.assertEqual(len(r.history), 0) + + def test_HEAD_no_redirect(self): + + for service in SERVICES: + + r = head(service('redirect', '3'), allow_redirects=False) + self.assertEqual(r.status_code, 302) + self.assertEqual(len(r.history), 0) + + def test_redirect_history(self): + + for service in SERVICES: + + r = get(service('redirect', '3')) + self.assertEqual(r.status_code, 200) + self.assertEqual(len(r.history), 3) + + def test_relative_redirect_history(self): + + for service in SERVICES: + + r = get(service('relative-redirect', '3')) + self.assertEqual(r.status_code, 200) + self.assertEqual(len(r.history), 3) + + def test_session_HTTP_200_OK_GET(self): + + s = requests.session() + r = get(httpbin('get'), session=s) + self.assertEqual(r.status_code, 200) + + def test_session_persistent_headers(self): + + heads = {'User-agent': 'Mozilla/5.0'} + + s = requests.session() + s.headers = heads + + # Make 2 requests from Session object, should send header both times + r1 = get(httpbin('user-agent'), session=s) + assert heads['User-agent'] in r1.text + + r2 = get(httpbin('user-agent'), session=s) + assert heads['User-agent'] in r2.text + + new_heads = {'User-agent': 'blah'} + r3 = get(httpbin('user-agent'), headers=new_heads, session=s) + assert new_heads['User-agent'] in r3.text + + self.assertEqual(r2.status_code, 200) + + def test_single_hook(self): + + def add_foo_header(args): + if not args.get('headers'): + args['headers'] = {} + + args['headers'].update({ + 'X-Foo': 'foo' + }) + + return args + + for service in SERVICES: + url = service('headers') + response = get(url=url, hooks={'args': add_foo_header}) + + assert 'foo' in response.text + + def test_multiple_hooks(self): + + def add_foo_header(args): + if not args.get('headers'): + args['headers'] = {} + + args['headers'].update({ + 'X-Foo': 'foo' + }) + + return args + + def add_bar_header(args): + if not args.get('headers'): + args['headers'] = {} + + args['headers'].update({ + 'X-Bar': 'bar' + }) + + return args + + for service in SERVICES: + url = service('headers') + + response = get(url=url, + hooks={ + 'args': [add_foo_header, add_bar_header] + } + ) + + assert 'foo' in response.text + assert 'bar' in response.text + + def test_session_persistent_cookies(self): + + s = requests.session() + + # Internally dispatched cookies are sent. + _c = {'kenneth': 'reitz', 'bessie': 'monke'} + r = get(httpbin('cookies'), cookies=_c, session=s) + r = get(httpbin('cookies'), session=s) + + # Those cookies persist transparently. + c = json.loads(r.text).get('cookies') + assert c == _c + + # Double check. + r = get(httpbin('cookies'), cookies={}, session=s) + c = json.loads(r.text).get('cookies') + assert c == _c + + # Remove a cookie by setting it's value to None. + r = get(httpbin('cookies'), cookies={'bessie': None}, session=s) + c = json.loads(r.text).get('cookies') + del _c['bessie'] + assert c == _c + + # Test session-level cookies. + s = requests.session(cookies=_c) + r = get(httpbin('cookies'), session=s) + c = json.loads(r.text).get('cookies') + assert c == _c + + # Have the server set a cookie. + r = get(httpbin('cookies', 'set', 'k', 'v'), allow_redirects=True, session=s) + c = json.loads(r.text).get('cookies') + + assert 'k' in c + + # And server-set cookie persistience. + r = get(httpbin('cookies'), session=s) + c = json.loads(r.text).get('cookies') + + assert 'k' in c + + def test_session_persistent_params(self): + + params = {'a': 'a_test'} + + s = requests.session() + s.params = params + + # Make 2 requests from Session object, should send header both times + r1 = get(httpbin('get'), session=s) + assert params['a'] in r1.text + + params2 = {'b': 'b_test'} + + r2 = get(httpbin('get'), params=params2, session=s) + assert params['a'] in r2.text + assert params2['b'] in r2.text + + params3 = {'b': 'b_test', 'a': None, 'c': 'c_test'} + + r3 = get(httpbin('get'), params=params3, session=s) + + assert not params['a'] in r3.text + assert params3['b'] in r3.text + assert params3['c'] in r3.text + + def test_session_pickling(self): + + s = requests.session( + headers={'header': 'value'}, + cookies={'a-cookie': 'cookie-value'}, + auth=('username', 'password')) + + ds = pickle.loads(pickle.dumps(s)) + + self.assertEqual(s.headers, ds.headers) + self.assertEqual(s.cookies, ds.cookies) + self.assertEqual(s.auth, ds.auth) + + def test_unpickled_session_requests(self): + s = requests.session() + r = get(httpbin('cookies', 'set', 'k', 'v'), allow_redirects=True, session=s) + c = json.loads(r.text).get('cookies') + assert 'k' in c + + ds = pickle.loads(pickle.dumps(s)) + r = get(httpbin('cookies'), session=ds) + c = json.loads(r.text).get('cookies') + assert 'k' in c + + ds1 = pickle.loads(pickle.dumps(requests.session())) + ds2 = pickle.loads(pickle.dumps(requests.session(prefetch=True))) + assert not ds1.prefetch + assert ds2.prefetch + + def test_invalid_content(self): + # WARNING: if you're using a terrible DNS provider (comcast), + # this will fail. + try: + hah = 'http://somedomainthatclearlydoesntexistg.com' + r = get(hah, allow_redirects=False) + except requests.ConnectionError: + pass # \o/ + else: + assert False + + config = {'safe_mode': True} + r = get(hah, allow_redirects=False, config=config) + assert r.content == None + + def test_cached_response(self): + + r1 = get(httpbin('get'), prefetch=False) + assert not r1._content + assert r1.content + assert r1.text + + r2 = get(httpbin('get'), prefetch=True) + assert r2._content + assert r2.content + assert r2.text + + def test_iter_lines(self): + + lines = (0, 2, 10, 100) + + for i in lines: + r = get(httpbin('stream', str(i)), prefetch=False) + lines = list(r.iter_lines()) + len_lines = len(lines) + + self.assertEqual(i, len_lines) + + # Tests that trailing whitespaces within lines do not get stripped. + # Tests that a trailing non-terminated line does not get stripped. + quote = ( + '''Agamemnon \n''' + '''\tWhy will he not upon our fair request\r\n''' + '''\tUntent his person and share the air with us?''' + ) + + # Make a request and monkey-patch its contents + r = get(httpbin('get')) + r.raw = StringIO(quote) + + lines = list(r.iter_lines()) + len_lines = len(lines) + self.assertEqual(len_lines, 3) + + joined = lines[0] + '\n' + lines[1] + '\r\n' + lines[2] + self.assertEqual(joined, quote) + + def test_safe_mode(self): + + safe = requests.session(config=dict(safe_mode=True)) + + # Safe mode creates empty responses for failed requests. + # Iterating on these responses should produce empty sequences + r = get('http://_/', session=safe) + self.assertEqual(list(r.iter_lines()), []) + assert isinstance(r.error, requests.exceptions.ConnectionError) + + r = get('http://_/', session=safe) + self.assertEqual(list(r.iter_content()), []) + assert isinstance(r.error, requests.exceptions.ConnectionError) + + # When not in safe mode, should raise Timeout exception + self.assertRaises( + requests.exceptions.Timeout, + get, + httpbin('stream', '1000'), timeout=0.0001) + + # In safe mode, should return a blank response + r = get(httpbin('stream', '1000'), timeout=0.0001, + config=dict(safe_mode=True)) + assert r.content is None + assert isinstance(r.error, requests.exceptions.Timeout) + + def test_upload_binary_data(self): + + requests.get(httpbin('post'), auth=('a', 'b'), data='\xff') + + def test_useful_exception_for_invalid_port(self): + # If we pass a legitimate URL with an invalid port, we should fail. + self.assertRaises( + ValueError, + get, + 'http://google.com:banana') + + def test_useful_exception_for_invalid_scheme(self): + + # If we pass a legitimate URL with a scheme not supported + # by requests, we should fail. + self.assertRaises( + ValueError, + get, + 'ftp://ftp.kernel.org/pub/') + + def test_can_have_none_in_header_values(self): + try: + # Don't choke on headers with none in the value. + requests.get(httpbin('headers'), headers={'Foo': None}) + except TypeError: + self.fail() + + def test_danger_mode_redirects(self): + s = requests.session() + s.config['danger_mode'] = True + s.get(httpbin('redirect', '4')) + + + def test_empty_response(self): + r = requests.get(httpbin('status', '404')) + r.text + + def test_no_content(self): + r = requests.get(httpbin('status', "0"), config={"safe_mode":True}) + r.content + r.content + +if __name__ == '__main__': + unittest.main() |