| """Unittests for the various HTTPServer modules. |
| |
| Written by Cody A.W. Somerville <cody-somerville@ubuntu.com>, |
| Josip Dzolonga, and Michael Otteneder for the 2007/08 GHOP contest. |
| """ |
| |
| from http.server import BaseHTTPRequestHandler, HTTPServer, \ |
| SimpleHTTPRequestHandler, CGIHTTPRequestHandler |
| |
| import os |
| import sys |
| import base64 |
| import shutil |
| import urllib.parse |
| import http.client |
| import tempfile |
| import threading |
| |
| import unittest |
| from test import support |
| |
| class NoLogRequestHandler: |
| def log_message(self, *args): |
| # don't write log messages to stderr |
| pass |
| |
| def read(self, n=None): |
| return '' |
| |
| |
| class TestServerThread(threading.Thread): |
| def __init__(self, test_object, request_handler): |
| threading.Thread.__init__(self) |
| self.request_handler = request_handler |
| self.test_object = test_object |
| self.test_object.lock.acquire() |
| |
| def run(self): |
| self.server = HTTPServer(('', 0), self.request_handler) |
| self.test_object.PORT = self.server.socket.getsockname()[1] |
| self.test_object.lock.release() |
| try: |
| self.server.serve_forever() |
| finally: |
| self.server.server_close() |
| |
| def stop(self): |
| self.server.shutdown() |
| |
| |
| class BaseTestCase(unittest.TestCase): |
| def setUp(self): |
| self.lock = threading.Lock() |
| self.thread = TestServerThread(self, self.request_handler) |
| self.thread.start() |
| self.lock.acquire() |
| |
| def tearDown(self): |
| self.lock.release() |
| self.thread.stop() |
| |
| def request(self, uri, method='GET', body=None, headers={}): |
| self.connection = http.client.HTTPConnection('localhost', self.PORT) |
| self.connection.request(method, uri, body, headers) |
| return self.connection.getresponse() |
| |
| |
| class BaseHTTPServerTestCase(BaseTestCase): |
| class request_handler(NoLogRequestHandler, BaseHTTPRequestHandler): |
| protocol_version = 'HTTP/1.1' |
| default_request_version = 'HTTP/1.1' |
| |
| def do_TEST(self): |
| self.send_response(204) |
| self.send_header('Content-Type', 'text/html') |
| self.send_header('Connection', 'close') |
| self.end_headers() |
| |
| def do_KEEP(self): |
| self.send_response(204) |
| self.send_header('Content-Type', 'text/html') |
| self.send_header('Connection', 'keep-alive') |
| self.end_headers() |
| |
| def do_KEYERROR(self): |
| self.send_error(999) |
| |
| def do_CUSTOM(self): |
| self.send_response(999) |
| self.send_header('Content-Type', 'text/html') |
| self.send_header('Connection', 'close') |
| self.end_headers() |
| |
| def setUp(self): |
| BaseTestCase.setUp(self) |
| self.con = http.client.HTTPConnection('localhost', self.PORT) |
| self.con.connect() |
| |
| def test_command(self): |
| self.con.request('GET', '/') |
| res = self.con.getresponse() |
| self.assertEquals(res.status, 501) |
| |
| def test_request_line_trimming(self): |
| self.con._http_vsn_str = 'HTTP/1.1\n' |
| self.con.putrequest('GET', '/') |
| self.con.endheaders() |
| res = self.con.getresponse() |
| self.assertEquals(res.status, 501) |
| |
| def test_version_bogus(self): |
| self.con._http_vsn_str = 'FUBAR' |
| self.con.putrequest('GET', '/') |
| self.con.endheaders() |
| res = self.con.getresponse() |
| self.assertEquals(res.status, 400) |
| |
| def test_version_digits(self): |
| self.con._http_vsn_str = 'HTTP/9.9.9' |
| self.con.putrequest('GET', '/') |
| self.con.endheaders() |
| res = self.con.getresponse() |
| self.assertEquals(res.status, 400) |
| |
| def test_version_none_get(self): |
| self.con._http_vsn_str = '' |
| self.con.putrequest('GET', '/') |
| self.con.endheaders() |
| res = self.con.getresponse() |
| self.assertEquals(res.status, 501) |
| |
| def test_version_none(self): |
| self.con._http_vsn_str = '' |
| self.con.putrequest('PUT', '/') |
| self.con.endheaders() |
| res = self.con.getresponse() |
| self.assertEquals(res.status, 400) |
| |
| def test_version_invalid(self): |
| self.con._http_vsn = 99 |
| self.con._http_vsn_str = 'HTTP/9.9' |
| self.con.putrequest('GET', '/') |
| self.con.endheaders() |
| res = self.con.getresponse() |
| self.assertEquals(res.status, 505) |
| |
| def test_send_blank(self): |
| self.con._http_vsn_str = '' |
| self.con.putrequest('', '') |
| self.con.endheaders() |
| res = self.con.getresponse() |
| self.assertEquals(res.status, 400) |
| |
| def test_header_close(self): |
| self.con.putrequest('GET', '/') |
| self.con.putheader('Connection', 'close') |
| self.con.endheaders() |
| res = self.con.getresponse() |
| self.assertEquals(res.status, 501) |
| |
| def test_head_keep_alive(self): |
| self.con._http_vsn_str = 'HTTP/1.1' |
| self.con.putrequest('GET', '/') |
| self.con.putheader('Connection', 'keep-alive') |
| self.con.endheaders() |
| res = self.con.getresponse() |
| self.assertEquals(res.status, 501) |
| |
| def test_handler(self): |
| self.con.request('TEST', '/') |
| res = self.con.getresponse() |
| self.assertEquals(res.status, 204) |
| |
| def test_return_header_keep_alive(self): |
| self.con.request('KEEP', '/') |
| res = self.con.getresponse() |
| self.assertEquals(res.getheader('Connection'), 'keep-alive') |
| self.con.request('TEST', '/') |
| |
| def test_internal_key_error(self): |
| self.con.request('KEYERROR', '/') |
| res = self.con.getresponse() |
| self.assertEquals(res.status, 999) |
| |
| def test_return_custom_status(self): |
| self.con.request('CUSTOM', '/') |
| res = self.con.getresponse() |
| self.assertEquals(res.status, 999) |
| |
| |
| class SimpleHTTPServerTestCase(BaseTestCase): |
| class request_handler(NoLogRequestHandler, SimpleHTTPRequestHandler): |
| pass |
| |
| def setUp(self): |
| BaseTestCase.setUp(self) |
| self.cwd = os.getcwd() |
| basetempdir = tempfile.gettempdir() |
| os.chdir(basetempdir) |
| self.data = b'We are the knights who say Ni!' |
| self.tempdir = tempfile.mkdtemp(dir=basetempdir) |
| self.tempdir_name = os.path.basename(self.tempdir) |
| temp = open(os.path.join(self.tempdir, 'test'), 'wb') |
| temp.write(self.data) |
| temp.close() |
| |
| def tearDown(self): |
| try: |
| os.chdir(self.cwd) |
| try: |
| shutil.rmtree(self.tempdir) |
| except: |
| pass |
| finally: |
| BaseTestCase.tearDown(self) |
| |
| def check_status_and_reason(self, response, status, data=None): |
| body = response.read() |
| self.assert_(response) |
| self.assertEquals(response.status, status) |
| self.assert_(response.reason != None) |
| if data: |
| self.assertEqual(data, body) |
| |
| def test_get(self): |
| #constructs the path relative to the root directory of the HTTPServer |
| response = self.request(self.tempdir_name + '/test') |
| self.check_status_and_reason(response, 200, data=self.data) |
| response = self.request(self.tempdir_name + '/') |
| self.check_status_and_reason(response, 200) |
| response = self.request(self.tempdir_name) |
| self.check_status_and_reason(response, 301) |
| response = self.request('/ThisDoesNotExist') |
| self.check_status_and_reason(response, 404) |
| response = self.request('/' + 'ThisDoesNotExist' + '/') |
| self.check_status_and_reason(response, 404) |
| f = open(os.path.join(self.tempdir_name, 'index.html'), 'w') |
| response = self.request('/' + self.tempdir_name + '/') |
| self.check_status_and_reason(response, 200) |
| if os.name == 'posix': |
| # chmod won't work as expected on Windows platforms |
| os.chmod(self.tempdir, 0) |
| response = self.request(self.tempdir_name + '/') |
| self.check_status_and_reason(response, 404) |
| os.chmod(self.tempdir, 0o755) |
| |
| def test_head(self): |
| response = self.request( |
| self.tempdir_name + '/test', method='HEAD') |
| self.check_status_and_reason(response, 200) |
| self.assertEqual(response.getheader('content-length'), |
| str(len(self.data))) |
| self.assertEqual(response.getheader('content-type'), |
| 'application/octet-stream') |
| |
| def test_invalid_requests(self): |
| response = self.request('/', method='FOO') |
| self.check_status_and_reason(response, 501) |
| # requests must be case sensitive,so this should fail too |
| response = self.request('/', method='get') |
| self.check_status_and_reason(response, 501) |
| response = self.request('/', method='GETs') |
| self.check_status_and_reason(response, 501) |
| |
| |
| cgi_file1 = """\ |
| #!%s |
| |
| print("Content-type: text/html") |
| print() |
| print("Hello World") |
| """ |
| |
| cgi_file2 = """\ |
| #!%s |
| import cgi |
| |
| print("Content-type: text/html") |
| print() |
| |
| form = cgi.FieldStorage() |
| print("%%s, %%s, %%s" %% (form.getfirst("spam"), form.getfirst("eggs"),\ |
| form.getfirst("bacon"))) |
| """ |
| |
| class CGIHTTPServerTestCase(BaseTestCase): |
| class request_handler(NoLogRequestHandler, CGIHTTPRequestHandler): |
| pass |
| |
| def setUp(self): |
| BaseTestCase.setUp(self) |
| self.parent_dir = tempfile.mkdtemp() |
| self.cgi_dir = os.path.join(self.parent_dir, 'cgi-bin') |
| os.mkdir(self.cgi_dir) |
| |
| self.file1_path = os.path.join(self.cgi_dir, 'file1.py') |
| with open(self.file1_path, 'w') as file1: |
| file1.write(cgi_file1 % sys.executable) |
| os.chmod(self.file1_path, 0o777) |
| |
| self.file2_path = os.path.join(self.cgi_dir, 'file2.py') |
| with open(self.file2_path, 'w') as file2: |
| file2.write(cgi_file2 % sys.executable) |
| os.chmod(self.file2_path, 0o777) |
| |
| self.cwd = os.getcwd() |
| os.chdir(self.parent_dir) |
| |
| def tearDown(self): |
| try: |
| os.chdir(self.cwd) |
| os.remove(self.file1_path) |
| os.remove(self.file2_path) |
| os.rmdir(self.cgi_dir) |
| os.rmdir(self.parent_dir) |
| finally: |
| BaseTestCase.tearDown(self) |
| |
| def test_headers_and_content(self): |
| res = self.request('/cgi-bin/file1.py') |
| self.assertEquals((b'Hello World\n', 'text/html', 200), \ |
| (res.read(), res.getheader('Content-type'), res.status)) |
| |
| def test_post(self): |
| params = urllib.parse.urlencode( |
| {'spam' : 1, 'eggs' : 'python', 'bacon' : 123456}) |
| headers = {'Content-type' : 'application/x-www-form-urlencoded'} |
| res = self.request('/cgi-bin/file2.py', 'POST', params, headers) |
| |
| self.assertEquals(res.read(), b'1, python, 123456\n') |
| |
| def test_invaliduri(self): |
| res = self.request('/cgi-bin/invalid') |
| res.read() |
| self.assertEquals(res.status, 404) |
| |
| def test_authorization(self): |
| headers = {b'Authorization' : b'Basic ' + |
| base64.b64encode(b'username:pass')} |
| res = self.request('/cgi-bin/file1.py', 'GET', headers=headers) |
| self.assertEquals((b'Hello World\n', 'text/html', 200), \ |
| (res.read(), res.getheader('Content-type'), res.status)) |
| |
| |
| def test_main(verbose=None): |
| try: |
| cwd = os.getcwd() |
| support.run_unittest(BaseHTTPServerTestCase, |
| SimpleHTTPServerTestCase, |
| CGIHTTPServerTestCase |
| ) |
| finally: |
| os.chdir(cwd) |
| |
| if __name__ == '__main__': |
| test_main() |