blob: 9cb11a6e70fa87cd30db1425ac73bc72e37cf941 [file] [log] [blame]
"""Unittests for the various HTTPServer modules.

Written by Cody A.W. Somerville <cody-somerville@ubuntu.com>,
Josip Dzolonga, and Michael Otteneder for the 2007/08 GHOP contest.
"""
6
7from BaseHTTPServer import BaseHTTPRequestHandler, HTTPServer
8from SimpleHTTPServer import SimpleHTTPRequestHandler
9from CGIHTTPServer import CGIHTTPRequestHandler
10
11import os
12import sys
13import base64
14import shutil
15import urllib
16import httplib
17import tempfile
18import threading
19
20import unittest
21from test import test_support
22
23
class NoLogRequestHandler:
    """Mixin for request handlers that discards every log message."""

    def log_message(self, *args):
        # Deliberately a no-op: keeps log output off stderr during the tests.
        return None
28
29
class TestServerThread(threading.Thread):
    """Thread that runs an HTTPServer on behalf of one test object.

    The test object must provide a ``lock`` attribute.  The lock is acquired
    when this thread object is constructed and released by run() after the
    server has been created, so the test can block on the lock until
    ``test_object.PORT`` is available.
    """

    def __init__(self, test_object, request_handler):
        threading.Thread.__init__(self)
        self.request_handler = request_handler
        self.test_object = test_object
        # Held until run() has tried to bring the server up.
        self.test_object.lock.acquire()

    def run(self):
        try:
            # Bind to port 0 and publish the port number the socket reports.
            self.server = HTTPServer(('', 0), self.request_handler)
            self.test_object.PORT = self.server.socket.getsockname()[1]
        finally:
            # Always release the lock, even if the server could not be
            # created; otherwise whoever waits on it would block forever.
            self.test_object.lock.release()
        try:
            self.server.serve_forever()
        finally:
            self.server.server_close()

    def stop(self):
        """Shut the server down and wait until the thread has finished."""
        self.server.shutdown()
        # Joining guarantees server_close() has run before the caller
        # continues, so no thread or socket outlives the test.
        self.join()
48
49
class BaseTestCase(unittest.TestCase):
    """Common fixture that runs a TestServerThread around every test.

    Subclasses supply a ``request_handler`` class attribute; it is passed to
    the server thread.  The port of the running server is stored in
    ``self.PORT`` by the server thread.
    """

    def setUp(self):
        self.lock = threading.Lock()
        self.thread = TestServerThread(self, self.request_handler)
        self.thread.start()
        # TestServerThread took the lock in its constructor and releases it
        # once the server is set up, so this blocks until PORT is known.
        self.lock.acquire()

    def tearDown(self):
        self.lock.release()
        self.thread.stop()

    def request(self, uri, method='GET', body=None, headers=None):
        """Send one request to the test server and return the response.

        ``headers`` is an optional mapping of extra request headers; when
        omitted, a fresh empty dict is used for each call instead of a
        shared mutable default.  The connection is kept in
        ``self.connection``.
        """
        if headers is None:
            headers = {}
        self.connection = httplib.HTTPConnection('localhost', self.PORT)
        self.connection.request(method, uri, body, headers)
        return self.connection.getresponse()
