aboutsummaryrefslogtreecommitdiff
path: root/tests
diff options
context:
space:
mode:
Diffstat (limited to 'tests')
-rwxr-xr-xtests/test_cookies.py207
-rwxr-xr-xtests/test_requests.py907
-rwxr-xr-xtests/test_requests_async.py67
-rw-r--r--tests/test_requests_ext.py131
-rwxr-xr-xtests/test_requests_https.py31
5 files changed, 0 insertions, 1343 deletions
diff --git a/tests/test_cookies.py b/tests/test_cookies.py
deleted file mode 100755
index dca7d2c..0000000
--- a/tests/test_cookies.py
+++ /dev/null
@@ -1,207 +0,0 @@
-#!/usr/bin/env python
-# -*- coding: utf-8 -*-
-
-import sys
-import json
-import os
-import tempfile
-import unittest
-
-# Path hack.
-sys.path.insert(0, os.path.abspath('..'))
-import requests
-from requests.compat import cookielib
-
-# More hacks
-sys.path.append('.')
-from test_requests import httpbin, TestBaseMixin
-
-class CookieTests(TestBaseMixin, unittest.TestCase):
-
- def test_cookies_from_response(self):
- """Basic test that we correctly parse received cookies in the Response object."""
- r = requests.get(httpbin('cookies', 'set', 'myname', 'myvalue'))
-
- # test deprecated dictionary interface
- self.assertEqual(r.cookies['myname'], 'myvalue')
- # test CookieJar interface
- jar = r.cookies
- self.assertEqual(len(jar), 1)
- cookie_from_jar = list(jar)[0]
- self.assertCookieHas(cookie_from_jar, name='myname', value='myvalue')
-
- q = requests.get(httpbin('cookies'), cookies=jar)
- self.assertEqual(json.loads(q.text)['cookies'], {'myname': 'myvalue'})
-
- def test_crossdomain_cookies(self):
- """Cookies should not be sent to domains they didn't originate from."""
- r = requests.get("http://github.com")
- c = r.cookies
- # github should send us cookies
- self.assertTrue(len(c) >= 1)
-
- # github cookies should not be sent to httpbin.org:
- r2 = requests.get(httpbin('cookies'), cookies=c)
- self.assertEqual(json.loads(r2.text)['cookies'], {})
-
- # let's do this again using the session object
- s = requests.session()
- s.get("http://github.com")
- self.assertTrue(len(s.cookies) >= 1)
- r = s.get(httpbin('cookies'))
- self.assertEqual(json.loads(r.text)['cookies'], {})
- # we can set a cookie and get exactly that same-domain cookie back:
- r = s.get(httpbin('cookies', 'set', 'myname', 'myvalue'))
- self.assertEqual(json.loads(r.text)['cookies'], {'myname': 'myvalue'})
-
- def test_overwrite(self):
- """Cookies should get overwritten when appropriate."""
- r = requests.get(httpbin('cookies', 'set', 'shimon', 'yochai'))
- cookies = r.cookies
- requests.get(httpbin('cookies', 'set', 'elazar', 'shimon'), cookies=cookies)
- r = requests.get(httpbin('cookies'), cookies=cookies)
- self.assertEqual(json.loads(r.text)['cookies'],
- {'shimon': 'yochai', 'elazar': 'shimon'})
- # overwrite the value of 'shimon'
- r = requests.get(httpbin('cookies', 'set', 'shimon', 'gamaliel'), cookies=cookies)
- self.assertEqual(len(cookies), 2)
- r = requests.get(httpbin('cookies'), cookies=cookies)
- self.assertEqual(json.loads(r.text)['cookies'],
- {'shimon': 'gamaliel', 'elazar': 'shimon'})
-
- def test_redirects(self):
- """Test that cookies set by a 302 page are correctly processed."""
- r = requests.get(httpbin('cookies', 'set', 'redirects', 'work'))
- self.assertEqual(r.history[0].status_code, 302)
- expected_cookies = {'redirects': 'work'}
- self.assertEqual(json.loads(r.text)['cookies'], expected_cookies)
-
- r2 = requests.get(httpbin('cookies', 'set', 'very', 'well'), cookies=r.cookies)
- expected_cookies = {'redirects': 'work', 'very': 'well'}
- self.assertEqual(json.loads(r2.text)['cookies'], expected_cookies)
- self.assertTrue(r.cookies is r2.cookies)
-
- def test_none_cookie(self):
- """Regression test: don't send a Cookie header with a string value of 'None'!"""
- page = json.loads(requests.get(httpbin('headers')).text)
- self.assertTrue('Cookie' not in page['headers'])
-
- def test_secure_cookies(self):
- """Test that secure cookies can only be sent via https."""
- header = "Set-Cookie: ThisIsA=SecureCookie; Path=/; Secure; HttpOnly"
- url = 'https://httpbin.org/response-headers?%s' % (requests.utils.quote(header),)
- cookies = requests.get(url, verify=False).cookies
- self.assertEqual(len(cookies), 1)
- self.assertEqual(list(cookies)[0].secure, True)
-
- secure_resp = requests.get('https://httpbin.org/cookies', cookies=cookies, verify=False)
- secure_cookies_sent = json.loads(secure_resp.text)['cookies']
- self.assertEqual(secure_cookies_sent, {'ThisIsA': 'SecureCookie'})
-
- insecure_resp = requests.get('http://httpbin.org/cookies', cookies=cookies)
- insecure_cookies_sent = json.loads(insecure_resp.text)['cookies']
- self.assertEqual(insecure_cookies_sent, {})
-
-class LWPCookieJarTest(TestBaseMixin, unittest.TestCase):
- """Check store/load of cookies to FileCookieJar's, specifically LWPCookieJar's."""
-
- COOKIEJAR_CLASS = cookielib.LWPCookieJar
-
- def setUp(self):
- # blank the file
- self.cookiejar_file = tempfile.NamedTemporaryFile()
- self.cookiejar_filename = self.cookiejar_file.name
- cookiejar = self.COOKIEJAR_CLASS(self.cookiejar_filename)
- cookiejar.save()
-
- def tearDown(self):
- try:
- self.cookiejar_file.close()
- except OSError:
- pass
-
- def test_cookiejar_persistence(self):
- """Test that we can save cookies to a FileCookieJar."""
- cookiejar = self.COOKIEJAR_CLASS(self.cookiejar_filename)
- cookiejar.load()
- # initially should be blank
- self.assertEqual(len(cookiejar), 0)
-
- response = requests.get(httpbin('cookies', 'set', 'key', 'value'), cookies=cookiejar)
- self.assertEqual(len(cookiejar), 1)
- cookie = list(cookiejar)[0]
- self.assertEqual(json.loads(response.text)['cookies'], {'key': 'value'})
- self.assertCookieHas(cookie, name='key', value='value')
-
- # save and reload the cookies from the file:
- cookiejar.save(ignore_discard=True)
- cookiejar_2 = self.COOKIEJAR_CLASS(self.cookiejar_filename)
- cookiejar_2.load(ignore_discard=True)
- self.assertEqual(len(cookiejar_2), 1)
- cookie_2 = list(cookiejar_2)[0]
- # this cookie should have been saved with the correct domain restriction:
- self.assertCookieHas(cookie_2, name='key', value='value',
- domain='httpbin.org', path='/')
-
- # httpbin sets session cookies, so if we don't ignore the discard attribute,
- # there should be no cookie:
- cookiejar_3 = self.COOKIEJAR_CLASS(self.cookiejar_filename)
- cookiejar_3.load()
- self.assertEqual(len(cookiejar_3), 0)
-
- def test_crossdomain(self):
- """Test persistence of the domains associated with the cookies."""
- cookiejar = self.COOKIEJAR_CLASS(self.cookiejar_filename)
- cookiejar.load()
- self.assertEqual(len(cookiejar), 0)
-
- # github sets a cookie
- requests.get("http://github.com", cookies=cookiejar)
- num_github_cookies = len(cookiejar)
- self.assertTrue(num_github_cookies >= 1)
- # httpbin sets another
- requests.get(httpbin('cookies', 'set', 'key', 'value'), cookies=cookiejar)
- num_total_cookies = len(cookiejar)
- self.assertTrue(num_total_cookies >= 2)
- self.assertTrue(num_total_cookies > num_github_cookies)
-
- # save and load
- cookiejar.save(ignore_discard=True)
- cookiejar_2 = self.COOKIEJAR_CLASS(self.cookiejar_filename)
- cookiejar_2.load(ignore_discard=True)
- self.assertEqual(len(cookiejar_2), num_total_cookies)
- r = requests.get(httpbin('cookies'), cookies=cookiejar_2)
- self.assertEqual(json.loads(r.text)['cookies'], {'key': 'value'})
-
- def test_persistent_cookies(self):
- """Test that we correctly interpret persistent cookies."""
- # httpbin's normal cookie methods don't send persistent cookies,
- # so cook up the appropriate header and force it to send
- header = "Set-Cookie: Persistent=CookiesAreScary; expires=Sun, 04-May-2032 04:56:50 GMT; path=/"
- url = httpbin('response-headers?%s' % (requests.utils.quote(header),))
- cookiejar = self.COOKIEJAR_CLASS(self.cookiejar_filename)
-
- requests.get(url, cookies=cookiejar)
- self.assertEqual(len(cookiejar), 1)
- self.assertCookieHas(list(cookiejar)[0], name='Persistent', value='CookiesAreScary')
-
- requests.get(httpbin('cookies', 'set', 'ThisCookieIs', 'SessionOnly'), cookies=cookiejar)
- self.assertEqual(len(cookiejar), 2)
- self.assertEqual(len([c for c in cookiejar if c.name == 'Persistent']), 1)
- self.assertEqual(len([c for c in cookiejar if c.name == 'ThisCookieIs']), 1)
-
- # save and load
- cookiejar.save()
- cookiejar_2 = self.COOKIEJAR_CLASS(self.cookiejar_filename)
- cookiejar_2.load()
- # we should only load the persistent cookie
- self.assertEqual(len(cookiejar_2), 1)
- self.assertCookieHas(list(cookiejar_2)[0], name='Persistent', value='CookiesAreScary')
-
-class MozCookieJarTest(LWPCookieJarTest):
- """Same test, but substitute MozillaCookieJar."""
-
- COOKIEJAR_CLASS = cookielib.MozillaCookieJar
-
-if __name__ == '__main__':
- unittest.main()
diff --git a/tests/test_requests.py b/tests/test_requests.py
deleted file mode 100755
index 530008a..0000000
--- a/tests/test_requests.py
+++ /dev/null
@@ -1,907 +0,0 @@
-#!/usr/bin/env python
-# -*- coding: utf-8 -*-
-
-# from __future__ import unicode_literals
-
-# Path hack.
-import sys
-import os
-sys.path.insert(0, os.path.abspath('..'))
-
-import json
-import os
-import unittest
-import pickle
-
-import requests
-from requests.compat import str, StringIO
-# import envoy
-from requests import HTTPError
-from requests import get, post, head, put
-from requests.auth import HTTPBasicAuth, HTTPDigestAuth
-
-if 'HTTPBIN_URL' not in os.environ:
- os.environ['HTTPBIN_URL'] = 'http://httpbin.org/'
-
-HTTPBIN_URL = os.environ.get('HTTPBIN_URL')
-
-
-def httpbin(*suffix):
- """Returns url for HTTPBIN resource."""
- return HTTPBIN_URL + '/'.join(suffix)
-
-
-SERVICES = (httpbin, )
-
-_httpbin = False
-
-
-class TestSetup(object):
- """Requests test cases."""
-
- # It goes to eleven.
- _multiprocess_can_split_ = True
-
- def setUp(self):
-
- global _httpbin
-
- if (not 'HTTPBIN_URL' in os.environ) and not _httpbin:
- # c = envoy.connect('httpbin %s' % (PORT))
- # time.sleep(1)
- _httpbin = True
-
-class TestBaseMixin(object):
-
- def assertCookieHas(self, cookie, **kwargs):
- """Assert that a cookie has various specified properties."""
- for attr, expected_value in kwargs.items():
- cookie_attr = getattr(cookie, attr)
- message = 'Failed comparison for %s: %s != %s' % (attr, cookie_attr, expected_value)
- self.assertEqual(cookie_attr, expected_value, message)
-
-class RequestsTestSuite(TestSetup, TestBaseMixin, unittest.TestCase):
- """Requests test cases."""
-
- def test_entry_points(self):
-
- requests.session
- requests.session().get
- requests.session().head
- requests.get
- requests.head
- requests.put
- requests.patch
- requests.post
-
- def test_invalid_url(self):
- self.assertRaises(ValueError, get, 'hiwpefhipowhefopw')
-
- def test_path_is_not_double_encoded(self):
- request = requests.Request("http://0.0.0.0/get/test case")
-
- self.assertEqual(request.path_url, "/get/test%20case")
-
- def test_HTTP_200_OK_GET(self):
- r = get(httpbin('get'))
- self.assertEqual(r.status_code, 200)
-
- def test_response_sent(self):
- r = get(httpbin('get'))
-
- self.assertTrue(r.request.sent)
-
- def test_HTTP_302_ALLOW_REDIRECT_GET(self):
- r = get(httpbin('redirect', '1'))
- self.assertEqual(r.status_code, 200)
-
- def test_HTTP_302_GET(self):
- r = get(httpbin('redirect', '1'), allow_redirects=False)
- self.assertEqual(r.status_code, 302)
-
- def test_HTTP_200_OK_GET_WITH_PARAMS(self):
- heads = {'User-agent': 'Mozilla/5.0'}
-
- r = get(httpbin('user-agent'), headers=heads)
-
- assert heads['User-agent'] in r.text
- self.assertEqual(r.status_code, 200)
-
- def test_HTTP_200_OK_GET_WITH_MIXED_PARAMS(self):
- heads = {'User-agent': 'Mozilla/5.0'}
-
- r = get(httpbin('get') + '?test=true', params={'q': 'test'}, headers=heads)
- self.assertEqual(r.status_code, 200)
-
- # def test_unicode_headers(self):
- # # Simply calling requests with a unicode instance should simply work
- # # when the characters are all representable using latin-1:
- # heads = { u'User-Agent': u'Requests Test Suite' }
- # requests.get(url=httpbin('get'), headers=heads)
-
- # # Characters outside latin-1 should raise an exception:
- # heads = { u'User-Agent': u'\u30cd\u30c3\u30c8\u30ef\u30fc\u30af' }
- # self.assertRaises(UnicodeEncodeError, requests.get,
- # url=httpbin('get'), headers=heads)
-
- # def test_session_with_escaped_url(self):
- # # Test a URL that contains percent-escaped characters
- # # This URL should not be modified (double-escaped)
- # # Tests:
- # # - Quoted illegal characters ("%20" (' '), "%3C" ('<'), "%3E" ('>'))
- # # - Quoted reserved characters ("%25" ('%'), "%23" ('#'), "%2F" ('/'))
- # # - Quoted non-ASCII characters ("%C3%98", "%C3%A5")
- # path_fully_escaped = '%3Ca%25b%23c%2Fd%3E/%C3%98%20%C3%A5'
- # url = httpbin('get/' + path_fully_escaped)
- # response = get(url)
- # self.assertEqual(response.url, httpbin('get/' + path_fully_escaped))
-
- # # Test that illegal characters in a path get properly percent-escaped
- # # Tests:
- # # - Bare illegal characters (space, '<')
- # # - Bare non-ASCII characters ('\u00d8')
- # path = u'<a%25b%23c%2Fd%3E/\u00d8 %C3%A5'
- # url = httpbin('get/' + path)
- # response = get(url)
- # self.assertEqual(response.url, httpbin('get/' + path_fully_escaped))
-
- # # Test that reserved characters in a path do not get percent-escaped
- # # Tests:
- # # - All reserved characters (RFC 3986), except '?', '#', '[' and ']',
- # # which are not allowed in the path, and ';' which delimits
- # # parameters.
- # # All such characters must be allowed bare in path, and must not be
- # # encoded.
- # # - Special unreserved characters (RFC 3986), which should not be
- # # encoded (even though it wouldn't hurt).
- # path_reserved = '!$&\'()*+,/:=@-._~'
- # url = httpbin('get/' + path_reserved)
- # response = get(url)
- # self.assertEqual(response.url, httpbin('get/' + path_reserved))
-
- # # Test that percent-encoded unreserved characters in a path get
- # # normalised to their un-encoded forms.
- # path_unreserved = 'ABCDwxyz1234-._~'
- # path_unreserved_escaped = '%41%42%43%44%77%78%79%7A%31%32%33%34%2D%2E%5F%7E'
- # url = httpbin('get/' + path_unreserved_escaped)
- # response = get(url)
- # self.assertEqual(response.url, httpbin('get/' + path_unreserved))
-
- # # Re-run all of the same tests on the query part of the URI
- # query_fully_escaped = '%3Ca%25b%23c%2Fd%3E=%C3%98%20%C3%A5'
- # url = httpbin('get/?' + query_fully_escaped)
- # response = get(url)
- # self.assertEqual(response.url, httpbin('get/?' + query_fully_escaped))
-
- # query = u'<a%25b%23c%2Fd%3E=\u00d8 %C3%A5'
- # url = httpbin('get/?' + query)
- # response = get(url)
- # self.assertEqual(response.url, httpbin('get/?' + query_fully_escaped))
-
- # # The legal characters in query happens to be the same as in path
- # query_reserved = '!$&\'()*+,/:=@-._~'
- # url = httpbin('get/?' + query_reserved)
- # response = get(url)
- # self.assertEqual(response.url, httpbin('get/?' + query_reserved))
-
- # query_unreserved = 'ABCDwxyz=1234-._~'
- # query_unreserved_escaped = '%41%42%43%44%77%78%79%7A=%31%32%33%34%2D%2E%5F%7E'
- # url = httpbin('get/?' + query_unreserved_escaped)
- # response = get(url)
- # self.assertEqual(response.url, httpbin('get/?' + query_unreserved))
-
- def test_user_agent_transfers(self):
- """Issue XX"""
-
- heads = {
- 'User-agent':
- 'Mozilla/5.0 (github.com/kennethreitz/requests)'
- }
-
- r = get(httpbin('user-agent'), headers=heads)
- self.assertTrue(heads['User-agent'] in r.text)
-
- heads = {
- 'user-agent':
- 'Mozilla/5.0 (github.com/kennethreitz/requests)'
- }
-
- r = get(httpbin('user-agent'), headers=heads)
- self.assertTrue(heads['user-agent'] in r.text)
-
- def test_HTTP_200_OK_HEAD(self):
- r = head(httpbin('get'))
- self.assertEqual(r.status_code, 200)
-
- def test_HTTP_200_OK_PUT(self):
- r = put(httpbin('put'))
- self.assertEqual(r.status_code, 200)
-
- def test_BASICAUTH_TUPLE_HTTP_200_OK_GET(self):
-
- for service in SERVICES:
-
- auth = ('user', 'pass')
- url = service('basic-auth', 'user', 'pass')
-
- r = get(url, auth=auth)
- self.assertEqual(r.status_code, 200)
-
- r = get(url)
- self.assertEqual(r.status_code, 401)
-
- s = requests.session(auth=auth)
- r = get(url, session=s)
- self.assertEqual(r.status_code, 200)
-
- def test_BASICAUTH_HTTP_200_OK_GET(self):
-
- for service in SERVICES:
-
- auth = HTTPBasicAuth('user', 'pass')
- url = service('basic-auth', 'user', 'pass')
-
- r = get(url, auth=auth)
- self.assertEqual(r.status_code, 200)
-
- auth = ('user', 'pass')
- r = get(url, auth=auth)
- self.assertEqual(r.status_code, 200)
-
- r = get(url)
- self.assertEqual(r.status_code, 401)
-
- s = requests.session(auth=auth)
- r = get(url, session=s)
- self.assertEqual(r.status_code, 200)
-
- def test_DIGESTAUTH_HTTP_200_OK_GET(self):
-
- for service in SERVICES:
-
- auth = HTTPDigestAuth('user', 'pass')
- url = service('digest-auth', 'auth', 'user', 'pass')
-
- r = get(url, auth=auth)
- self.assertEqual(r.status_code, 200)
-
- r = get(url)
- self.assertEqual(r.status_code, 401)
-
- s = requests.session(auth=auth)
- r = get(url, session=s)
- self.assertEqual(r.status_code, 200)
-
- def test_DIGESTAUTH_WRONG_HTTP_401_GET(self):
-
- for service in SERVICES:
-
- auth = HTTPDigestAuth('user', 'wrongpass')
- url = service('digest-auth', 'auth', 'user', 'pass')
-
- r = get(url, auth=auth)
- self.assertEqual(r.status_code, 401)
-
- s = requests.session(auth=auth)
- r = get(url, session=s)
- self.assertEqual(r.status_code, 401)
-
- def test_POSTBIN_GET_POST_FILES(self):
-
- for service in SERVICES:
-
- url = service('post')
- post1 = post(url).raise_for_status()
-
- post1 = post(url, data={'some': 'data'})
- self.assertEqual(post1.status_code, 200)
-
- with open(__file__) as f:
- post2 = post(url, files={'some': f})
- self.assertEqual(post2.status_code, 200)
-
- post3 = post(url, data='[{"some": "json"}]')
- self.assertEqual(post3.status_code, 200)
-
- def test_POSTBIN_GET_POST_FILES_WITH_PARAMS(self):
-
- for service in SERVICES:
-
- with open(__file__) as f:
- url = service('post')
- post1 = post(url,
- files={'some': f},
- data={'some': 'data'})
-
- self.assertEqual(post1.status_code, 200)
-
- def test_POSTBIN_GET_POST_FILES_WITH_HEADERS(self):
-
- for service in SERVICES:
-
- url = service('post')
-
- with open(__file__) as f:
-
- post2 = post(url,
- files={'some': f},
- headers={'User-Agent': 'requests-tests'})
-
- self.assertEqual(post2.status_code, 200)
-
- def test_POSTBIN_GET_POST_FILES_STRINGS(self):
-
- for service in SERVICES:
-
- url = service('post')
-
- post1 = post(url, files={'fname.txt': 'fdata'})
- self.assertEqual(post1.status_code, 200)
-
- post2 = post(url, files={'fname.txt': 'fdata', 'fname2.txt':'more fdata'})
- self.assertEqual(post2.status_code, 200)
-
- post3 = post(url, files={'fname.txt': 'fdata', 'fname2.txt':open(__file__,'rb')})
- self.assertEqual(post3.status_code, 200)
-
- post4 = post(url, files={'fname.txt': 'fdata'})
- self.assertEqual(post4.status_code, 200)
-
- post5 = post(url, files={'file': ('file.txt', 'more fdata')})
- self.assertEqual(post5.status_code, 200)
-
- post6 = post(url, files={'fname.txt': '\xe9'})
- self.assertEqual(post6.status_code, 200)
-
- post7 = post(url, files={'fname.txt': 'fdata to verify'})
- rbody = json.loads(post7.text)
- self.assertTrue(rbody.get('files', None))
- self.assertTrue(rbody['files'].get('fname.txt'), None)
- self.assertEqual(rbody['files']['fname.txt'], 'fdata to verify')
-
-
- def test_nonzero_evaluation(self):
-
- for service in SERVICES:
-
- r = get(service('status', '500'))
- self.assertEqual(bool(r), False)
-
- r = get(service('/get'))
- self.assertEqual(bool(r), True)
-
- def test_request_ok_set(self):
-
- for service in SERVICES:
-
- r = get(service('status', '404'))
- # print r.status_code
- # r.raise_for_status()
- self.assertEqual(r.ok, False)
-
- def test_status_raising(self):
- r = get(httpbin('status', '404'))
- self.assertRaises(HTTPError, r.raise_for_status)
-
- r = get(httpbin('status', '200'))
- self.assertFalse(r.error)
- r.raise_for_status()
-
- def test_default_status_raising(self):
- config = {'danger_mode': True}
- args = [httpbin('status', '404')]
- kwargs = dict(config=config)
- self.assertRaises(HTTPError, get, *args, **kwargs)
-
- r = get(httpbin('status', '200'))
- self.assertEqual(r.status_code, 200)
-
- def test_decompress_gzip(self):
-
- r = get(httpbin('gzip'))
- r.content.decode('ascii')
-
- def test_response_has_unicode_url(self):
-
- for service in SERVICES:
-
- url = service('get')
-
- response = get(url)
-
- assert isinstance(response.url, str)
-
- def test_unicode_get(self):
-
- for service in SERVICES:
-
- url = service('/get')
-
- get(url, params={'foo': 'føø'})
- get(url, params={'føø': 'føø'})
- get(url, params={'føø': 'føø'})
- get(url, params={'foo': 'foo'})
- get(service('ø'), params={'foo': 'foo'})
-
- def test_httpauth_recursion(self):
-
- http_auth = HTTPBasicAuth('user', 'BADpass')
-
- for service in SERVICES:
- r = get(service('basic-auth', 'user', 'pass'), auth=http_auth)
- self.assertEqual(r.status_code, 401)
-
- def test_urlencoded_post_data(self):
-
- for service in SERVICES:
-
- r = post(service('post'), data=dict(test='fooaowpeuf'))
-
- self.assertEqual(r.status_code, 200)
- self.assertEqual(r.headers['content-type'], 'application/json')
- self.assertEqual(r.url, service('post'))
-
- rbody = json.loads(r.text)
-
- self.assertEqual(rbody.get('form'), dict(test='fooaowpeuf'))
- self.assertEqual(rbody.get('data'), '')
-
- def test_nonurlencoded_post_data(self):
-
- for service in SERVICES:
-
- r = post(service('post'), data='fooaowpeuf')
-
- self.assertEqual(r.status_code, 200)
- self.assertEqual(r.headers['content-type'], 'application/json')
- self.assertEqual(r.url, service('post'))
-
- rbody = json.loads(r.text)
- # Body wasn't valid url encoded data, so the server returns None as
- # "form" and the raw body as "data".
-
- assert rbody.get('form') in (None, {})
- self.assertEqual(rbody.get('data'), 'fooaowpeuf')
-
- def test_urlencoded_post_querystring(self):
-
- for service in SERVICES:
-
- r = post(service('post'), params=dict(test='fooaowpeuf'))
-
- self.assertEqual(r.status_code, 200)
- self.assertEqual(r.headers['content-type'], 'application/json')
- self.assertEqual(r.url, service('post?test=fooaowpeuf'))
-
- rbody = json.loads(r.text)
- self.assertEqual(rbody.get('form'), {}) # No form supplied
- self.assertEqual(rbody.get('data'), '')
-
- def test_urlencoded_post_query_and_data(self):
-
- for service in SERVICES:
-
- r = post(
- service('post'),
- params=dict(test='fooaowpeuf'),
- data=dict(test2="foobar"))
-
- self.assertEqual(r.status_code, 200)
- self.assertEqual(r.headers['content-type'], 'application/json')
- self.assertEqual(r.url, service('post?test=fooaowpeuf'))
-
- rbody = json.loads(r.text)
- self.assertEqual(rbody.get('form'), dict(test2='foobar'))
- self.assertEqual(rbody.get('data'), '')
-
- def test_nonurlencoded_postdata(self):
-
- for service in SERVICES:
-
- r = post(service('post'), data="foobar")
-
- self.assertEqual(r.status_code, 200)
- self.assertEqual(r.headers['content-type'], 'application/json')
-
- rbody = json.loads(r.text)
-
- assert rbody.get('form') in (None, {})
- self.assertEqual(rbody.get('data'), 'foobar')
-
- def test_urlencoded_get_query_multivalued_param(self):
-
- for service in SERVICES:
-
- r = get(service('get'), params=dict(test=['foo', 'baz']))
- self.assertEqual(r.status_code, 200)
- self.assertEqual(r.url, service('get?test=foo&test=baz'))
-
- def test_urlencoded_post_querystring_multivalued(self):
-
- for service in SERVICES:
-
- r = post(service('post'), params=dict(test=['foo', 'baz']))
- self.assertEqual(r.status_code, 200)
- self.assertEqual(r.headers['content-type'], 'application/json')
- self.assertEqual(r.url, service('post?test=foo&test=baz'))
-
- rbody = json.loads(r.text)
- self.assertEqual(rbody.get('form'), {}) # No form supplied
- self.assertEqual(rbody.get('data'), '')
-
- def test_urlencoded_post_query_multivalued_and_data(self):
-
- for service in SERVICES:
-
- r = post(
- service('post'),
- params=dict(test=['foo', 'baz']),
- data=dict(test2="foobar", test3=['foo', 'baz']))
-
- self.assertEqual(r.status_code, 200)
- self.assertEqual(r.headers['content-type'], 'application/json')
- self.assertEqual(r.url, service('post?test=foo&test=baz'))
-
- # print(r.text)
- # print('-----------------------')
-
- rbody = json.loads(r.text)
- self.assertEqual(rbody.get('form'), dict(test2='foobar', test3=['foo', 'baz']))
- self.assertEqual(rbody.get('data'), '')
-
- def test_GET_no_redirect(self):
-
- for service in SERVICES:
-
- r = get(service('redirect', '3'), allow_redirects=False)
- self.assertEqual(r.status_code, 302)
- self.assertEqual(len(r.history), 0)
-
- def test_HEAD_no_redirect(self):
-
- for service in SERVICES:
-
- r = head(service('redirect', '3'), allow_redirects=False)
- self.assertEqual(r.status_code, 302)
- self.assertEqual(len(r.history), 0)
-
- def test_redirect_history(self):
-
- for service in SERVICES:
-
- r = get(service('redirect', '3'))
- self.assertEqual(r.status_code, 200)
- self.assertEqual(len(r.history), 3)
-
- def test_relative_redirect_history(self):
-
- for service in SERVICES:
-
- r = get(service('relative-redirect', '3'))
- self.assertEqual(r.status_code, 200)
- self.assertEqual(len(r.history), 3)
-
- def test_session_HTTP_200_OK_GET(self):
-
- s = requests.session()
- r = get(httpbin('get'), session=s)
- self.assertEqual(r.status_code, 200)
-
- def test_session_persistent_headers(self):
-
- heads = {'User-agent': 'Mozilla/5.0'}
-
- s = requests.session()
- s.headers = heads
-
- # Make 2 requests from Session object, should send header both times
- r1 = get(httpbin('user-agent'), session=s)
- assert heads['User-agent'] in r1.text
-
- r2 = get(httpbin('user-agent'), session=s)
- assert heads['User-agent'] in r2.text
-
- new_heads = {'User-agent': 'blah'}
- r3 = get(httpbin('user-agent'), headers=new_heads, session=s)
- assert new_heads['User-agent'] in r3.text
-
- self.assertEqual(r2.status_code, 200)
-
- def test_single_hook(self):
-
- def add_foo_header(args):
- if not args.get('headers'):
- args['headers'] = {}
-
- args['headers'].update({
- 'X-Foo': 'foo'
- })
-
- return args
-
- for service in SERVICES:
- url = service('headers')
- response = get(url=url, hooks={'args': add_foo_header})
-
- assert 'foo' in response.text
-
- def test_multiple_hooks(self):
-
- def add_foo_header(args):
- if not args.get('headers'):
- args['headers'] = {}
-
- args['headers'].update({
- 'X-Foo': 'foo'
- })
-
- return args
-
- def add_bar_header(args):
- if not args.get('headers'):
- args['headers'] = {}
-
- args['headers'].update({
- 'X-Bar': 'bar'
- })
-
- return args
-
- for service in SERVICES:
- url = service('headers')
-
- response = get(url=url,
- hooks={
- 'args': [add_foo_header, add_bar_header]
- }
- )
-
- assert 'foo' in response.text
- assert 'bar' in response.text
-
- def test_session_persistent_cookies(self):
-
- s = requests.session()
-
- # Internally dispatched cookies are sent.
- _c = {'kenneth': 'reitz', 'bessie': 'monke'}
- r = get(httpbin('cookies'), cookies=_c, session=s)
- r = get(httpbin('cookies'), session=s)
-
- # Those cookies persist transparently.
- c = json.loads(r.text).get('cookies')
- self.assertEqual(c, _c)
-
- # Double check.
- r = get(httpbin('cookies'), cookies={}, session=s)
- c = json.loads(r.text).get('cookies')
- self.assertEqual(c, _c)
-
- # Remove a cookie by setting it's value to None.
- r = get(httpbin('cookies'), cookies={'bessie': None}, session=s)
- c = json.loads(r.text).get('cookies')
- del _c['bessie']
- self.assertEqual(c, _c)
-
- # Test session-level cookies.
- s = requests.session(cookies=_c)
- r = get(httpbin('cookies'), session=s)
- c = json.loads(r.text).get('cookies')
- self.assertEqual(c, _c)
-
- # Have the server set a cookie.
- r = get(httpbin('cookies', 'set', 'k', 'v'), allow_redirects=True, session=s)
- c = json.loads(r.text).get('cookies')
-
- assert 'k' in c
-
- # And server-set cookie persistience.
- r = get(httpbin('cookies'), session=s)
- c = json.loads(r.text).get('cookies')
-
- assert 'k' in c
-
- def test_session_persistent_params(self):
-
- params = {'a': 'a_test'}
-
- s = requests.session()
- s.params = params
-
- # Make 2 requests from Session object, should send header both times
- r1 = get(httpbin('get'), session=s)
- assert params['a'] in r1.text
-
- params2 = {'b': 'b_test'}
-
- r2 = get(httpbin('get'), params=params2, session=s)
- assert params['a'] in r2.text
- assert params2['b'] in r2.text
-
- params3 = {'b': 'b_test', 'a': None, 'c': 'c_test'}
-
- r3 = get(httpbin('get'), params=params3, session=s)
-
- assert not params['a'] in r3.text
- assert params3['b'] in r3.text
- assert params3['c'] in r3.text
-
- def test_session_pickling(self):
-
- s = requests.session(
- headers={'header': 'value'},
- cookies={'a-cookie': 'cookie-value'},
- auth=('username', 'password'))
-
- ds = pickle.loads(pickle.dumps(s))
-
- self.assertEqual(s.headers, ds.headers)
- self.assertEqual(s.auth, ds.auth)
-
- # Cookie doesn't have a good __eq__, so verify manually:
- self.assertEqual(len(ds.cookies), 1)
- for cookie in ds.cookies:
- self.assertCookieHas(cookie, name='a-cookie', value='cookie-value')
-
- def test_unpickled_session_requests(self):
- s = requests.session()
- r = get(httpbin('cookies', 'set', 'k', 'v'), allow_redirects=True, session=s)
- c = json.loads(r.text).get('cookies')
- assert 'k' in c
-
- ds = pickle.loads(pickle.dumps(s))
- r = get(httpbin('cookies'), session=ds)
- c = json.loads(r.text).get('cookies')
- assert 'k' in c
-
- ds1 = pickle.loads(pickle.dumps(requests.session()))
- ds2 = pickle.loads(pickle.dumps(requests.session(prefetch=True)))
- assert not ds1.prefetch
- assert ds2.prefetch
-
- def test_invalid_content(self):
- # WARNING: if you're using a terrible DNS provider (comcast),
- # this will fail.
- try:
- hah = 'http://somedomainthatclearlydoesntexistg.com'
- r = get(hah, allow_redirects=False)
- except requests.ConnectionError:
- pass # \o/
- else:
- assert False
-
- config = {'safe_mode': True}
- r = get(hah, allow_redirects=False, config=config)
- assert r.content == None
-
- def test_cached_response(self):
-
- r1 = get(httpbin('get'), prefetch=False)
- assert not r1._content
- assert r1.content
- assert r1.text
-
- r2 = get(httpbin('get'), prefetch=True)
- assert r2._content
- assert r2.content
- assert r2.text
-
- def test_iter_lines(self):
-
- lines = (0, 2, 10, 100)
-
- for i in lines:
- r = get(httpbin('stream', str(i)), prefetch=False)
- lines = list(r.iter_lines())
- len_lines = len(lines)
-
- self.assertEqual(i, len_lines)
-
- # Tests that trailing whitespaces within lines do not get stripped.
- # Tests that a trailing non-terminated line does not get stripped.
- quote = (
- '''Agamemnon \n'''
- '''\tWhy will he not upon our fair request\r\n'''
- '''\tUntent his person and share the air with us?'''
- )
-
- # Make a request and monkey-patch its contents
- r = get(httpbin('get'))
- r.raw = StringIO(quote)
-
- lines = list(r.iter_lines())
- len_lines = len(lines)
- self.assertEqual(len_lines, 3)
-
- joined = lines[0] + '\n' + lines[1] + '\r\n' + lines[2]
- self.assertEqual(joined, quote)
-
- def test_safe_mode(self):
-
- safe = requests.session(config=dict(safe_mode=True))
-
- # Safe mode creates empty responses for failed requests.
- # Iterating on these responses should produce empty sequences
- r = get('http://_/', session=safe)
- self.assertEqual(list(r.iter_lines()), [])
- assert isinstance(r.error, requests.exceptions.ConnectionError)
-
- r = get('http://_/', session=safe)
- self.assertEqual(list(r.iter_content()), [])
- assert isinstance(r.error, requests.exceptions.ConnectionError)
-
- # When not in safe mode, should raise Timeout exception
- self.assertRaises(
- requests.exceptions.Timeout,
- get,
- httpbin('stream', '1000'), timeout=0.0001)
-
- # In safe mode, should return a blank response
- r = get(httpbin('stream', '1000'), timeout=0.0001,
- config=dict(safe_mode=True))
- assert r.content is None
- assert isinstance(r.error, requests.exceptions.Timeout)
-
- def test_upload_binary_data(self):
-
- requests.get(httpbin('post'), auth=('a', 'b'), data='\xff')
-
- def test_useful_exception_for_invalid_port(self):
- # If we pass a legitimate URL with an invalid port, we should fail.
- self.assertRaises(
- ValueError,
- get,
- 'http://google.com:banana')
-
- def test_useful_exception_for_invalid_scheme(self):
-
- # If we pass a legitimate URL with a scheme not supported
- # by requests, we should fail.
- self.assertRaises(
- ValueError,
- get,
- 'ftp://ftp.kernel.org/pub/')
-
- def test_can_have_none_in_header_values(self):
- try:
- # Don't choke on headers with none in the value.
- requests.get(httpbin('headers'), headers={'Foo': None})
- except TypeError:
- self.fail()
-
- def test_danger_mode_redirects(self):
- s = requests.session()
- s.config['danger_mode'] = True
- s.get(httpbin('redirect', '4'))
-
-
- def test_empty_response(self):
- r = requests.get(httpbin('status', '404'))
- r.text
-
- def test_max_redirects(self):
- """Test the max_redirects config variable, normally and under safe_mode."""
- def unsafe_callable():
- requests.get(httpbin('redirect', '3'), config=dict(max_redirects=2))
- self.assertRaises(requests.exceptions.TooManyRedirects, unsafe_callable)
-
- # add safe mode
- response = requests.get(httpbin('redirect', '3'), config=dict(safe_mode=True, max_redirects=2))
- self.assertTrue(response.content is None)
- self.assertTrue(isinstance(response.error, requests.exceptions.TooManyRedirects))
-
- def test_connection_keepalive_and_close(self):
- """Test that we send 'Connection: close' when keep_alive is disabled."""
- # keep-alive should be on by default
- r1 = requests.get(httpbin('get'))
- # XXX due to proxying issues, test the header sent back by httpbin, rather than
- # the header reported in its message body. See kennethreitz/httpbin#46
- self.assertEqual(r1.headers['Connection'].lower(), 'keep-alive')
-
- # but when we disable it, we should send a 'Connection: close'
- # and get the same back:
- r2 = requests.get(httpbin('get'), config=dict(keep_alive=False))
- self.assertEqual(r2.headers['Connection'].lower(), 'close')
-
-if __name__ == '__main__':
- unittest.main()
diff --git a/tests/test_requests_async.py b/tests/test_requests_async.py
deleted file mode 100755
index 3a5a762..0000000
--- a/tests/test_requests_async.py
+++ /dev/null
@@ -1,67 +0,0 @@
-#!/usr/bin/env python
-# -*- coding: utf-8 -*-
-
-# Path hack.
-import sys, os
-sys.path.insert(0, os.path.abspath('..'))
-
-import sys
-import unittest
-
-import select
-has_poll = hasattr(select, "poll")
-
-from requests import async
-
-sys.path.append('.')
-from test_requests import httpbin, RequestsTestSuite, SERVICES
-
-
-class RequestsTestSuiteUsingAsyncApi(RequestsTestSuite):
- """Requests async test cases."""
-
- def patched(f):
- """Automatically send request after creation."""
-
- def wrapped(*args, **kwargs):
-
- request = f(*args, **kwargs)
- return async.map([request])[0]
-
- return wrapped
-
- # Patched requests.api functions.
- global request
- request = patched(async.request)
-
- global delete, get, head, options, patch, post, put
- delete = patched(async.delete)
- get = patched(async.get)
- head = patched(async.head)
- options = patched(async.options)
- patch = patched(async.patch)
- post = patched(async.post)
- put = patched(async.put)
-
-
- def test_entry_points(self):
-
- async.request
-
- async.delete
- async.get
- async.head
- async.options
- async.patch
- async.post
- async.put
-
- async.map
- async.send
-
- def test_select_poll(self):
- """Test to make sure we don't overwrite the poll"""
- self.assertEqual(hasattr(select, "poll"), has_poll)
-
-if __name__ == '__main__':
- unittest.main()
diff --git a/tests/test_requests_ext.py b/tests/test_requests_ext.py
deleted file mode 100644
index 883bdce..0000000
--- a/tests/test_requests_ext.py
+++ /dev/null
@@ -1,131 +0,0 @@
-#!/usr/bin/env python
-# -*- coding: utf-8 -*-
-
-# Path hack.
-import sys, os
-sys.path.insert(0, os.path.abspath('..'))
-
-import unittest
-
-import requests
-from requests.compat import is_py2, is_py3
-
-try:
- import omnijson as json
-except ImportError:
- import json
-
-
-class RequestsTestSuite(unittest.TestCase):
- """Requests test cases."""
-
- # It goes to eleven.
- _multiprocess_can_split_ = True
-
- def test_addition(self):
- assert (1 + 1) == 2
-
-
- def test_ssl_hostname_ok(self):
- requests.get('https://github.com', verify=True)
-
-
- def test_ssl_hostname_not_ok(self):
- requests.get('https://kennethreitz.com', verify=False)
-
- self.assertRaises(requests.exceptions.SSLError, requests.get, 'https://kennethreitz.com')
-
-
- def test_ssl_hostname_session_not_ok(self):
-
- s = requests.session()
-
- self.assertRaises(requests.exceptions.SSLError, s.get, 'https://kennethreitz.com')
-
- s.get('https://kennethreitz.com', verify=False)
-
-
- def test_binary_post(self):
- '''We need to be careful how we build the utf-8 string since
- unicode literals are a syntax error in python3
- '''
-
- if is_py2:
- # Blasphemy!
- utf8_string = eval("u'Smörgås'.encode('utf-8')")
- elif is_py3:
- utf8_string = 'Smörgås'.encode('utf-8')
- else:
- raise EnvironmentError('Flesh out this test for your environment.')
- requests.post('http://www.google.com/', data=utf8_string)
-
-
-
- def test_unicode_error(self):
- url = 'http://blip.fm/~1abvfu'
- requests.get(url)
-
-
- def test_chunked_head_redirect(self):
- url = "http://t.co/NFrx0zLG"
- r = requests.head(url, allow_redirects=True)
- self.assertEqual(r.status_code, 200)
-
- def test_unicode_redirect(self):
- '''This url redirects to a location that has a nonstandard
- character in it, that breaks requests in python2.7
-
- After some research, the cause was identified as an unintended
- sideeffect of overriding of str with unicode.
-
- In the case that the redirected url is actually a malformed
- "bytes" object, i.e. a string with character c where
- ord(c) > 127,
- then unicode(url) breaks.
- '''
- r = requests.get('http://www.marketwire.com/mw/release.' +
- 'do?id=1628202&sourceType=3')
- assert r.ok
-
- def test_unicode_url_outright(self):
- '''This url visits in my browser'''
- r = requests.get('http://www.marketwire.com/press-release/' +
- 'jp-morgan-behauptet-sich-der-spitze-euro' +
- 'p%C3%A4ischer-anleihe-analysten-laut-umf' +
- 'rageergebnissen-1628202.htm')
- assert r.ok
-
- def test_redirect_encoding(self):
- '''This url redirects to
- http://www.dealipedia.com/deal_view_investment.php?r=20012'''
-
- r = requests.get('http://feedproxy.google.com/~r/Dealipedia' +
- 'News/~3/BQtUJRJeZlo/deal_view_investment.' +
- 'php')
- assert r.ok
-
- def test_cookies_on_redirects(self):
- """Test interaction between cookie handling and redirection."""
- # get a cookie for tinyurl.com ONLY
- s = requests.session()
- s.get(url='http://tinyurl.com/preview.php?disable=1')
- # we should have set a cookie for tinyurl: preview=0
- self.assertIn('preview', s.cookies)
- self.assertEqual(s.cookies['preview'], '0')
- self.assertEqual(list(s.cookies)[0].name, 'preview')
- self.assertEqual(list(s.cookies)[0].domain, 'tinyurl.com')
-
- # get cookies on another domain
- r2 = s.get(url='http://httpbin.org/cookies')
- # the cookie is not there
- self.assertNotIn('preview', json.loads(r2.text)['cookies'])
-
- # this redirects to another domain, httpbin.org
- # cookies of the first domain should NOT be sent to the next one
- r3 = s.get(url='http://tinyurl.com/7zp3jnr')
- assert r3.url == 'http://httpbin.org/cookies'
- self.assertNotIn('preview', json.loads(r2.text)['cookies'])
-
-if __name__ == '__main__':
- unittest.main()
-
diff --git a/tests/test_requests_https.py b/tests/test_requests_https.py
deleted file mode 100755
index c6ea8f3..0000000
--- a/tests/test_requests_https.py
+++ /dev/null
@@ -1,31 +0,0 @@
-#!/usr/bin/env python
-# -*- coding: utf-8 -*-
-
-import sys, os
-import json
-import unittest
-
-# Path hack.
-sys.path.insert(0, os.path.abspath('..'))
-import requests
-
-class HTTPSTest(unittest.TestCase):
- """Smoke test for https functionality."""
-
- smoke_url = "https://github.com"
-
- def perform_smoke_test(self, verify=False):
- result = requests.get(self.smoke_url, verify=verify)
- self.assertEqual(result.status_code, 200)
-
- def test_smoke(self):
- """Smoke test without verification."""
- self.perform_smoke_test(verify=False)
-
- def test_smoke_verified(self):
- """Smoke test with SSL verification."""
- self.perform_smoke_test(verify=True)
-
-
-if __name__ == '__main__':
- unittest.main()