blob: 3ec014018bc3ecb3a06082b6a0957baf194a6f0e [file] [log] [blame]
Georg Brandlf899dfa2008-05-18 09:12:20 +00001"""Unittests for the various HTTPServer modules.
2
3Written by Cody A.W. Somerville <cody-somerville@ubuntu.com>,
4Josip Dzolonga, and Michael Otteneder for the 2007/08 GHOP contest.
5"""
6
7from BaseHTTPServer import BaseHTTPRequestHandler, HTTPServer
8from SimpleHTTPServer import SimpleHTTPRequestHandler
9from CGIHTTPServer import CGIHTTPRequestHandler
Gregory P. Smith923ba362009-04-06 06:33:26 +000010import CGIHTTPServer
Georg Brandlf899dfa2008-05-18 09:12:20 +000011
12import os
13import sys
14import base64
15import shutil
16import urllib
17import httplib
18import tempfile
19import threading
20
21import unittest
22from test import test_support
23
24
25class NoLogRequestHandler:
26 def log_message(self, *args):
27 # don't write log messages to stderr
28 pass
29
30
31class TestServerThread(threading.Thread):
32 def __init__(self, test_object, request_handler):
33 threading.Thread.__init__(self)
34 self.request_handler = request_handler
35 self.test_object = test_object
36 self.test_object.lock.acquire()
37
38 def run(self):
39 self.server = HTTPServer(('', 0), self.request_handler)
40 self.test_object.PORT = self.server.socket.getsockname()[1]
41 self.test_object.lock.release()
42 try:
43 self.server.serve_forever()
44 finally:
45 self.server.server_close()
46
47 def stop(self):
48 self.server.shutdown()
49
50
51class BaseTestCase(unittest.TestCase):
52 def setUp(self):
Nick Coghlan87c03b32009-10-17 15:23:08 +000053 os.environ = test_support.EnvironmentVarGuard()
Georg Brandlf899dfa2008-05-18 09:12:20 +000054 self.lock = threading.Lock()
55 self.thread = TestServerThread(self, self.request_handler)
56 self.thread.start()
57 self.lock.acquire()
58
59 def tearDown(self):
60 self.lock.release()
61 self.thread.stop()
Nick Coghlan87c03b32009-10-17 15:23:08 +000062 os.environ.__exit__()
Georg Brandlf899dfa2008-05-18 09:12:20 +000063
64 def request(self, uri, method='GET', body=None, headers={}):
65 self.connection = httplib.HTTPConnection('localhost', self.PORT)
66 self.connection.request(method, uri, body, headers)
67 return self.connection.getresponse()
68
69
70class BaseHTTPServerTestCase(BaseTestCase):
71 class request_handler(NoLogRequestHandler, BaseHTTPRequestHandler):
72 protocol_version = 'HTTP/1.1'
73 default_request_version = 'HTTP/1.1'
74
75 def do_TEST(self):
76 self.send_response(204)
77 self.send_header('Content-Type', 'text/html')
78 self.send_header('Connection', 'close')
79 self.end_headers()
80
81 def do_KEEP(self):
82 self.send_response(204)
83 self.send_header('Content-Type', 'text/html')
84 self.send_header('Connection', 'keep-alive')
85 self.end_headers()
86
87 def do_KEYERROR(self):
88 self.send_error(999)
89
90 def do_CUSTOM(self):
91 self.send_response(999)
92 self.send_header('Content-Type', 'text/html')
93 self.send_header('Connection', 'close')
94 self.end_headers()
95
96 def setUp(self):
97 BaseTestCase.setUp(self)
98 self.con = httplib.HTTPConnection('localhost', self.PORT)
99 self.con.connect()
100
101 def test_command(self):
102 self.con.request('GET', '/')
103 res = self.con.getresponse()
104 self.assertEquals(res.status, 501)
105
106 def test_request_line_trimming(self):
107 self.con._http_vsn_str = 'HTTP/1.1\n'
108 self.con.putrequest('GET', '/')
109 self.con.endheaders()
110 res = self.con.getresponse()
111 self.assertEquals(res.status, 501)
112
113 def test_version_bogus(self):
114 self.con._http_vsn_str = 'FUBAR'
115 self.con.putrequest('GET', '/')
116 self.con.endheaders()
117 res = self.con.getresponse()
118 self.assertEquals(res.status, 400)
119
120 def test_version_digits(self):
121 self.con._http_vsn_str = 'HTTP/9.9.9'
122 self.con.putrequest('GET', '/')
123 self.con.endheaders()
124 res = self.con.getresponse()
125 self.assertEquals(res.status, 400)
126
127 def test_version_none_get(self):
128 self.con._http_vsn_str = ''
129 self.con.putrequest('GET', '/')
130 self.con.endheaders()
131 res = self.con.getresponse()
132 self.assertEquals(res.status, 501)
133
134 def test_version_none(self):
135 self.con._http_vsn_str = ''
136 self.con.putrequest('PUT', '/')
137 self.con.endheaders()
138 res = self.con.getresponse()
139 self.assertEquals(res.status, 400)
140
141 def test_version_invalid(self):
142 self.con._http_vsn = 99
143 self.con._http_vsn_str = 'HTTP/9.9'
144 self.con.putrequest('GET', '/')
145 self.con.endheaders()
146 res = self.con.getresponse()
147 self.assertEquals(res.status, 505)
148
149 def test_send_blank(self):
150 self.con._http_vsn_str = ''
151 self.con.putrequest('', '')
152 self.con.endheaders()
153 res = self.con.getresponse()
154 self.assertEquals(res.status, 400)
155
156 def test_header_close(self):
157 self.con.putrequest('GET', '/')
158 self.con.putheader('Connection', 'close')
159 self.con.endheaders()
160 res = self.con.getresponse()
161 self.assertEquals(res.status, 501)
162
163 def test_head_keep_alive(self):
164 self.con._http_vsn_str = 'HTTP/1.1'
165 self.con.putrequest('GET', '/')
166 self.con.putheader('Connection', 'keep-alive')
167 self.con.endheaders()
168 res = self.con.getresponse()
169 self.assertEquals(res.status, 501)
170
171 def test_handler(self):
172 self.con.request('TEST', '/')
173 res = self.con.getresponse()
174 self.assertEquals(res.status, 204)
175
176 def test_return_header_keep_alive(self):
177 self.con.request('KEEP', '/')
178 res = self.con.getresponse()
179 self.assertEquals(res.getheader('Connection'), 'keep-alive')
180 self.con.request('TEST', '/')
181
182 def test_internal_key_error(self):
183 self.con.request('KEYERROR', '/')
184 res = self.con.getresponse()
185 self.assertEquals(res.status, 999)
186
187 def test_return_custom_status(self):
188 self.con.request('CUSTOM', '/')
189 res = self.con.getresponse()
190 self.assertEquals(res.status, 999)
191
192
193class SimpleHTTPServerTestCase(BaseTestCase):
194 class request_handler(NoLogRequestHandler, SimpleHTTPRequestHandler):
195 pass
196
197 def setUp(self):
198 BaseTestCase.setUp(self)
Georg Brandl7bb16532008-05-20 06:47:31 +0000199 self.cwd = os.getcwd()
200 basetempdir = tempfile.gettempdir()
201 os.chdir(basetempdir)
Georg Brandlf899dfa2008-05-18 09:12:20 +0000202 self.data = 'We are the knights who say Ni!'
Georg Brandl7bb16532008-05-20 06:47:31 +0000203 self.tempdir = tempfile.mkdtemp(dir=basetempdir)
Georg Brandlf899dfa2008-05-18 09:12:20 +0000204 self.tempdir_name = os.path.basename(self.tempdir)
Georg Brandl7bb16532008-05-20 06:47:31 +0000205 temp = open(os.path.join(self.tempdir, 'test'), 'wb')
206 temp.write(self.data)
207 temp.close()
Georg Brandlf899dfa2008-05-18 09:12:20 +0000208
209 def tearDown(self):
210 try:
Georg Brandl7bb16532008-05-20 06:47:31 +0000211 os.chdir(self.cwd)
Georg Brandlf899dfa2008-05-18 09:12:20 +0000212 try:
213 shutil.rmtree(self.tempdir)
214 except:
215 pass
216 finally:
217 BaseTestCase.tearDown(self)
218
219 def check_status_and_reason(self, response, status, data=None):
220 body = response.read()
Benjamin Peterson5c8da862009-06-30 22:57:08 +0000221 self.assertTrue(response)
Georg Brandlf899dfa2008-05-18 09:12:20 +0000222 self.assertEquals(response.status, status)
Benjamin Peterson5c8da862009-06-30 22:57:08 +0000223 self.assertTrue(response.reason != None)
Georg Brandlf899dfa2008-05-18 09:12:20 +0000224 if data:
225 self.assertEqual(data, body)
226
227 def test_get(self):
228 #constructs the path relative to the root directory of the HTTPServer
Georg Brandl7bb16532008-05-20 06:47:31 +0000229 response = self.request(self.tempdir_name + '/test')
Georg Brandlf899dfa2008-05-18 09:12:20 +0000230 self.check_status_and_reason(response, 200, data=self.data)
231 response = self.request(self.tempdir_name + '/')
232 self.check_status_and_reason(response, 200)
233 response = self.request(self.tempdir_name)
234 self.check_status_and_reason(response, 301)
235 response = self.request('/ThisDoesNotExist')
236 self.check_status_and_reason(response, 404)
237 response = self.request('/' + 'ThisDoesNotExist' + '/')
238 self.check_status_and_reason(response, 404)
239 f = open(os.path.join(self.tempdir_name, 'index.html'), 'w')
240 response = self.request('/' + self.tempdir_name + '/')
241 self.check_status_and_reason(response, 200)
242 if os.name == 'posix':
243 # chmod won't work as expected on Windows platforms
244 os.chmod(self.tempdir, 0)
245 response = self.request(self.tempdir_name + '/')
246 self.check_status_and_reason(response, 404)
247 os.chmod(self.tempdir, 0755)
248
249 def test_head(self):
250 response = self.request(
Georg Brandl7bb16532008-05-20 06:47:31 +0000251 self.tempdir_name + '/test', method='HEAD')
Georg Brandlf899dfa2008-05-18 09:12:20 +0000252 self.check_status_and_reason(response, 200)
253 self.assertEqual(response.getheader('content-length'),
254 str(len(self.data)))
255 self.assertEqual(response.getheader('content-type'),
256 'application/octet-stream')
257
258 def test_invalid_requests(self):
259 response = self.request('/', method='FOO')
260 self.check_status_and_reason(response, 501)
261 # requests must be case sensitive,so this should fail too
262 response = self.request('/', method='get')
263 self.check_status_and_reason(response, 501)
264 response = self.request('/', method='GETs')
265 self.check_status_and_reason(response, 501)
266
267
268cgi_file1 = """\
269#!%s
270
271print "Content-type: text/html"
272print
273print "Hello World"
274"""
275
276cgi_file2 = """\
277#!%s
278import cgi
279
280print "Content-type: text/html"
281print
282
283form = cgi.FieldStorage()
Georg Brandlb740f6a2008-05-20 06:15:36 +0000284print "%%s, %%s, %%s" %% (form.getfirst("spam"), form.getfirst("eggs"),\
Georg Brandlf899dfa2008-05-18 09:12:20 +0000285 form.getfirst("bacon"))
286"""
287
288class CGIHTTPServerTestCase(BaseTestCase):
289 class request_handler(NoLogRequestHandler, CGIHTTPRequestHandler):
290 pass
291
292 def setUp(self):
293 BaseTestCase.setUp(self)
294 self.parent_dir = tempfile.mkdtemp()
295 self.cgi_dir = os.path.join(self.parent_dir, 'cgi-bin')
296 os.mkdir(self.cgi_dir)
297
298 self.file1_path = os.path.join(self.cgi_dir, 'file1.py')
299 with open(self.file1_path, 'w') as file1:
300 file1.write(cgi_file1 % sys.executable)
301 os.chmod(self.file1_path, 0777)
302
303 self.file2_path = os.path.join(self.cgi_dir, 'file2.py')
304 with open(self.file2_path, 'w') as file2:
305 file2.write(cgi_file2 % sys.executable)
306 os.chmod(self.file2_path, 0777)
307
Georg Brandl7bb16532008-05-20 06:47:31 +0000308 self.cwd = os.getcwd()
Georg Brandlf899dfa2008-05-18 09:12:20 +0000309 os.chdir(self.parent_dir)
310
311 def tearDown(self):
312 try:
Georg Brandl7bb16532008-05-20 06:47:31 +0000313 os.chdir(self.cwd)
Georg Brandlf899dfa2008-05-18 09:12:20 +0000314 os.remove(self.file1_path)
315 os.remove(self.file2_path)
316 os.rmdir(self.cgi_dir)
317 os.rmdir(self.parent_dir)
318 finally:
319 BaseTestCase.tearDown(self)
320
Gregory P. Smith923ba362009-04-06 06:33:26 +0000321 def test_url_collapse_path_split(self):
322 test_vectors = {
323 '': ('/', ''),
324 '..': IndexError,
325 '/.//..': IndexError,
326 '/': ('/', ''),
327 '//': ('/', ''),
328 '/\\': ('/', '\\'),
329 '/.//': ('/', ''),
330 'cgi-bin/file1.py': ('/cgi-bin', 'file1.py'),
331 '/cgi-bin/file1.py': ('/cgi-bin', 'file1.py'),
332 'a': ('/', 'a'),
333 '/a': ('/', 'a'),
334 '//a': ('/', 'a'),
335 './a': ('/', 'a'),
336 './C:/': ('/C:', ''),
337 '/a/b': ('/a', 'b'),
338 '/a/b/': ('/a/b', ''),
339 '/a/b/c/..': ('/a/b', ''),
340 '/a/b/c/../d': ('/a/b', 'd'),
341 '/a/b/c/../d/e/../f': ('/a/b/d', 'f'),
342 '/a/b/c/../d/e/../../f': ('/a/b', 'f'),
343 '/a/b/c/../d/e/.././././..//f': ('/a/b', 'f'),
344 '../a/b/c/../d/e/.././././..//f': IndexError,
345 '/a/b/c/../d/e/../../../f': ('/a', 'f'),
346 '/a/b/c/../d/e/../../../../f': ('/', 'f'),
347 '/a/b/c/../d/e/../../../../../f': IndexError,
348 '/a/b/c/../d/e/../../../../f/..': ('/', ''),
349 }
350 for path, expected in test_vectors.iteritems():
351 if isinstance(expected, type) and issubclass(expected, Exception):
352 self.assertRaises(expected,
353 CGIHTTPServer._url_collapse_path_split, path)
354 else:
355 actual = CGIHTTPServer._url_collapse_path_split(path)
356 self.assertEquals(expected, actual,
357 msg='path = %r\nGot: %r\nWanted: %r' % (
358 path, actual, expected))
359
Georg Brandlf899dfa2008-05-18 09:12:20 +0000360 def test_headers_and_content(self):
361 res = self.request('/cgi-bin/file1.py')
362 self.assertEquals(('Hello World\n', 'text/html', 200), \
363 (res.read(), res.getheader('Content-type'), res.status))
364
365 def test_post(self):
366 params = urllib.urlencode({'spam' : 1, 'eggs' : 'python', 'bacon' : 123456})
367 headers = {'Content-type' : 'application/x-www-form-urlencoded'}
368 res = self.request('/cgi-bin/file2.py', 'POST', params, headers)
369
370 self.assertEquals(res.read(), '1, python, 123456\n')
371
372 def test_invaliduri(self):
373 res = self.request('/cgi-bin/invalid')
374 res.read()
375 self.assertEquals(res.status, 404)
376
377 def test_authorization(self):
378 headers = {'Authorization' : 'Basic %s' % \
379 base64.b64encode('username:pass')}
380 res = self.request('/cgi-bin/file1.py', 'GET', headers=headers)
381 self.assertEquals(('Hello World\n', 'text/html', 200), \
382 (res.read(), res.getheader('Content-type'), res.status))
383
Gregory P. Smith923ba362009-04-06 06:33:26 +0000384 def test_no_leading_slash(self):
385 # http://bugs.python.org/issue2254
386 res = self.request('cgi-bin/file1.py')
387 self.assertEquals(('Hello World\n', 'text/html', 200),
388 (res.read(), res.getheader('Content-type'), res.status))
389
Georg Brandlf899dfa2008-05-18 09:12:20 +0000390
391def test_main(verbose=None):
392 try:
393 cwd = os.getcwd()
394 test_support.run_unittest(BaseHTTPServerTestCase,
Nick Coghlan87c03b32009-10-17 15:23:08 +0000395 SimpleHTTPServerTestCase,
396 CGIHTTPServerTestCase
397 )
Georg Brandlf899dfa2008-05-18 09:12:20 +0000398 finally:
399 os.chdir(cwd)
400
401if __name__ == '__main__':
402 test_main()