65
66
class BaseHTTPServerTestCase(BaseTestCase):
    # Tests for BaseHTTPRequestHandler, driven through a raw HTTPConnection
    # so that the request line and headers can be tampered with.

    class request_handler(NoLogRequestHandler, BaseHTTPRequestHandler):
        # Handler that only understands the made-up methods TEST, KEEP,
        # KEYERROR and CUSTOM; there is no do_GET or do_PUT.
        protocol_version = 'HTTP/1.1'
        default_request_version = 'HTTP/1.1'

        def _reply(self, status, connection):
            # Send a header-only text/html response.
            self.send_response(status)
            self.send_header('Content-Type', 'text/html')
            self.send_header('Connection', connection)
            self.end_headers()

        def do_TEST(self):
            self._reply(204, 'close')

        def do_KEEP(self):
            self._reply(204, 'keep-alive')

        def do_KEYERROR(self):
            self.send_error(999)

        def do_CUSTOM(self):
            self._reply(999, 'close')

    def setUp(self):
        BaseTestCase.setUp(self)
        self.con = httplib.HTTPConnection('localhost', self.PORT)
        self.con.connect()

    def _roundtrip(self, method):
        # Issue a plain request for '/' on self.con and return the response.
        self.con.request(method, '/')
        return self.con.getresponse()

    def _exchange(self, method, uri, version=None, headers=()):
        # Send a hand-assembled request on self.con and return the response.
        # ``version``, when given, replaces the version string that the
        # connection puts on the request line.
        if version is not None:
            self.con._http_vsn_str = version
        self.con.putrequest(method, uri)
        for name, value in headers:
            self.con.putheader(name, value)
        self.con.endheaders()
        return self.con.getresponse()

    def test_command(self):
        # The handler defines no do_GET.
        reply = self._roundtrip('GET')
        self.assertEqual(reply.status, 501)

    def test_request_line_trimming(self):
        reply = self._exchange('GET', '/', version='HTTP/1.1\n')
        self.assertEqual(reply.status, 501)

    def test_version_bogus(self):
        reply = self._exchange('GET', '/', version='FUBAR')
        self.assertEqual(reply.status, 400)

    def test_version_digits(self):
        reply = self._exchange('GET', '/', version='HTTP/9.9.9')
        self.assertEqual(reply.status, 400)

    def test_version_none_get(self):
        reply = self._exchange('GET', '/', version='')
        self.assertEqual(reply.status, 501)

    def test_version_none(self):
        reply = self._exchange('PUT', '/', version='')
        self.assertEqual(reply.status, 400)

    def test_version_invalid(self):
        self.con._http_vsn = 99
        reply = self._exchange('GET', '/', version='HTTP/9.9')
        self.assertEqual(reply.status, 505)

    def test_send_blank(self):
        reply = self._exchange('', '', version='')
        self.assertEqual(reply.status, 400)

    def test_header_close(self):
        reply = self._exchange('GET', '/',
                               headers=[('Connection', 'close')])
        self.assertEqual(reply.status, 501)

    def test_head_keep_alive(self):
        reply = self._exchange('GET', '/', version='HTTP/1.1',
                               headers=[('Connection', 'keep-alive')])
        self.assertEqual(reply.status, 501)

    def test_handler(self):
        reply = self._roundtrip('TEST')
        self.assertEqual(reply.status, 204)

    def test_return_header_keep_alive(self):
        reply = self._roundtrip('KEEP')
        self.assertEqual(reply.getheader('Connection'), 'keep-alive')
        # Issue a second request on the same connection.
        self.con.request('TEST', '/')

    def test_internal_key_error(self):
        reply = self._roundtrip('KEYERROR')
        self.assertEqual(reply.status, 999)

    def test_return_custom_status(self):
        reply = self._roundtrip('CUSTOM')
        self.assertEqual(reply.status, 999)
188
189
class SimpleHTTPServerTestCase(BaseTestCase):
    # Tests for SimpleHTTPRequestHandler.  The working directory is switched
    # to the system temp dir; the tests request a scratch directory created
    # there, and the file inside it, by name.

    class request_handler(NoLogRequestHandler, SimpleHTTPRequestHandler):
        pass

    def setUp(self):
        BaseTestCase.setUp(self)
        # Remember where we started so tearDown can undo the chdir below.
        self.cwd = os.getcwd()
        basetempdir = tempfile.gettempdir()
        os.chdir(basetempdir)
        self.data = 'We are the knights who say Ni!'
        self.tempdir = tempfile.mkdtemp(dir=basetempdir)
        self.tempdir_name = os.path.basename(self.tempdir)
        self.tempfile = tempfile.NamedTemporaryFile(dir=self.tempdir)
        self.tempfile.file.write(self.data)
        # Flush so the server sees the data when it opens the file by name.
        self.tempfile.file.flush()
        self.tempfile_name = os.path.basename(self.tempfile.name)

    def tearDown(self):
        try:
            os.chdir(self.cwd)
            self.tempfile.close()
            try:
                shutil.rmtree(self.tempdir)
            except OSError:
                # Best-effort cleanup: a leftover temp dir must not turn a
                # passing test into an error.
                pass
        finally:
            BaseTestCase.tearDown(self)

    def check_status_and_reason(self, response, status, data=None):
        # Read the whole body, then check the status code, that a reason
        # phrase is present and, if ``data`` is given, the body itself.
        body = response.read()
        self.assertTrue(response)
        self.assertEqual(response.status, status)
        self.assertTrue(response.reason is not None)
        if data:
            self.assertEqual(data, body)

    def test_get(self):
        # constructs the path relative to the root directory of the HTTPServer
        response = self.request(self.tempdir_name + '/' + self.tempfile_name)
        self.check_status_and_reason(response, 200, data=self.data)
        response = self.request(self.tempdir_name + '/')
        self.check_status_and_reason(response, 200)
        response = self.request(self.tempdir_name)
        self.check_status_and_reason(response, 301)
        response = self.request('/ThisDoesNotExist')
        self.check_status_and_reason(response, 404)
        response = self.request('/' + 'ThisDoesNotExist' + '/')
        self.check_status_and_reason(response, 404)
        # Create an empty index.html.  Use the absolute directory path so
        # this does not depend on the cwd, and close the handle right away.
        open(os.path.join(self.tempdir, 'index.html'), 'w').close()
        response = self.request('/' + self.tempdir_name + '/')
        self.check_status_and_reason(response, 200)
        if os.name == 'posix':
            # chmod won't work as expected on Windows platforms
            os.chmod(self.tempdir, 0)
            try:
                response = self.request(self.tempdir_name + '/')
                self.check_status_and_reason(response, 404)
            finally:
                # Restore access even if the check fails, otherwise tearDown
                # cannot remove the directory.
                os.chmod(self.tempdir, 0o755)

    def test_head(self):
        response = self.request(
            self.tempdir_name + '/' + self.tempfile_name, method='HEAD')
        self.check_status_and_reason(response, 200)
        self.assertEqual(response.getheader('content-length'),
                         str(len(self.data)))
        self.assertEqual(response.getheader('content-type'),
                         'application/octet-stream')

    def test_invalid_requests(self):
        response = self.request('/', method='FOO')
        self.check_status_and_reason(response, 501)
        # requests must be case sensitive, so this should fail too
        response = self.request('/', method='get')
        self.check_status_and_reason(response, 501)
        response = self.request('/', method='GETs')
        self.check_status_and_reason(response, 501)
