blob: 02be57a3b2b2456c0975c52dba89f434b6e48d99 [file] [log] [blame]
Georg Brandlb533e262008-05-25 18:19:30 +00001"""Unittests for the various HTTPServer modules.
2
3Written by Cody A.W. Somerville <cody-somerville@ubuntu.com>,
4Josip Dzolonga, and Michael Otteneder for the 2007/08 GHOP contest.
5"""
6
Georg Brandl24420152008-05-26 16:32:26 +00007from http.server import BaseHTTPRequestHandler, HTTPServer, \
8 SimpleHTTPRequestHandler, CGIHTTPRequestHandler
Georg Brandlb533e262008-05-25 18:19:30 +00009
10import os
11import sys
12import base64
13import shutil
14import urllib
Georg Brandl24420152008-05-26 16:32:26 +000015import http.client
Georg Brandlb533e262008-05-25 18:19:30 +000016import tempfile
17import threading
18
19import unittest
20from test import support
21
22
23class NoLogRequestHandler:
24 def log_message(self, *args):
25 # don't write log messages to stderr
26 pass
27
28
29class TestServerThread(threading.Thread):
30 def __init__(self, test_object, request_handler):
31 threading.Thread.__init__(self)
32 self.request_handler = request_handler
33 self.test_object = test_object
34 self.test_object.lock.acquire()
35
36 def run(self):
37 self.server = HTTPServer(('', 0), self.request_handler)
38 self.test_object.PORT = self.server.socket.getsockname()[1]
39 self.test_object.lock.release()
40 try:
41 self.server.serve_forever()
42 finally:
43 self.server.server_close()
44
45 def stop(self):
46 self.server.shutdown()
47
48
49class BaseTestCase(unittest.TestCase):
50 def setUp(self):
51 self.lock = threading.Lock()
52 self.thread = TestServerThread(self, self.request_handler)
53 self.thread.start()
54 self.lock.acquire()
55
56 def tearDown(self):
57 self.lock.release()
58 self.thread.stop()
59
60 def request(self, uri, method='GET', body=None, headers={}):
Georg Brandl24420152008-05-26 16:32:26 +000061 self.connection = http.client.HTTPConnection('localhost', self.PORT)
Georg Brandlb533e262008-05-25 18:19:30 +000062 self.connection.request(method, uri, body, headers)
63 return self.connection.getresponse()
64
65
66class BaseHTTPServerTestCase(BaseTestCase):
67 class request_handler(NoLogRequestHandler, BaseHTTPRequestHandler):
68 protocol_version = 'HTTP/1.1'
69 default_request_version = 'HTTP/1.1'
70
71 def do_TEST(self):
72 self.send_response(204)
73 self.send_header('Content-Type', 'text/html')
74 self.send_header('Connection', 'close')
75 self.end_headers()
76
77 def do_KEEP(self):
78 self.send_response(204)
79 self.send_header('Content-Type', 'text/html')
80 self.send_header('Connection', 'keep-alive')
81 self.end_headers()
82
83 def do_KEYERROR(self):
84 self.send_error(999)
85
86 def do_CUSTOM(self):
87 self.send_response(999)
88 self.send_header('Content-Type', 'text/html')
89 self.send_header('Connection', 'close')
90 self.end_headers()
91
92 def setUp(self):
93 BaseTestCase.setUp(self)
Georg Brandl24420152008-05-26 16:32:26 +000094 self.con = http.client.HTTPConnection('localhost', self.PORT)
Georg Brandlb533e262008-05-25 18:19:30 +000095 self.con.connect()
96
97 def test_command(self):
98 self.con.request('GET', '/')
99 res = self.con.getresponse()
100 self.assertEquals(res.status, 501)
101
102 def test_request_line_trimming(self):
103 self.con._http_vsn_str = 'HTTP/1.1\n'
104 self.con.putrequest('GET', '/')
105 self.con.endheaders()
106 res = self.con.getresponse()
107 self.assertEquals(res.status, 501)
108
109 def test_version_bogus(self):
110 self.con._http_vsn_str = 'FUBAR'
111 self.con.putrequest('GET', '/')
112 self.con.endheaders()
113 res = self.con.getresponse()
114 self.assertEquals(res.status, 400)
115
116 def test_version_digits(self):
117 self.con._http_vsn_str = 'HTTP/9.9.9'
118 self.con.putrequest('GET', '/')
119 self.con.endheaders()
120 res = self.con.getresponse()
121 self.assertEquals(res.status, 400)
122
123 def test_version_none_get(self):
124 self.con._http_vsn_str = ''
125 self.con.putrequest('GET', '/')
126 self.con.endheaders()
127 res = self.con.getresponse()
128 self.assertEquals(res.status, 501)
129
130 def test_version_none(self):
131 self.con._http_vsn_str = ''
132 self.con.putrequest('PUT', '/')
133 self.con.endheaders()
134 res = self.con.getresponse()
135 self.assertEquals(res.status, 400)
136
137 def test_version_invalid(self):
138 self.con._http_vsn = 99
139 self.con._http_vsn_str = 'HTTP/9.9'
140 self.con.putrequest('GET', '/')
141 self.con.endheaders()
142 res = self.con.getresponse()
143 self.assertEquals(res.status, 505)
144
145 def test_send_blank(self):
146 self.con._http_vsn_str = ''
147 self.con.putrequest('', '')
148 self.con.endheaders()
149 res = self.con.getresponse()
150 self.assertEquals(res.status, 400)
151
152 def test_header_close(self):
153 self.con.putrequest('GET', '/')
154 self.con.putheader('Connection', 'close')
155 self.con.endheaders()
156 res = self.con.getresponse()
157 self.assertEquals(res.status, 501)
158
159 def test_head_keep_alive(self):
160 self.con._http_vsn_str = 'HTTP/1.1'
161 self.con.putrequest('GET', '/')
162 self.con.putheader('Connection', 'keep-alive')
163 self.con.endheaders()
164 res = self.con.getresponse()
165 self.assertEquals(res.status, 501)
166
167 def test_handler(self):
168 self.con.request('TEST', '/')
169 res = self.con.getresponse()
170 self.assertEquals(res.status, 204)
171
172 def test_return_header_keep_alive(self):
173 self.con.request('KEEP', '/')
174 res = self.con.getresponse()
175 self.assertEquals(res.getheader('Connection'), 'keep-alive')
176 self.con.request('TEST', '/')
177
178 def test_internal_key_error(self):
179 self.con.request('KEYERROR', '/')
180 res = self.con.getresponse()
181 self.assertEquals(res.status, 999)
182
183 def test_return_custom_status(self):
184 self.con.request('CUSTOM', '/')
185 res = self.con.getresponse()
186 self.assertEquals(res.status, 999)
187
188
189class SimpleHTTPServerTestCase(BaseTestCase):
190 class request_handler(NoLogRequestHandler, SimpleHTTPRequestHandler):
191 pass
192
193 def setUp(self):
194 BaseTestCase.setUp(self)
195 self.cwd = os.getcwd()
196 basetempdir = tempfile.gettempdir()
197 os.chdir(basetempdir)
198 self.data = b'We are the knights who say Ni!'
199 self.tempdir = tempfile.mkdtemp(dir=basetempdir)
200 self.tempdir_name = os.path.basename(self.tempdir)
201 temp = open(os.path.join(self.tempdir, 'test'), 'wb')
202 temp.write(self.data)
203 temp.close()
204
205 def tearDown(self):
206 try:
207 os.chdir(self.cwd)
208 try:
209 shutil.rmtree(self.tempdir)
210 except:
211 pass
212 finally:
213 BaseTestCase.tearDown(self)
214
215 def check_status_and_reason(self, response, status, data=None):
216 body = response.read()
217 self.assert_(response)
218 self.assertEquals(response.status, status)
219 self.assert_(response.reason != None)
220 if data:
221 self.assertEqual(data, body)
222
223 def test_get(self):
224 #constructs the path relative to the root directory of the HTTPServer
225 response = self.request(self.tempdir_name + '/test')
226 self.check_status_and_reason(response, 200, data=self.data)
227 response = self.request(self.tempdir_name + '/')
228 self.check_status_and_reason(response, 200)
229 response = self.request(self.tempdir_name)
230 self.check_status_and_reason(response, 301)
231 response = self.request('/ThisDoesNotExist')
232 self.check_status_and_reason(response, 404)
233 response = self.request('/' + 'ThisDoesNotExist' + '/')
234 self.check_status_and_reason(response, 404)
235 f = open(os.path.join(self.tempdir_name, 'index.html'), 'w')
236 response = self.request('/' + self.tempdir_name + '/')
237 self.check_status_and_reason(response, 200)
238 if os.name == 'posix':
239 # chmod won't work as expected on Windows platforms
240 os.chmod(self.tempdir, 0)
241 response = self.request(self.tempdir_name + '/')
242 self.check_status_and_reason(response, 404)
243 os.chmod(self.tempdir, 0o755)
244
245 def test_head(self):
246 response = self.request(
247 self.tempdir_name + '/test', method='HEAD')
248 self.check_status_and_reason(response, 200)
249 self.assertEqual(response.getheader('content-length'),
250 str(len(self.data)))
251 self.assertEqual(response.getheader('content-type'),
252 'application/octet-stream')
253
254 def test_invalid_requests(self):
255 response = self.request('/', method='FOO')
256 self.check_status_and_reason(response, 501)
257 # requests must be case sensitive,so this should fail too
258 response = self.request('/', method='get')
259 self.check_status_and_reason(response, 501)
260 response = self.request('/', method='GETs')
261 self.check_status_and_reason(response, 501)
262
263
264cgi_file1 = """\
265#!%s
266
267print("Content-type: text/html")
268print()
269print("Hello World")
270"""
271
272cgi_file2 = """\
273#!%s
274import cgi
275
276print("Content-type: text/html")
277print()
278
279form = cgi.FieldStorage()
280print("%%s, %%s, %%s" %% (form.getfirst("spam"), form.getfirst("eggs"),\
281 form.getfirst("bacon")))
282"""
283
284class CGIHTTPServerTestCase(BaseTestCase):
285 class request_handler(NoLogRequestHandler, CGIHTTPRequestHandler):
286 pass
287
288 def setUp(self):
289 BaseTestCase.setUp(self)
290 self.parent_dir = tempfile.mkdtemp()
291 self.cgi_dir = os.path.join(self.parent_dir, 'cgi-bin')
292 os.mkdir(self.cgi_dir)
293
294 self.file1_path = os.path.join(self.cgi_dir, 'file1.py')
295 with open(self.file1_path, 'w') as file1:
296 file1.write(cgi_file1 % sys.executable)
297 os.chmod(self.file1_path, 0o777)
298
299 self.file2_path = os.path.join(self.cgi_dir, 'file2.py')
300 with open(self.file2_path, 'w') as file2:
301 file2.write(cgi_file2 % sys.executable)
302 os.chmod(self.file2_path, 0o777)
303
304 self.cwd = os.getcwd()
305 os.chdir(self.parent_dir)
306
307 def tearDown(self):
308 try:
309 os.chdir(self.cwd)
310 os.remove(self.file1_path)
311 os.remove(self.file2_path)
312 os.rmdir(self.cgi_dir)
313 os.rmdir(self.parent_dir)
314 finally:
315 BaseTestCase.tearDown(self)
316
317 def test_headers_and_content(self):
318 res = self.request('/cgi-bin/file1.py')
319 self.assertEquals((b'Hello World\n', 'text/html', 200), \
320 (res.read(), res.getheader('Content-type'), res.status))
321
322 def test_post(self):
323 params = urllib.urlencode({'spam' : 1, 'eggs' : 'python', 'bacon' : 123456})
324 headers = {'Content-type' : 'application/x-www-form-urlencoded'}
325 res = self.request('/cgi-bin/file2.py', 'POST', params, headers)
326
327 self.assertEquals(res.read(), b'1, python, 123456\n')
328
329 def test_invaliduri(self):
330 res = self.request('/cgi-bin/invalid')
331 res.read()
332 self.assertEquals(res.status, 404)
333
334 def test_authorization(self):
335 headers = {b'Authorization' : b'Basic ' +
336 base64.b64encode(b'username:pass')}
337 res = self.request('/cgi-bin/file1.py', 'GET', headers=headers)
338 self.assertEquals((b'Hello World\n', 'text/html', 200), \
339 (res.read(), res.getheader('Content-type'), res.status))
340
341
342def test_main(verbose=None):
343 try:
344 cwd = os.getcwd()
Georg Brandl24420152008-05-26 16:32:26 +0000345 support.run_unittest(BaseHTTPServerTestCase,
Georg Brandlb533e262008-05-25 18:19:30 +0000346 SimpleHTTPServerTestCase,
347 CGIHTTPServerTestCase
348 )
349 finally:
350 os.chdir(cwd)
351
352if __name__ == '__main__':
353 test_main()