diff options
Diffstat (limited to 'tests')
-rwxr-xr-x | tests/test_cookies.py | 207 | ||||
-rwxr-xr-x | tests/test_requests.py | 907 | ||||
-rwxr-xr-x | tests/test_requests_async.py | 67 | ||||
-rw-r--r-- | tests/test_requests_ext.py | 131 | ||||
-rwxr-xr-x | tests/test_requests_https.py | 31 |
5 files changed, 0 insertions, 1343 deletions
diff --git a/tests/test_cookies.py b/tests/test_cookies.py deleted file mode 100755 index dca7d2c..0000000 --- a/tests/test_cookies.py +++ /dev/null @@ -1,207 +0,0 @@ -#!/usr/bin/env python -# -*- coding: utf-8 -*- - -import sys -import json -import os -import tempfile -import unittest - -# Path hack. -sys.path.insert(0, os.path.abspath('..')) -import requests -from requests.compat import cookielib - -# More hacks -sys.path.append('.') -from test_requests import httpbin, TestBaseMixin - -class CookieTests(TestBaseMixin, unittest.TestCase): - - def test_cookies_from_response(self): - """Basic test that we correctly parse received cookies in the Response object.""" - r = requests.get(httpbin('cookies', 'set', 'myname', 'myvalue')) - - # test deprecated dictionary interface - self.assertEqual(r.cookies['myname'], 'myvalue') - # test CookieJar interface - jar = r.cookies - self.assertEqual(len(jar), 1) - cookie_from_jar = list(jar)[0] - self.assertCookieHas(cookie_from_jar, name='myname', value='myvalue') - - q = requests.get(httpbin('cookies'), cookies=jar) - self.assertEqual(json.loads(q.text)['cookies'], {'myname': 'myvalue'}) - - def test_crossdomain_cookies(self): - """Cookies should not be sent to domains they didn't originate from.""" - r = requests.get("http://github.com") - c = r.cookies - # github should send us cookies - self.assertTrue(len(c) >= 1) - - # github cookies should not be sent to httpbin.org: - r2 = requests.get(httpbin('cookies'), cookies=c) - self.assertEqual(json.loads(r2.text)['cookies'], {}) - - # let's do this again using the session object - s = requests.session() - s.get("http://github.com") - self.assertTrue(len(s.cookies) >= 1) - r = s.get(httpbin('cookies')) - self.assertEqual(json.loads(r.text)['cookies'], {}) - # we can set a cookie and get exactly that same-domain cookie back: - r = s.get(httpbin('cookies', 'set', 'myname', 'myvalue')) - self.assertEqual(json.loads(r.text)['cookies'], {'myname': 'myvalue'}) - - def test_overwrite(self): - """Cookies should get overwritten when appropriate.""" - r = requests.get(httpbin('cookies', 'set', 'shimon', 'yochai')) - cookies = r.cookies - requests.get(httpbin('cookies', 'set', 'elazar', 'shimon'), cookies=cookies) - r = requests.get(httpbin('cookies'), cookies=cookies) - self.assertEqual(json.loads(r.text)['cookies'], - {'shimon': 'yochai', 'elazar': 'shimon'}) - # overwrite the value of 'shimon' - r = requests.get(httpbin('cookies', 'set', 'shimon', 'gamaliel'), cookies=cookies) - self.assertEqual(len(cookies), 2) - r = requests.get(httpbin('cookies'), cookies=cookies) - self.assertEqual(json.loads(r.text)['cookies'], - {'shimon': 'gamaliel', 'elazar': 'shimon'}) - - def test_redirects(self): - """Test that cookies set by a 302 page are correctly processed.""" - r = requests.get(httpbin('cookies', 'set', 'redirects', 'work')) - self.assertEqual(r.history[0].status_code, 302) - expected_cookies = {'redirects': 'work'} - self.assertEqual(json.loads(r.text)['cookies'], expected_cookies) - - r2 = requests.get(httpbin('cookies', 'set', 'very', 'well'), cookies=r.cookies) - expected_cookies = {'redirects': 'work', 'very': 'well'} - self.assertEqual(json.loads(r2.text)['cookies'], expected_cookies) - self.assertTrue(r.cookies is r2.cookies) - - def test_none_cookie(self): - """Regression test: don't send a Cookie header with a string value of 'None'!""" - page = json.loads(requests.get(httpbin('headers')).text) - self.assertTrue('Cookie' not in page['headers']) - - def test_secure_cookies(self): - """Test that secure cookies can only be sent via https.""" - header = "Set-Cookie: ThisIsA=SecureCookie; Path=/; Secure; HttpOnly" - url = 'https://httpbin.org/response-headers?%s' % (requests.utils.quote(header),) - cookies = requests.get(url, verify=False).cookies - self.assertEqual(len(cookies), 1) - self.assertEqual(list(cookies)[0].secure, True) - - secure_resp = requests.get('https://httpbin.org/cookies', cookies=cookies, verify=False) - secure_cookies_sent = json.loads(secure_resp.text)['cookies'] - self.assertEqual(secure_cookies_sent, {'ThisIsA': 'SecureCookie'}) - - insecure_resp = requests.get('http://httpbin.org/cookies', cookies=cookies) - insecure_cookies_sent = json.loads(insecure_resp.text)['cookies'] - self.assertEqual(insecure_cookies_sent, {}) - -class LWPCookieJarTest(TestBaseMixin, unittest.TestCase): - """Check store/load of cookies to FileCookieJar's, specifically LWPCookieJar's.""" - - COOKIEJAR_CLASS = cookielib.LWPCookieJar - - def setUp(self): - # blank the file - self.cookiejar_file = tempfile.NamedTemporaryFile() - self.cookiejar_filename = self.cookiejar_file.name - cookiejar = self.COOKIEJAR_CLASS(self.cookiejar_filename) - cookiejar.save() - - def tearDown(self): - try: - self.cookiejar_file.close() - except OSError: - pass - - def test_cookiejar_persistence(self): - """Test that we can save cookies to a FileCookieJar.""" - cookiejar = self.COOKIEJAR_CLASS(self.cookiejar_filename) - cookiejar.load() - # initially should be blank - self.assertEqual(len(cookiejar), 0) - - response = requests.get(httpbin('cookies', 'set', 'key', 'value'), cookies=cookiejar) - self.assertEqual(len(cookiejar), 1) - cookie = list(cookiejar)[0] - self.assertEqual(json.loads(response.text)['cookies'], {'key': 'value'}) - self.assertCookieHas(cookie, name='key', value='value') - - # save and reload the cookies from the file: - cookiejar.save(ignore_discard=True) - cookiejar_2 = self.COOKIEJAR_CLASS(self.cookiejar_filename) - cookiejar_2.load(ignore_discard=True) - self.assertEqual(len(cookiejar_2), 1) - cookie_2 = list(cookiejar_2)[0] - # this cookie should have been saved with the correct domain restriction: - self.assertCookieHas(cookie_2, name='key', value='value', - domain='httpbin.org', path='/') - - # httpbin sets session cookies, so if we don't ignore the discard attribute, - # there should be no cookie: - cookiejar_3 = self.COOKIEJAR_CLASS(self.cookiejar_filename) - cookiejar_3.load() - self.assertEqual(len(cookiejar_3), 0) - - def test_crossdomain(self): - """Test persistence of the domains associated with the cookies.""" - cookiejar = self.COOKIEJAR_CLASS(self.cookiejar_filename) - cookiejar.load() - self.assertEqual(len(cookiejar), 0) - - # github sets a cookie - requests.get("http://github.com", cookies=cookiejar) - num_github_cookies = len(cookiejar) - self.assertTrue(num_github_cookies >= 1) - # httpbin sets another - requests.get(httpbin('cookies', 'set', 'key', 'value'), cookies=cookiejar) - num_total_cookies = len(cookiejar) - self.assertTrue(num_total_cookies >= 2) - self.assertTrue(num_total_cookies > num_github_cookies) - - # save and load - cookiejar.save(ignore_discard=True) - cookiejar_2 = self.COOKIEJAR_CLASS(self.cookiejar_filename) - cookiejar_2.load(ignore_discard=True) - self.assertEqual(len(cookiejar_2), num_total_cookies) - r = requests.get(httpbin('cookies'), cookies=cookiejar_2) - self.assertEqual(json.loads(r.text)['cookies'], {'key': 'value'}) - - def test_persistent_cookies(self): - """Test that we correctly interpret persistent cookies.""" - # httpbin's normal cookie methods don't send persistent cookies, - # so cook up the appropriate header and force it to send - header = "Set-Cookie: Persistent=CookiesAreScary; expires=Sun, 04-May-2032 04:56:50 GMT; path=/" - url = httpbin('response-headers?%s' % (requests.utils.quote(header),)) - cookiejar = self.COOKIEJAR_CLASS(self.cookiejar_filename) - - requests.get(url, cookies=cookiejar) - self.assertEqual(len(cookiejar), 1) - self.assertCookieHas(list(cookiejar)[0], name='Persistent', value='CookiesAreScary') - - requests.get(httpbin('cookies', 'set', 'ThisCookieIs', 'SessionOnly'), cookies=cookiejar) - self.assertEqual(len(cookiejar), 2) - self.assertEqual(len([c for c in cookiejar if c.name == 'Persistent']), 1) - self.assertEqual(len([c for c in cookiejar if c.name == 'ThisCookieIs']), 1) - - # save and load - cookiejar.save() - cookiejar_2 = self.COOKIEJAR_CLASS(self.cookiejar_filename) - cookiejar_2.load() - # we should only load the persistent cookie - self.assertEqual(len(cookiejar_2), 1) - self.assertCookieHas(list(cookiejar_2)[0], name='Persistent', value='CookiesAreScary') - -class MozCookieJarTest(LWPCookieJarTest): - """Same test, but substitute MozillaCookieJar.""" - - COOKIEJAR_CLASS = cookielib.MozillaCookieJar - -if __name__ == '__main__': - unittest.main() diff --git a/tests/test_requests.py b/tests/test_requests.py deleted file mode 100755 index 530008a..0000000 --- a/tests/test_requests.py +++ /dev/null @@ -1,907 +0,0 @@ -#!/usr/bin/env python -# -*- coding: utf-8 -*- - -# from __future__ import unicode_literals - -# Path hack. -import sys -import os -sys.path.insert(0, os.path.abspath('..')) - -import json -import os -import unittest -import pickle - -import requests -from requests.compat import str, StringIO -# import envoy -from requests import HTTPError -from requests import get, post, head, put -from requests.auth import HTTPBasicAuth, HTTPDigestAuth - -if 'HTTPBIN_URL' not in os.environ: - os.environ['HTTPBIN_URL'] = 'http://httpbin.org/' - -HTTPBIN_URL = os.environ.get('HTTPBIN_URL') - - -def httpbin(*suffix): - """Returns url for HTTPBIN resource.""" - return HTTPBIN_URL + '/'.join(suffix) - - -SERVICES = (httpbin, ) - -_httpbin = False - - -class TestSetup(object): - """Requests test cases.""" - - # It goes to eleven. - _multiprocess_can_split_ = True - - def setUp(self): - - global _httpbin - - if (not 'HTTPBIN_URL' in os.environ) and not _httpbin: - # c = envoy.connect('httpbin %s' % (PORT)) - # time.sleep(1) - _httpbin = True - -class TestBaseMixin(object): - - def assertCookieHas(self, cookie, **kwargs): - """Assert that a cookie has various specified properties.""" - for attr, expected_value in kwargs.items(): - cookie_attr = getattr(cookie, attr) - message = 'Failed comparison for %s: %s != %s' % (attr, cookie_attr, expected_value) - self.assertEqual(cookie_attr, expected_value, message) - -class RequestsTestSuite(TestSetup, TestBaseMixin, unittest.TestCase): - """Requests test cases.""" - - def test_entry_points(self): - - requests.session - requests.session().get - requests.session().head - requests.get - requests.head - requests.put - requests.patch - requests.post - - def test_invalid_url(self): - self.assertRaises(ValueError, get, 'hiwpefhipowhefopw') - - def test_path_is_not_double_encoded(self): - request = requests.Request("http://0.0.0.0/get/test case") - - self.assertEqual(request.path_url, "/get/test%20case") - - def test_HTTP_200_OK_GET(self): - r = get(httpbin('get')) - self.assertEqual(r.status_code, 200) - - def test_response_sent(self): - r = get(httpbin('get')) - - self.assertTrue(r.request.sent) - - def test_HTTP_302_ALLOW_REDIRECT_GET(self): - r = get(httpbin('redirect', '1')) - self.assertEqual(r.status_code, 200) - - def test_HTTP_302_GET(self): - r = get(httpbin('redirect', '1'), allow_redirects=False) - self.assertEqual(r.status_code, 302) - - def test_HTTP_200_OK_GET_WITH_PARAMS(self): - heads = {'User-agent': 'Mozilla/5.0'} - - r = get(httpbin('user-agent'), headers=heads) - - assert heads['User-agent'] in r.text - self.assertEqual(r.status_code, 200) - - def test_HTTP_200_OK_GET_WITH_MIXED_PARAMS(self): - heads = {'User-agent': 'Mozilla/5.0'} - - r = get(httpbin('get') + '?test=true', params={'q': 'test'}, headers=heads) - self.assertEqual(r.status_code, 200) - - # def test_unicode_headers(self): - # # Simply calling requests with a unicode instance should simply work - # # when the characters are all representable using latin-1: - # heads = { u'User-Agent': u'Requests Test Suite' } - # requests.get(url=httpbin('get'), headers=heads) - - # # Characters outside latin-1 should raise an exception: - # heads = { u'User-Agent': u'\u30cd\u30c3\u30c8\u30ef\u30fc\u30af' } - # self.assertRaises(UnicodeEncodeError, requests.get, - # url=httpbin('get'), headers=heads) - - # def test_session_with_escaped_url(self): - # # Test a URL that contains percent-escaped characters - # # This URL should not be modified (double-escaped) - # # Tests: - # # - Quoted illegal characters ("%20" (' '), "%3C" ('<'), "%3E" ('>')) - # # - Quoted reserved characters ("%25" ('%'), "%23" ('#'), "%2F" ('/')) - # # - Quoted non-ASCII characters ("%C3%98", "%C3%A5") - # path_fully_escaped = '%3Ca%25b%23c%2Fd%3E/%C3%98%20%C3%A5' - # url = httpbin('get/' + path_fully_escaped) - # response = get(url) - # self.assertEqual(response.url, httpbin('get/' + path_fully_escaped)) - - # # Test that illegal characters in a path get properly percent-escaped - # # Tests: - # # - Bare illegal characters (space, '<') - # # - Bare non-ASCII characters ('\u00d8') - # path = u'<a%25b%23c%2Fd%3E/\u00d8 %C3%A5' - # url = httpbin('get/' + path) - # response = get(url) - # self.assertEqual(response.url, httpbin('get/' + path_fully_escaped)) - - # # Test that reserved characters in a path do not get percent-escaped - # # Tests: - # # - All reserved characters (RFC 3986), except '?', '#', '[' and ']', - # # which are not allowed in the path, and ';' which delimits - # # parameters. - # # All such characters must be allowed bare in path, and must not be - # # encoded. - # # - Special unreserved characters (RFC 3986), which should not be - # # encoded (even though it wouldn't hurt). - # path_reserved = '!$&\'()*+,/:=@-._~' - # url = httpbin('get/' + path_reserved) - # response = get(url) - # self.assertEqual(response.url, httpbin('get/' + path_reserved)) - - # # Test that percent-encoded unreserved characters in a path get - # # normalised to their un-encoded forms. - # path_unreserved = 'ABCDwxyz1234-._~' - # path_unreserved_escaped = '%41%42%43%44%77%78%79%7A%31%32%33%34%2D%2E%5F%7E' - # url = httpbin('get/' + path_unreserved_escaped) - # response = get(url) - # self.assertEqual(response.url, httpbin('get/' + path_unreserved)) - - # # Re-run all of the same tests on the query part of the URI - # query_fully_escaped = '%3Ca%25b%23c%2Fd%3E=%C3%98%20%C3%A5' - # url = httpbin('get/?' + query_fully_escaped) - # response = get(url) - # self.assertEqual(response.url, httpbin('get/?' + query_fully_escaped)) - - # query = u'<a%25b%23c%2Fd%3E=\u00d8 %C3%A5' - # url = httpbin('get/?' + query) - # response = get(url) - # self.assertEqual(response.url, httpbin('get/?' + query_fully_escaped)) - - # # The legal characters in query happens to be the same as in path - # query_reserved = '!$&\'()*+,/:=@-._~' - # url = httpbin('get/?' + query_reserved) - # response = get(url) - # self.assertEqual(response.url, httpbin('get/?' + query_reserved)) - - # query_unreserved = 'ABCDwxyz=1234-._~' - # query_unreserved_escaped = '%41%42%43%44%77%78%79%7A=%31%32%33%34%2D%2E%5F%7E' - # url = httpbin('get/?' + query_unreserved_escaped) - # response = get(url) - # self.assertEqual(response.url, httpbin('get/?' + query_unreserved)) - - def test_user_agent_transfers(self): - """Issue XX""" - - heads = { - 'User-agent': - 'Mozilla/5.0 (github.com/kennethreitz/requests)' - } - - r = get(httpbin('user-agent'), headers=heads) - self.assertTrue(heads['User-agent'] in r.text) - - heads = { - 'user-agent': - 'Mozilla/5.0 (github.com/kennethreitz/requests)' - } - - r = get(httpbin('user-agent'), headers=heads) - self.assertTrue(heads['user-agent'] in r.text) - - def test_HTTP_200_OK_HEAD(self): - r = head(httpbin('get')) - self.assertEqual(r.status_code, 200) - - def test_HTTP_200_OK_PUT(self): - r = put(httpbin('put')) - self.assertEqual(r.status_code, 200) - - def test_BASICAUTH_TUPLE_HTTP_200_OK_GET(self): - - for service in SERVICES: - - auth = ('user', 'pass') - url = service('basic-auth', 'user', 'pass') - - r = get(url, auth=auth) - self.assertEqual(r.status_code, 200) - - r = get(url) - self.assertEqual(r.status_code, 401) - - s = requests.session(auth=auth) - r = get(url, session=s) - self.assertEqual(r.status_code, 200) - - def test_BASICAUTH_HTTP_200_OK_GET(self): - - for service in SERVICES: - - auth = HTTPBasicAuth('user', 'pass') - url = service('basic-auth', 'user', 'pass') - - r = get(url, auth=auth) - self.assertEqual(r.status_code, 200) - - auth = ('user', 'pass') - r = get(url, auth=auth) - self.assertEqual(r.status_code, 200) - - r = get(url) - self.assertEqual(r.status_code, 401) - - s = requests.session(auth=auth) - r = get(url, session=s) - self.assertEqual(r.status_code, 200) - - def test_DIGESTAUTH_HTTP_200_OK_GET(self): - - for service in SERVICES: - - auth = HTTPDigestAuth('user', 'pass') - url = service('digest-auth', 'auth', 'user', 'pass') - - r = get(url, auth=auth) - self.assertEqual(r.status_code, 200) - - r = get(url) - self.assertEqual(r.status_code, 401) - - s = requests.session(auth=auth) - r = get(url, session=s) - self.assertEqual(r.status_code, 200) - - def test_DIGESTAUTH_WRONG_HTTP_401_GET(self): - - for service in SERVICES: - - auth = HTTPDigestAuth('user', 'wrongpass') - url = service('digest-auth', 'auth', 'user', 'pass') - - r = get(url, auth=auth) - self.assertEqual(r.status_code, 401) - - s = requests.session(auth=auth) - r = get(url, session=s) - self.assertEqual(r.status_code, 401) - - def test_POSTBIN_GET_POST_FILES(self): - - for service in SERVICES: - - url = service('post') - post1 = post(url).raise_for_status() - - post1 = post(url, data={'some': 'data'}) - self.assertEqual(post1.status_code, 200) - - with open(__file__) as f: - post2 = post(url, files={'some': f}) - self.assertEqual(post2.status_code, 200) - - post3 = post(url, data='[{"some": "json"}]') - self.assertEqual(post3.status_code, 200) - - def test_POSTBIN_GET_POST_FILES_WITH_PARAMS(self): - - for service in SERVICES: - - with open(__file__) as f: - url = service('post') - post1 = post(url, - files={'some': f}, - data={'some': 'data'}) - - self.assertEqual(post1.status_code, 200) - - def test_POSTBIN_GET_POST_FILES_WITH_HEADERS(self): - - for service in SERVICES: - - url = service('post') - - with open(__file__) as f: - - post2 = post(url, - files={'some': f}, - headers={'User-Agent': 'requests-tests'}) - - self.assertEqual(post2.status_code, 200) - - def test_POSTBIN_GET_POST_FILES_STRINGS(self): - - for service in SERVICES: - - url = service('post') - - post1 = post(url, files={'fname.txt': 'fdata'}) - self.assertEqual(post1.status_code, 200) - - post2 = post(url, files={'fname.txt': 'fdata', 'fname2.txt':'more fdata'}) - self.assertEqual(post2.status_code, 200) - - post3 = post(url, files={'fname.txt': 'fdata', 'fname2.txt':open(__file__,'rb')}) - self.assertEqual(post3.status_code, 200) - - post4 = post(url, files={'fname.txt': 'fdata'}) - self.assertEqual(post4.status_code, 200) - - post5 = post(url, files={'file': ('file.txt', 'more fdata')}) - self.assertEqual(post5.status_code, 200) - - post6 = post(url, files={'fname.txt': '\xe9'}) - self.assertEqual(post6.status_code, 200) - - post7 = post(url, files={'fname.txt': 'fdata to verify'}) - rbody = json.loads(post7.text) - self.assertTrue(rbody.get('files', None)) - self.assertTrue(rbody['files'].get('fname.txt'), None) - self.assertEqual(rbody['files']['fname.txt'], 'fdata to verify') - - - def test_nonzero_evaluation(self): - - for service in SERVICES: - - r = get(service('status', '500')) - self.assertEqual(bool(r), False) - - r = get(service('/get')) - self.assertEqual(bool(r), True) - - def test_request_ok_set(self): - - for service in SERVICES: - - r = get(service('status', '404')) - # print r.status_code - # r.raise_for_status() - self.assertEqual(r.ok, False) - - def test_status_raising(self): - r = get(httpbin('status', '404')) - self.assertRaises(HTTPError, r.raise_for_status) - - r = get(httpbin('status', '200')) - self.assertFalse(r.error) - r.raise_for_status() - - def test_default_status_raising(self): - config = {'danger_mode': True} - args = [httpbin('status', '404')] - kwargs = dict(config=config) - self.assertRaises(HTTPError, get, *args, **kwargs) - - r = get(httpbin('status', '200')) - self.assertEqual(r.status_code, 200) - - def test_decompress_gzip(self): - - r = get(httpbin('gzip')) - r.content.decode('ascii') - - def test_response_has_unicode_url(self): - - for service in SERVICES: - - url = service('get') - - response = get(url) - - assert isinstance(response.url, str) - - def test_unicode_get(self): - - for service in SERVICES: - - url = service('/get') - - get(url, params={'foo': 'føø'}) - get(url, params={'føø': 'føø'}) - get(url, params={'føø': 'føø'}) - get(url, params={'foo': 'foo'}) - get(service('ø'), params={'foo': 'foo'}) - - def test_httpauth_recursion(self): - - http_auth = HTTPBasicAuth('user', 'BADpass') - - for service in SERVICES: - r = get(service('basic-auth', 'user', 'pass'), auth=http_auth) - self.assertEqual(r.status_code, 401) - - def test_urlencoded_post_data(self): - - for service in SERVICES: - - r = post(service('post'), data=dict(test='fooaowpeuf')) - - self.assertEqual(r.status_code, 200) - self.assertEqual(r.headers['content-type'], 'application/json') - self.assertEqual(r.url, service('post')) - - rbody = json.loads(r.text) - - self.assertEqual(rbody.get('form'), dict(test='fooaowpeuf')) - self.assertEqual(rbody.get('data'), '') - - def test_nonurlencoded_post_data(self): - - for service in SERVICES: - - r = post(service('post'), data='fooaowpeuf') - - self.assertEqual(r.status_code, 200) - self.assertEqual(r.headers['content-type'], 'application/json') - self.assertEqual(r.url, service('post')) - - rbody = json.loads(r.text) - # Body wasn't valid url encoded data, so the server returns None as - # "form" and the raw body as "data". - - assert rbody.get('form') in (None, {}) - self.assertEqual(rbody.get('data'), 'fooaowpeuf') - - def test_urlencoded_post_querystring(self): - - for service in SERVICES: - - r = post(service('post'), params=dict(test='fooaowpeuf')) - - self.assertEqual(r.status_code, 200) - self.assertEqual(r.headers['content-type'], 'application/json') - self.assertEqual(r.url, service('post?test=fooaowpeuf')) - - rbody = json.loads(r.text) - self.assertEqual(rbody.get('form'), {}) # No form supplied - self.assertEqual(rbody.get('data'), '') - - def test_urlencoded_post_query_and_data(self): - - for service in SERVICES: - - r = post( - service('post'), - params=dict(test='fooaowpeuf'), - data=dict(test2="foobar")) - - self.assertEqual(r.status_code, 200) - self.assertEqual(r.headers['content-type'], 'application/json') - self.assertEqual(r.url, service('post?test=fooaowpeuf')) - - rbody = json.loads(r.text) - self.assertEqual(rbody.get('form'), dict(test2='foobar')) - self.assertEqual(rbody.get('data'), '') - - def test_nonurlencoded_postdata(self): - - for service in SERVICES: - - r = post(service('post'), data="foobar") - - self.assertEqual(r.status_code, 200) - self.assertEqual(r.headers['content-type'], 'application/json') - - rbody = json.loads(r.text) - - assert rbody.get('form') in (None, {}) - self.assertEqual(rbody.get('data'), 'foobar') - - def test_urlencoded_get_query_multivalued_param(self): - - for service in SERVICES: - - r = get(service('get'), params=dict(test=['foo', 'baz'])) - self.assertEqual(r.status_code, 200) - self.assertEqual(r.url, service('get?test=foo&test=baz')) - - def test_urlencoded_post_querystring_multivalued(self): - - for service in SERVICES: - - r = post(service('post'), params=dict(test=['foo', 'baz'])) - self.assertEqual(r.status_code, 200) - self.assertEqual(r.headers['content-type'], 'application/json') - self.assertEqual(r.url, service('post?test=foo&test=baz')) - - rbody = json.loads(r.text) - self.assertEqual(rbody.get('form'), {}) # No form supplied - self.assertEqual(rbody.get('data'), '') - - def test_urlencoded_post_query_multivalued_and_data(self): - - for service in SERVICES: - - r = post( - service('post'), - params=dict(test=['foo', 'baz']), - data=dict(test2="foobar", test3=['foo', 'baz'])) - - self.assertEqual(r.status_code, 200) - self.assertEqual(r.headers['content-type'], 'application/json') - self.assertEqual(r.url, service('post?test=foo&test=baz')) - - # print(r.text) - # print('-----------------------') - - rbody = json.loads(r.text) - self.assertEqual(rbody.get('form'), dict(test2='foobar', test3=['foo', 'baz'])) - self.assertEqual(rbody.get('data'), '') - - def test_GET_no_redirect(self): - - for service in SERVICES: - - r = get(service('redirect', '3'), allow_redirects=False) - self.assertEqual(r.status_code, 302) - self.assertEqual(len(r.history), 0) - - def test_HEAD_no_redirect(self): - - for service in SERVICES: - - r = head(service('redirect', '3'), allow_redirects=False) - self.assertEqual(r.status_code, 302) - self.assertEqual(len(r.history), 0) - - def test_redirect_history(self): - - for service in SERVICES: - - r = get(service('redirect', '3')) - self.assertEqual(r.status_code, 200) - self.assertEqual(len(r.history), 3) - - def test_relative_redirect_history(self): - - for service in SERVICES: - - r = get(service('relative-redirect', '3')) - self.assertEqual(r.status_code, 200) - self.assertEqual(len(r.history), 3) - - def test_session_HTTP_200_OK_GET(self): - - s = requests.session() - r = get(httpbin('get'), session=s) - self.assertEqual(r.status_code, 200) - - def test_session_persistent_headers(self): - - heads = {'User-agent': 'Mozilla/5.0'} - - s = requests.session() - s.headers = heads - - # Make 2 requests from Session object, should send header both times - r1 = get(httpbin('user-agent'), session=s) - assert heads['User-agent'] in r1.text - - r2 = get(httpbin('user-agent'), session=s) - assert heads['User-agent'] in r2.text - - new_heads = {'User-agent': 'blah'} - r3 = get(httpbin('user-agent'), headers=new_heads, session=s) - assert new_heads['User-agent'] in r3.text - - self.assertEqual(r2.status_code, 200) - - def test_single_hook(self): - - def add_foo_header(args): - if not args.get('headers'): - args['headers'] = {} - - args['headers'].update({ - 'X-Foo': 'foo' - }) - - return args - - for service in SERVICES: - url = service('headers') - response = get(url=url, hooks={'args': add_foo_header}) - - assert 'foo' in response.text - - def test_multiple_hooks(self): - - def add_foo_header(args): - if not args.get('headers'): - args['headers'] = {} - - args['headers'].update({ - 'X-Foo': 'foo' - }) - - return args - - def add_bar_header(args): - if not args.get('headers'): - args['headers'] = {} - - args['headers'].update({ - 'X-Bar': 'bar' - }) - - return args - - for service in SERVICES: - url = service('headers') - - response = get(url=url, - hooks={ - 'args': [add_foo_header, add_bar_header] - } - ) - - assert 'foo' in response.text - assert 'bar' in response.text - - def test_session_persistent_cookies(self): - - s = requests.session() - - # Internally dispatched cookies are sent. - _c = {'kenneth': 'reitz', 'bessie': 'monke'} - r = get(httpbin('cookies'), cookies=_c, session=s) - r = get(httpbin('cookies'), session=s) - - # Those cookies persist transparently. - c = json.loads(r.text).get('cookies') - self.assertEqual(c, _c) - - # Double check. - r = get(httpbin('cookies'), cookies={}, session=s) - c = json.loads(r.text).get('cookies') - self.assertEqual(c, _c) - - # Remove a cookie by setting it's value to None. - r = get(httpbin('cookies'), cookies={'bessie': None}, session=s) - c = json.loads(r.text).get('cookies') - del _c['bessie'] - self.assertEqual(c, _c) - - # Test session-level cookies. - s = requests.session(cookies=_c) - r = get(httpbin('cookies'), session=s) - c = json.loads(r.text).get('cookies') - self.assertEqual(c, _c) - - # Have the server set a cookie. - r = get(httpbin('cookies', 'set', 'k', 'v'), allow_redirects=True, session=s) - c = json.loads(r.text).get('cookies') - - assert 'k' in c - - # And server-set cookie persistience. - r = get(httpbin('cookies'), session=s) - c = json.loads(r.text).get('cookies') - - assert 'k' in c - - def test_session_persistent_params(self): - - params = {'a': 'a_test'} - - s = requests.session() - s.params = params - - # Make 2 requests from Session object, should send header both times - r1 = get(httpbin('get'), session=s) - assert params['a'] in r1.text - - params2 = {'b': 'b_test'} - - r2 = get(httpbin('get'), params=params2, session=s) - assert params['a'] in r2.text - assert params2['b'] in r2.text - - params3 = {'b': 'b_test', 'a': None, 'c': 'c_test'} - - r3 = get(httpbin('get'), params=params3, session=s) - - assert not params['a'] in r3.text - assert params3['b'] in r3.text - assert params3['c'] in r3.text - - def test_session_pickling(self): - - s = requests.session( - headers={'header': 'value'}, - cookies={'a-cookie': 'cookie-value'}, - auth=('username', 'password')) - - ds = pickle.loads(pickle.dumps(s)) - - self.assertEqual(s.headers, ds.headers) - self.assertEqual(s.auth, ds.auth) - - # Cookie doesn't have a good __eq__, so verify manually: - self.assertEqual(len(ds.cookies), 1) - for cookie in ds.cookies: - self.assertCookieHas(cookie, name='a-cookie', value='cookie-value') - - def test_unpickled_session_requests(self): - s = requests.session() - r = get(httpbin('cookies', 'set', 'k', 'v'), allow_redirects=True, session=s) - c = json.loads(r.text).get('cookies') - assert 'k' in c - - ds = pickle.loads(pickle.dumps(s)) - r = get(httpbin('cookies'), session=ds) - c = json.loads(r.text).get('cookies') - assert 'k' in c - - ds1 = pickle.loads(pickle.dumps(requests.session())) - ds2 = pickle.loads(pickle.dumps(requests.session(prefetch=True))) - assert not ds1.prefetch - assert ds2.prefetch - - def test_invalid_content(self): - # WARNING: if you're using a terrible DNS provider (comcast), - # this will fail. - try: - hah = 'http://somedomainthatclearlydoesntexistg.com' - r = get(hah, allow_redirects=False) - except requests.ConnectionError: - pass # \o/ - else: - assert False - - config = {'safe_mode': True} - r = get(hah, allow_redirects=False, config=config) - assert r.content == None - - def test_cached_response(self): - - r1 = get(httpbin('get'), prefetch=False) - assert not r1._content - assert r1.content - assert r1.text - - r2 = get(httpbin('get'), prefetch=True) - assert r2._content - assert r2.content - assert r2.text - - def test_iter_lines(self): - - lines = (0, 2, 10, 100) - - for i in lines: - r = get(httpbin('stream', str(i)), prefetch=False) - lines = list(r.iter_lines()) - len_lines = len(lines) - - self.assertEqual(i, len_lines) - - # Tests that trailing whitespaces within lines do not get stripped. - # Tests that a trailing non-terminated line does not get stripped. - quote = ( - '''Agamemnon \n''' - '''\tWhy will he not upon our fair request\r\n''' - '''\tUntent his person and share the air with us?''' - ) - - # Make a request and monkey-patch its contents - r = get(httpbin('get')) - r.raw = StringIO(quote) - - lines = list(r.iter_lines()) - len_lines = len(lines) - self.assertEqual(len_lines, 3) - - joined = lines[0] + '\n' + lines[1] + '\r\n' + lines[2] - self.assertEqual(joined, quote) - - def test_safe_mode(self): - - safe = requests.session(config=dict(safe_mode=True)) - - # Safe mode creates empty responses for failed requests. - # Iterating on these responses should produce empty sequences - r = get('http://_/', session=safe) - self.assertEqual(list(r.iter_lines()), []) - assert isinstance(r.error, requests.exceptions.ConnectionError) - - r = get('http://_/', session=safe) - self.assertEqual(list(r.iter_content()), []) - assert isinstance(r.error, requests.exceptions.ConnectionError) - - # When not in safe mode, should raise Timeout exception - self.assertRaises( - requests.exceptions.Timeout, - get, - httpbin('stream', '1000'), timeout=0.0001) - - # In safe mode, should return a blank response - r = get(httpbin('stream', '1000'), timeout=0.0001, - config=dict(safe_mode=True)) - assert r.content is None - assert isinstance(r.error, requests.exceptions.Timeout) - - def test_upload_binary_data(self): - - requests.get(httpbin('post'), auth=('a', 'b'), data='\xff') - - def test_useful_exception_for_invalid_port(self): - # If we pass a legitimate URL with an invalid port, we should fail. - self.assertRaises( - ValueError, - get, - 'http://google.com:banana') - - def test_useful_exception_for_invalid_scheme(self): - - # If we pass a legitimate URL with a scheme not supported - # by requests, we should fail. - self.assertRaises( - ValueError, - get, - 'ftp://ftp.kernel.org/pub/') - - def test_can_have_none_in_header_values(self): - try: - # Don't choke on headers with none in the value. - requests.get(httpbin('headers'), headers={'Foo': None}) - except TypeError: - self.fail() - - def test_danger_mode_redirects(self): - s = requests.session() - s.config['danger_mode'] = True - s.get(httpbin('redirect', '4')) - - - def test_empty_response(self): - r = requests.get(httpbin('status', '404')) - r.text - - def test_max_redirects(self): - """Test the max_redirects config variable, normally and under safe_mode.""" - def unsafe_callable(): - requests.get(httpbin('redirect', '3'), config=dict(max_redirects=2)) - self.assertRaises(requests.exceptions.TooManyRedirects, unsafe_callable) - - # add safe mode - response = requests.get(httpbin('redirect', '3'), config=dict(safe_mode=True, max_redirects=2)) - self.assertTrue(response.content is None) - self.assertTrue(isinstance(response.error, requests.exceptions.TooManyRedirects)) - - def test_connection_keepalive_and_close(self): - """Test that we send 'Connection: close' when keep_alive is disabled.""" - # keep-alive should be on by default - r1 = requests.get(httpbin('get')) - # XXX due to proxying issues, test the header sent back by httpbin, rather than - # the header reported in its message body. See kennethreitz/httpbin#46 - self.assertEqual(r1.headers['Connection'].lower(), 'keep-alive') - - # but when we disable it, we should send a 'Connection: close' - # and get the same back: - r2 = requests.get(httpbin('get'), config=dict(keep_alive=False)) - self.assertEqual(r2.headers['Connection'].lower(), 'close') - -if __name__ == '__main__': - unittest.main() diff --git a/tests/test_requests_async.py b/tests/test_requests_async.py deleted file mode 100755 index 3a5a762..0000000 --- a/tests/test_requests_async.py +++ /dev/null @@ -1,67 +0,0 @@ -#!/usr/bin/env python -# -*- coding: utf-8 -*- - -# Path hack. -import sys, os -sys.path.insert(0, os.path.abspath('..')) - -import sys -import unittest - -import select -has_poll = hasattr(select, "poll") - -from requests import async - -sys.path.append('.') -from test_requests import httpbin, RequestsTestSuite, SERVICES - - -class RequestsTestSuiteUsingAsyncApi(RequestsTestSuite): - """Requests async test cases.""" - - def patched(f): - """Automatically send request after creation.""" - - def wrapped(*args, **kwargs): - - request = f(*args, **kwargs) - return async.map([request])[0] - - return wrapped - - # Patched requests.api functions. - global request - request = patched(async.request) - - global delete, get, head, options, patch, post, put - delete = patched(async.delete) - get = patched(async.get) - head = patched(async.head) - options = patched(async.options) - patch = patched(async.patch) - post = patched(async.post) - put = patched(async.put) - - - def test_entry_points(self): - - async.request - - async.delete - async.get - async.head - async.options - async.patch - async.post - async.put - - async.map - async.send - - def test_select_poll(self): - """Test to make sure we don't overwrite the poll""" - self.assertEqual(hasattr(select, "poll"), has_poll) - -if __name__ == '__main__': - unittest.main() diff --git a/tests/test_requests_ext.py b/tests/test_requests_ext.py deleted file mode 100644 index 883bdce..0000000 --- a/tests/test_requests_ext.py +++ /dev/null @@ -1,131 +0,0 @@ -#!/usr/bin/env python -# -*- coding: utf-8 -*- - -# Path hack. -import sys, os -sys.path.insert(0, os.path.abspath('..')) - -import unittest - -import requests -from requests.compat import is_py2, is_py3 - -try: - import omnijson as json -except ImportError: - import json - - -class RequestsTestSuite(unittest.TestCase): - """Requests test cases.""" - - # It goes to eleven. - _multiprocess_can_split_ = True - - def test_addition(self): - assert (1 + 1) == 2 - - - def test_ssl_hostname_ok(self): - requests.get('https://github.com', verify=True) - - - def test_ssl_hostname_not_ok(self): - requests.get('https://kennethreitz.com', verify=False) - - self.assertRaises(requests.exceptions.SSLError, requests.get, 'https://kennethreitz.com') - - - def test_ssl_hostname_session_not_ok(self): - - s = requests.session() - - self.assertRaises(requests.exceptions.SSLError, s.get, 'https://kennethreitz.com') - - s.get('https://kennethreitz.com', verify=False) - - - def test_binary_post(self): - '''We need to be careful how we build the utf-8 string since - unicode literals are a syntax error in python3 - ''' - - if is_py2: - # Blasphemy! - utf8_string = eval("u'Smörgås'.encode('utf-8')") - elif is_py3: - utf8_string = 'Smörgås'.encode('utf-8') - else: - raise EnvironmentError('Flesh out this test for your environment.') - requests.post('http://www.google.com/', data=utf8_string) - - - - def test_unicode_error(self): - url = 'http://blip.fm/~1abvfu' - requests.get(url) - - - def test_chunked_head_redirect(self): - url = "http://t.co/NFrx0zLG" - r = requests.head(url, allow_redirects=True) - self.assertEqual(r.status_code, 200) - - def test_unicode_redirect(self): - '''This url redirects to a location that has a nonstandard - character in it, that breaks requests in python2.7 - - After some research, the cause was identified as an unintended - sideeffect of overriding of str with unicode. - - In the case that the redirected url is actually a malformed - "bytes" object, i.e. a string with character c where - ord(c) > 127, - then unicode(url) breaks. - ''' - r = requests.get('http://www.marketwire.com/mw/release.' + - 'do?id=1628202&sourceType=3') - assert r.ok - - def test_unicode_url_outright(self): - '''This url visits in my browser''' - r = requests.get('http://www.marketwire.com/press-release/' + - 'jp-morgan-behauptet-sich-der-spitze-euro' + - 'p%C3%A4ischer-anleihe-analysten-laut-umf' + - 'rageergebnissen-1628202.htm') - assert r.ok - - def test_redirect_encoding(self): - '''This url redirects to - http://www.dealipedia.com/deal_view_investment.php?r=20012''' - - r = requests.get('http://feedproxy.google.com/~r/Dealipedia' + - 'News/~3/BQtUJRJeZlo/deal_view_investment.' + - 'php') - assert r.ok - - def test_cookies_on_redirects(self): - """Test interaction between cookie handling and redirection.""" - # get a cookie for tinyurl.com ONLY - s = requests.session() - s.get(url='http://tinyurl.com/preview.php?disable=1') - # we should have set a cookie for tinyurl: preview=0 - self.assertIn('preview', s.cookies) - self.assertEqual(s.cookies['preview'], '0') - self.assertEqual(list(s.cookies)[0].name, 'preview') - self.assertEqual(list(s.cookies)[0].domain, 'tinyurl.com') - - # get cookies on another domain - r2 = s.get(url='http://httpbin.org/cookies') - # the cookie is not there - self.assertNotIn('preview', json.loads(r2.text)['cookies']) - - # this redirects to another domain, httpbin.org - # cookies of the first domain should NOT be sent to the next one - r3 = s.get(url='http://tinyurl.com/7zp3jnr') - assert r3.url == 'http://httpbin.org/cookies' - self.assertNotIn('preview', json.loads(r2.text)['cookies']) - -if __name__ == '__main__': - unittest.main() - diff --git a/tests/test_requests_https.py b/tests/test_requests_https.py deleted file mode 100755 index c6ea8f3..0000000 --- a/tests/test_requests_https.py +++ /dev/null @@ -1,31 +0,0 @@ -#!/usr/bin/env python -# -*- coding: utf-8 -*- - -import sys, os -import json -import unittest - -# Path hack. -sys.path.insert(0, os.path.abspath('..')) -import requests - -class HTTPSTest(unittest.TestCase): - """Smoke test for https functionality.""" - - smoke_url = "https://github.com" - - def perform_smoke_test(self, verify=False): - result = requests.get(self.smoke_url, verify=verify) - self.assertEqual(result.status_code, 200) - - def test_smoke(self): - """Smoke test without verification.""" - self.perform_smoke_test(verify=False) - - def test_smoke_verified(self): - """Smoke test with SSL verification.""" - self.perform_smoke_test(verify=True) - - -if __name__ == '__main__': - unittest.main() |