blob: 31321c3fa6a32825ec198ab8330c76494fa13bb1 [file] [log] [blame]
Georg Brandlf899dfa2008-05-18 09:12:20 +00001"""Unittests for the various HTTPServer modules.
2
3Written by Cody A.W. Somerville <cody-somerville@ubuntu.com>,
4Josip Dzolonga, and Michael Otteneder for the 2007/08 GHOP contest.
5"""
6
7from BaseHTTPServer import BaseHTTPRequestHandler, HTTPServer
8from SimpleHTTPServer import SimpleHTTPRequestHandler
9from CGIHTTPServer import CGIHTTPRequestHandler
10
11import os
12import sys
13import base64
14import shutil
15import urllib
16import httplib
17import tempfile
18import threading
19
20import unittest
21from test import test_support
22
23
24class NoLogRequestHandler:
25 def log_message(self, *args):
26 # don't write log messages to stderr
27 pass
28
29
30class TestServerThread(threading.Thread):
31 def __init__(self, test_object, request_handler):
32 threading.Thread.__init__(self)
33 self.request_handler = request_handler
34 self.test_object = test_object
35 self.test_object.lock.acquire()
36
37 def run(self):
38 self.server = HTTPServer(('', 0), self.request_handler)
39 self.test_object.PORT = self.server.socket.getsockname()[1]
40 self.test_object.lock.release()
41 try:
42 self.server.serve_forever()
43 finally:
44 self.server.server_close()
45
46 def stop(self):
47 self.server.shutdown()
48
49
50class BaseTestCase(unittest.TestCase):
51 def setUp(self):
52 self.lock = threading.Lock()
53 self.thread = TestServerThread(self, self.request_handler)
54 self.thread.start()
55 self.lock.acquire()
56
57 def tearDown(self):
58 self.lock.release()
59 self.thread.stop()
60
61 def request(self, uri, method='GET', body=None, headers={}):
62 self.connection = httplib.HTTPConnection('localhost', self.PORT)
63 self.connection.request(method, uri, body, headers)
64 return self.connection.getresponse()
65
66
67class BaseHTTPServerTestCase(BaseTestCase):
68 class request_handler(NoLogRequestHandler, BaseHTTPRequestHandler):
69 protocol_version = 'HTTP/1.1'
70 default_request_version = 'HTTP/1.1'
71
72 def do_TEST(self):
73 self.send_response(204)
74 self.send_header('Content-Type', 'text/html')
75 self.send_header('Connection', 'close')
76 self.end_headers()
77
78 def do_KEEP(self):
79 self.send_response(204)
80 self.send_header('Content-Type', 'text/html')
81 self.send_header('Connection', 'keep-alive')
82 self.end_headers()
83
84 def do_KEYERROR(self):
85 self.send_error(999)
86
87 def do_CUSTOM(self):
88 self.send_response(999)
89 self.send_header('Content-Type', 'text/html')
90 self.send_header('Connection', 'close')
91 self.end_headers()
92
93 def setUp(self):
94 BaseTestCase.setUp(self)
95 self.con = httplib.HTTPConnection('localhost', self.PORT)
96 self.con.connect()
97
98 def test_command(self):
99 self.con.request('GET', '/')
100 res = self.con.getresponse()
101 self.assertEquals(res.status, 501)
102
103 def test_request_line_trimming(self):
104 self.con._http_vsn_str = 'HTTP/1.1\n'
105 self.con.putrequest('GET', '/')
106 self.con.endheaders()
107 res = self.con.getresponse()
108 self.assertEquals(res.status, 501)
109
110 def test_version_bogus(self):
111 self.con._http_vsn_str = 'FUBAR'
112 self.con.putrequest('GET', '/')
113 self.con.endheaders()
114 res = self.con.getresponse()
115 self.assertEquals(res.status, 400)
116
117 def test_version_digits(self):
118 self.con._http_vsn_str = 'HTTP/9.9.9'
119 self.con.putrequest('GET', '/')
120 self.con.endheaders()
121 res = self.con.getresponse()
122 self.assertEquals(res.status, 400)
123
124 def test_version_none_get(self):
125 self.con._http_vsn_str = ''
126 self.con.putrequest('GET', '/')
127 self.con.endheaders()
128 res = self.con.getresponse()
129 self.assertEquals(res.status, 501)
130
131 def test_version_none(self):
132 self.con._http_vsn_str = ''
133 self.con.putrequest('PUT', '/')
134 self.con.endheaders()
135 res = self.con.getresponse()
136 self.assertEquals(res.status, 400)
137
138 def test_version_invalid(self):
139 self.con._http_vsn = 99
140 self.con._http_vsn_str = 'HTTP/9.9'
141 self.con.putrequest('GET', '/')
142 self.con.endheaders()
143 res = self.con.getresponse()
144 self.assertEquals(res.status, 505)
145
146 def test_send_blank(self):
147 self.con._http_vsn_str = ''
148 self.con.putrequest('', '')
149 self.con.endheaders()
150 res = self.con.getresponse()
151 self.assertEquals(res.status, 400)
152
153 def test_header_close(self):
154 self.con.putrequest('GET', '/')
155 self.con.putheader('Connection', 'close')
156 self.con.endheaders()
157 res = self.con.getresponse()
158 self.assertEquals(res.status, 501)
159
160 def test_head_keep_alive(self):
161 self.con._http_vsn_str = 'HTTP/1.1'
162 self.con.putrequest('GET', '/')
163 self.con.putheader('Connection', 'keep-alive')
164 self.con.endheaders()
165 res = self.con.getresponse()
166 self.assertEquals(res.status, 501)
167
168 def test_handler(self):
169 self.con.request('TEST', '/')
170 res = self.con.getresponse()
171 self.assertEquals(res.status, 204)
172
173 def test_return_header_keep_alive(self):
174 self.con.request('KEEP', '/')
175 res = self.con.getresponse()
176 self.assertEquals(res.getheader('Connection'), 'keep-alive')
177 self.con.request('TEST', '/')
178
179 def test_internal_key_error(self):
180 self.con.request('KEYERROR', '/')
181 res = self.con.getresponse()
182 self.assertEquals(res.status, 999)
183
184 def test_return_custom_status(self):
185 self.con.request('CUSTOM', '/')
186 res = self.con.getresponse()
187 self.assertEquals(res.status, 999)
188
189
190class SimpleHTTPServerTestCase(BaseTestCase):
191 class request_handler(NoLogRequestHandler, SimpleHTTPRequestHandler):
192 pass
193
194 def setUp(self):
195 BaseTestCase.setUp(self)
Georg Brandl7bb16532008-05-20 06:47:31 +0000196 self.cwd = os.getcwd()
197 basetempdir = tempfile.gettempdir()
198 os.chdir(basetempdir)
Georg Brandlf899dfa2008-05-18 09:12:20 +0000199 self.data = 'We are the knights who say Ni!'
Georg Brandl7bb16532008-05-20 06:47:31 +0000200 self.tempdir = tempfile.mkdtemp(dir=basetempdir)
Georg Brandlf899dfa2008-05-18 09:12:20 +0000201 self.tempdir_name = os.path.basename(self.tempdir)
Georg Brandl7bb16532008-05-20 06:47:31 +0000202 temp = open(os.path.join(self.tempdir, 'test'), 'wb')
203 temp.write(self.data)
204 temp.close()
Georg Brandlf899dfa2008-05-18 09:12:20 +0000205
206 def tearDown(self):
207 try:
Georg Brandl7bb16532008-05-20 06:47:31 +0000208 os.chdir(self.cwd)
Georg Brandlf899dfa2008-05-18 09:12:20 +0000209 try:
210 shutil.rmtree(self.tempdir)
211 except:
212 pass
213 finally:
214 BaseTestCase.tearDown(self)
215
216 def check_status_and_reason(self, response, status, data=None):
217 body = response.read()
218 self.assert_(response)
219 self.assertEquals(response.status, status)
220 self.assert_(response.reason != None)
221 if data:
222 self.assertEqual(data, body)
223
224 def test_get(self):
225 #constructs the path relative to the root directory of the HTTPServer
Georg Brandl7bb16532008-05-20 06:47:31 +0000226 response = self.request(self.tempdir_name + '/test')
Georg Brandlf899dfa2008-05-18 09:12:20 +0000227 self.check_status_and_reason(response, 200, data=self.data)
228 response = self.request(self.tempdir_name + '/')
229 self.check_status_and_reason(response, 200)
230 response = self.request(self.tempdir_name)
231 self.check_status_and_reason(response, 301)
232 response = self.request('/ThisDoesNotExist')
233 self.check_status_and_reason(response, 404)
234 response = self.request('/' + 'ThisDoesNotExist' + '/')
235 self.check_status_and_reason(response, 404)
236 f = open(os.path.join(self.tempdir_name, 'index.html'), 'w')
237 response = self.request('/' + self.tempdir_name + '/')
238 self.check_status_and_reason(response, 200)
239 if os.name == 'posix':
240 # chmod won't work as expected on Windows platforms
241 os.chmod(self.tempdir, 0)
242 response = self.request(self.tempdir_name + '/')
243 self.check_status_and_reason(response, 404)
244 os.chmod(self.tempdir, 0755)
245
246 def test_head(self):
247 response = self.request(
Georg Brandl7bb16532008-05-20 06:47:31 +0000248 self.tempdir_name + '/test', method='HEAD')
Georg Brandlf899dfa2008-05-18 09:12:20 +0000249 self.check_status_and_reason(response, 200)
250 self.assertEqual(response.getheader('content-length'),
251 str(len(self.data)))
252 self.assertEqual(response.getheader('content-type'),
253 'application/octet-stream')
254
255 def test_invalid_requests(self):
256 response = self.request('/', method='FOO')
257 self.check_status_and_reason(response, 501)
258 # requests must be case sensitive,so this should fail too
259 response = self.request('/', method='get')
260 self.check_status_and_reason(response, 501)
261 response = self.request('/', method='GETs')
262 self.check_status_and_reason(response, 501)
263
264
265cgi_file1 = """\
266#!%s
267
268print "Content-type: text/html"
269print
270print "Hello World"
271"""
272
273cgi_file2 = """\
274#!%s
275import cgi
276
277print "Content-type: text/html"
278print
279
280form = cgi.FieldStorage()
Georg Brandlb740f6a2008-05-20 06:15:36 +0000281print "%%s, %%s, %%s" %% (form.getfirst("spam"), form.getfirst("eggs"),\
Georg Brandlf899dfa2008-05-18 09:12:20 +0000282 form.getfirst("bacon"))
283"""
284
285class CGIHTTPServerTestCase(BaseTestCase):
286 class request_handler(NoLogRequestHandler, CGIHTTPRequestHandler):
287 pass
288
289 def setUp(self):
290 BaseTestCase.setUp(self)
291 self.parent_dir = tempfile.mkdtemp()
292 self.cgi_dir = os.path.join(self.parent_dir, 'cgi-bin')
293 os.mkdir(self.cgi_dir)
294
295 self.file1_path = os.path.join(self.cgi_dir, 'file1.py')
296 with open(self.file1_path, 'w') as file1:
297 file1.write(cgi_file1 % sys.executable)
298 os.chmod(self.file1_path, 0777)
299
300 self.file2_path = os.path.join(self.cgi_dir, 'file2.py')
301 with open(self.file2_path, 'w') as file2:
302 file2.write(cgi_file2 % sys.executable)
303 os.chmod(self.file2_path, 0777)
304
Georg Brandl7bb16532008-05-20 06:47:31 +0000305 self.cwd = os.getcwd()
Georg Brandlf899dfa2008-05-18 09:12:20 +0000306 os.chdir(self.parent_dir)
307
308 def tearDown(self):
309 try:
Georg Brandl7bb16532008-05-20 06:47:31 +0000310 os.chdir(self.cwd)
Georg Brandlf899dfa2008-05-18 09:12:20 +0000311 os.remove(self.file1_path)
312 os.remove(self.file2_path)
313 os.rmdir(self.cgi_dir)
314 os.rmdir(self.parent_dir)
315 finally:
316 BaseTestCase.tearDown(self)
317
318 def test_headers_and_content(self):
319 res = self.request('/cgi-bin/file1.py')
320 self.assertEquals(('Hello World\n', 'text/html', 200), \
321 (res.read(), res.getheader('Content-type'), res.status))
322
323 def test_post(self):
324 params = urllib.urlencode({'spam' : 1, 'eggs' : 'python', 'bacon' : 123456})
325 headers = {'Content-type' : 'application/x-www-form-urlencoded'}
326 res = self.request('/cgi-bin/file2.py', 'POST', params, headers)
327
328 self.assertEquals(res.read(), '1, python, 123456\n')
329
330 def test_invaliduri(self):
331 res = self.request('/cgi-bin/invalid')
332 res.read()
333 self.assertEquals(res.status, 404)
334
335 def test_authorization(self):
336 headers = {'Authorization' : 'Basic %s' % \
337 base64.b64encode('username:pass')}
338 res = self.request('/cgi-bin/file1.py', 'GET', headers=headers)
339 self.assertEquals(('Hello World\n', 'text/html', 200), \
340 (res.read(), res.getheader('Content-type'), res.status))
341
342
343def test_main(verbose=None):
344 try:
345 cwd = os.getcwd()
346 test_support.run_unittest(BaseHTTPServerTestCase,
Georg Brandlb740f6a2008-05-20 06:15:36 +0000347 SimpleHTTPServerTestCase,
348 CGIHTTPServerTestCase
Georg Brandlf899dfa2008-05-18 09:12:20 +0000349 )
350 finally:
351 os.chdir(cwd)
352
353if __name__ == '__main__':
354 test_main()