262
263
# Template for a minimal CGI script.  The %s on the first line is filled in
# with sys.executable when CGIHTTPServerTestCase.setUp writes the script out.
cgi_file1 = """\
#!%s

print "Content-type: text/html"
print
print "Hello World"
"""

# Template for a CGI script that prints the "spam", "eggs" and "bacon" form
# fields.  It is formatted with sys.executable in the same way, which is why
# the percent signs belonging to the script itself are doubled.
cgi_file2 = """\
#!%s
import cgi

print "Content-type: text/html"
print

form = cgi.FieldStorage()
print "%%s, %%s, %%s" %% (form.getfirst("spam"), form.getfirst("eggs"),\
 form.getfirst("bacon"))
"""
283
class CGIHTTPServerTestCase(BaseTestCase):
    # Tests for CGIHTTPRequestHandler.  Two scripts are written into a
    # temporary <parent_dir>/cgi-bin directory and the working directory is
    # switched to <parent_dir> for the duration of each test.

    class request_handler(NoLogRequestHandler, CGIHTTPRequestHandler):
        pass

    def setUp(self):
        BaseTestCase.setUp(self)
        # Remember where we started so tearDown can leave parent_dir again
        # before deleting it.
        self.cwd = os.getcwd()
        self.parent_dir = tempfile.mkdtemp()
        self.cgi_dir = os.path.join(self.parent_dir, 'cgi-bin')
        os.mkdir(self.cgi_dir)

        self.file1_path = self._write_script('file1.py', cgi_file1)
        self.file2_path = self._write_script('file2.py', cgi_file2)

        os.chdir(self.parent_dir)

    def _write_script(self, name, template):
        # Write ``template`` (with its interpreter line filled in) to
        # cgi_dir/name, set its mode to 0777 and return the path.
        path = os.path.join(self.cgi_dir, name)
        with open(path, 'w') as script:
            script.write(template % sys.executable)
        os.chmod(path, 0o777)
        return path

    def tearDown(self):
        try:
            # Step out of parent_dir first: removing the directory that is
            # still the current working directory fails on some platforms.
            os.chdir(self.cwd)
            os.remove(self.file1_path)
            os.remove(self.file2_path)
            os.rmdir(self.cgi_dir)
            os.rmdir(self.parent_dir)
        finally:
            BaseTestCase.tearDown(self)

    def test_headers_and_content(self):
        res = self.request('/cgi-bin/file1.py')
        self.assertEqual(
            ('Hello World\n', 'text/html', 200),
            (res.read(), res.getheader('Content-type'), res.status))

    def test_post(self):
        params = urllib.urlencode(
            {'spam': 1, 'eggs': 'python', 'bacon': 123456})
        headers = {'Content-type': 'application/x-www-form-urlencoded'}
        res = self.request('/cgi-bin/file2.py', 'POST', params, headers)

        self.assertEqual(res.read(), '1, python, 123456\n')

    def test_invaliduri(self):
        res = self.request('/cgi-bin/invalid')
        res.read()
        self.assertEqual(res.status, 404)

    def test_authorization(self):
        headers = {'Authorization': 'Basic %s' %
                   base64.b64encode('username:pass')}
        res = self.request('/cgi-bin/file1.py', 'GET', headers=headers)
        self.assertEqual(
            ('Hello World\n', 'text/html', 200),
            (res.read(), res.getheader('Content-type'), res.status))
338
339
def test_main(verbose=None):
    """Run all test cases of this module, then restore the working directory.

    ``verbose`` is accepted but not used.
    """
    # Capture the directory before entering the try block: if getcwd() were
    # to fail inside it, the finally clause would raise a NameError on the
    # unbound name and hide the real error.
    cwd = os.getcwd()
    try:
        test_support.run_unittest(BaseHTTPServerTestCase,
                                  SimpleHTTPServerTestCase,
                                  CGIHTTPServerTestCase)
    finally:
        # Some of the test cases chdir into temporary directories.
        os.chdir(cwd)
349
# Run the whole suite when this file is executed directly as a script.
if __name__ == '__main__':
    test_main()