blob: 5c48671053526f05967bc2a916bc6befe6f6edee [file] [log] [blame]
"""Unittests for the various HTTPServer modules.

Written by Cody A.W. Somerville <cody-somerville@ubuntu.com>,
Josip Dzolonga, and Michael Otteneder for the 2007/08 GHOP contest.
"""
6
7from BaseHTTPServer import BaseHTTPRequestHandler, HTTPServer
8from SimpleHTTPServer import SimpleHTTPRequestHandler
9from CGIHTTPServer import CGIHTTPRequestHandler
10
11import os
12import sys
13import base64
14import shutil
15import urllib
16import httplib
17import tempfile
18import threading
19
20import unittest
21from test import test_support
22
23
class NoLogRequestHandler:
    """Mix-in for request handlers that suppresses per-request logging."""

    def log_message(self, *args):
        # Swallow the message so that nothing is written to stderr.
        return None
28
29
class TestServerThread(threading.Thread):
    """Thread that runs an HTTPServer on an ephemeral port for one test.

    ``test_object`` is the test case that owns the server.  Once the server
    is bound, run() stores the chosen port on it as ``PORT`` and sets its
    ``server_started`` event.  ``request_handler`` is the handler class
    passed to HTTPServer.
    """

    def __init__(self, test_object, request_handler):
        threading.Thread.__init__(self)
        self.request_handler = request_handler
        self.test_object = test_object

    def run(self):
        # Port 0 lets the OS pick a free port; publish it to the test case.
        self.server = HTTPServer(('', 0), self.request_handler)
        self.test_object.PORT = self.server.socket.getsockname()[1]
        self.test_object.server_started.set()
        # Drop the reference to the test case; it is not needed any more.
        self.test_object = None
        try:
            # Poll for shutdown requests every 0.05 seconds.
            self.server.serve_forever(0.05)
        finally:
            self.server.server_close()

    def stop(self):
        """Shut the server down and wait for the thread to finish.

        Joining guarantees that run() has closed the listening socket
        before the caller continues.  Must not be called from the server
        thread itself.
        """
        self.server.shutdown()
        self.join()
48
49
class BaseTestCase(unittest.TestCase):
    """Base fixture that runs an HTTP server thread around each test.

    Subclasses provide a ``request_handler`` class attribute, which is
    handed to the server thread.  The thread stores the port it bound on
    the test case as ``self.PORT``.
    """

    def setUp(self):
        self.server_started = threading.Event()
        self.thread = TestServerThread(self, self.request_handler)
        self.thread.start()
        # Block until the server thread has published self.PORT.
        self.server_started.wait()

    def tearDown(self):
        try:
            # Close the connection opened by the last request() call, if any.
            connection = getattr(self, 'connection', None)
            if connection is not None:
                connection.close()
        finally:
            self.thread.stop()
            # Wait for the thread so the server socket is closed before the
            # next test starts.
            self.thread.join()

    def request(self, uri, method='GET', body=None, headers=None):
        """Send one request over a new connection and return the response.

        ``headers`` defaults to no extra headers.  The connection is kept
        as ``self.connection``.
        """
        if headers is None:
            # A fresh dict per call instead of a shared mutable default.
            headers = {}
        self.connection = httplib.HTTPConnection('localhost', self.PORT)
        self.connection.request(method, uri, body, headers)
        return self.connection.getresponse()
64
65
class BaseHTTPServerTestCase(BaseTestCase):
    """Tests for BaseHTTPRequestHandler using one client connection per test.

    The handler implements only the custom TEST, KEEP, KEYERROR and CUSTOM
    methods; it defines no do_GET or do_PUT.
    """

    class request_handler(NoLogRequestHandler, BaseHTTPRequestHandler):
        protocol_version = 'HTTP/1.1'
        default_request_version = 'HTTP/1.1'

        def do_TEST(self):
            self.send_response(204)
            self.send_header('Content-Type', 'text/html')
            self.send_header('Connection', 'close')
            self.end_headers()

        def do_KEEP(self):
            self.send_response(204)
            self.send_header('Content-Type', 'text/html')
            self.send_header('Connection', 'keep-alive')
            self.end_headers()

        def do_KEYERROR(self):
            self.send_error(999)

        def do_CUSTOM(self):
            self.send_response(999)
            self.send_header('Content-Type', 'text/html')
            self.send_header('Connection', 'close')
            self.end_headers()

    def setUp(self):
        BaseTestCase.setUp(self)
        self.con = httplib.HTTPConnection('localhost', self.PORT)
        try:
            self.con.connect()
        except Exception:
            # tearDown() is skipped when setUp() fails, so stop the server
            # thread here instead of leaving it running.
            BaseTestCase.tearDown(self)
            raise

    def tearDown(self):
        try:
            # Release the client socket before the server is shut down.
            self.con.close()
        finally:
            BaseTestCase.tearDown(self)

    def test_command(self):
        # GET has no do_GET implementation in the handler.
        self.con.request('GET', '/')
        res = self.con.getresponse()
        self.assertEqual(res.status, 501)

    def test_request_line_trimming(self):
        # A stray newline after the version must not change the outcome.
        self.con._http_vsn_str = 'HTTP/1.1\n'
        self.con.putrequest('GET', '/')
        self.con.endheaders()
        res = self.con.getresponse()
        self.assertEqual(res.status, 501)

    def test_version_bogus(self):
        self.con._http_vsn_str = 'FUBAR'
        self.con.putrequest('GET', '/')
        self.con.endheaders()
        res = self.con.getresponse()
        self.assertEqual(res.status, 400)

    def test_version_digits(self):
        self.con._http_vsn_str = 'HTTP/9.9.9'
        self.con.putrequest('GET', '/')
        self.con.endheaders()
        res = self.con.getresponse()
        self.assertEqual(res.status, 400)

    def test_version_none_get(self):
        # No version string at all, GET method.
        self.con._http_vsn_str = ''
        self.con.putrequest('GET', '/')
        self.con.endheaders()
        res = self.con.getresponse()
        self.assertEqual(res.status, 501)

    def test_version_none(self):
        # No version string at all, non-GET method.
        self.con._http_vsn_str = ''
        self.con.putrequest('PUT', '/')
        self.con.endheaders()
        res = self.con.getresponse()
        self.assertEqual(res.status, 400)

    def test_version_invalid(self):
        self.con._http_vsn = 99
        self.con._http_vsn_str = 'HTTP/9.9'
        self.con.putrequest('GET', '/')
        self.con.endheaders()
        res = self.con.getresponse()
        self.assertEqual(res.status, 505)

    def test_send_blank(self):
        self.con._http_vsn_str = ''
        self.con.putrequest('', '')
        self.con.endheaders()
        res = self.con.getresponse()
        self.assertEqual(res.status, 400)

    def test_header_close(self):
        self.con.putrequest('GET', '/')
        self.con.putheader('Connection', 'close')
        self.con.endheaders()
        res = self.con.getresponse()
        self.assertEqual(res.status, 501)

    def test_head_keep_alive(self):
        self.con._http_vsn_str = 'HTTP/1.1'
        self.con.putrequest('GET', '/')
        self.con.putheader('Connection', 'keep-alive')
        self.con.endheaders()
        res = self.con.getresponse()
        self.assertEqual(res.status, 501)

    def test_handler(self):
        self.con.request('TEST', '/')
        res = self.con.getresponse()
        self.assertEqual(res.status, 204)

    def test_return_header_keep_alive(self):
        self.con.request('KEEP', '/')
        res = self.con.getresponse()
        self.assertEqual(res.getheader('Connection'), 'keep-alive')
        # The follow-up TEST request is only sent; its response is not read.
        # do_TEST answers with 'Connection: close'.
        self.con.request('TEST', '/')

    def test_internal_key_error(self):
        self.con.request('KEYERROR', '/')
        res = self.con.getresponse()
        self.assertEqual(res.status, 999)

    def test_return_custom_status(self):
        self.con.request('CUSTOM', '/')
        res = self.con.getresponse()
        self.assertEqual(res.status, 999)
187
188
class SimpleHTTPServerTestCase(BaseTestCase):
    """Tests for SimpleHTTPRequestHandler serving from the temp directory.

    setUp() changes the working directory to the system temp directory and
    creates a scratch directory in it that contains one file named 'test'.
    """

    class request_handler(NoLogRequestHandler, SimpleHTTPRequestHandler):
        pass

    def setUp(self):
        BaseTestCase.setUp(self)
        self.cwd = os.getcwd()
        basetempdir = tempfile.gettempdir()
        os.chdir(basetempdir)
        self.data = 'We are the knights who say Ni!'
        self.tempdir = tempfile.mkdtemp(dir=basetempdir)
        self.tempdir_name = os.path.basename(self.tempdir)
        with open(os.path.join(self.tempdir, 'test'), 'wb') as temp:
            temp.write(self.data)

    def tearDown(self):
        try:
            os.chdir(self.cwd)
            try:
                shutil.rmtree(self.tempdir)
            except OSError:
                # Best effort: a leftover temp directory must not fail
                # the test.
                pass
        finally:
            BaseTestCase.tearDown(self)

    def check_status_and_reason(self, response, status, data=None):
        """Read the response and check its status, reason and body.

        The body is compared with ``data`` only when ``data`` is truthy.
        """
        body = response.read()
        self.assertTrue(response)
        self.assertEqual(response.status, status)
        self.assertTrue(response.reason is not None)
        if data:
            self.assertEqual(data, body)

    def test_get(self):
        # constructs the path relative to the root directory of the HTTPServer
        response = self.request(self.tempdir_name + '/test')
        self.check_status_and_reason(response, 200, data=self.data)
        response = self.request(self.tempdir_name + '/')
        self.check_status_and_reason(response, 200)
        response = self.request(self.tempdir_name)
        self.check_status_and_reason(response, 301)
        response = self.request('/ThisDoesNotExist')
        self.check_status_and_reason(response, 404)
        response = self.request('/' + 'ThisDoesNotExist' + '/')
        self.check_status_and_reason(response, 404)
        # Create an empty index.html and close it right away so that no
        # file handle is left open.
        with open(os.path.join(self.tempdir_name, 'index.html'), 'w'):
            pass
        response = self.request('/' + self.tempdir_name + '/')
        self.check_status_and_reason(response, 200)
        if os.name == 'posix':
            # chmod won't work as expected on Windows platforms
            os.chmod(self.tempdir, 0)
            try:
                response = self.request(self.tempdir_name + '/')
                self.check_status_and_reason(response, 404)
            finally:
                # Restore access even if the check fails; otherwise
                # tearDown() cannot remove the directory.
                os.chmod(self.tempdir, 0o755)

    def test_head(self):
        response = self.request(
            self.tempdir_name + '/test', method='HEAD')
        self.check_status_and_reason(response, 200)
        self.assertEqual(response.getheader('content-length'),
                         str(len(self.data)))
        self.assertEqual(response.getheader('content-type'),
                         'application/octet-stream')

    def test_invalid_requests(self):
        response = self.request('/', method='FOO')
        self.check_status_and_reason(response, 501)
        # requests must be case sensitive, so this should fail too
        response = self.request('/', method='get')
        self.check_status_and_reason(response, 501)
        response = self.request('/', method='GETs')
        self.check_status_and_reason(response, 501)
261 self.check_status_and_reason(response, 501)
262
263
# Template for a CGI script that prints a text/html header followed by
# "Hello World".  The %s in the shebang line is filled in with the path of
# the Python interpreter when the script is written to disk.
cgi_file1 = """\
#!%s

print "Content-type: text/html"
print
print "Hello World"
"""

# Template for a CGI script that prints the "spam", "eggs" and "bacon" form
# fields.  The template itself is %-formatted with the interpreter path, so
# the percent signs meant for the generated script are doubled.
cgi_file2 = """\
#!%s
import cgi

print "Content-type: text/html"
print

form = cgi.FieldStorage()
print "%%s, %%s, %%s" %% (form.getfirst("spam"), form.getfirst("eggs"),\
 form.getfirst("bacon"))
"""
283
class CGIHTTPServerTestCase(BaseTestCase):
    """Tests for CGIHTTPRequestHandler with two generated CGI scripts.

    setUp() builds a temporary directory containing a 'cgi-bin'
    sub-directory with file1.py and file2.py, then makes it the working
    directory.
    """

    class request_handler(NoLogRequestHandler, CGIHTTPRequestHandler):
        pass

    def setUp(self):
        BaseTestCase.setUp(self)
        self.parent_dir = None
        try:
            self._create_cgi_tree()
        except Exception:
            # tearDown() is skipped when setUp() fails: remove whatever was
            # created and stop the server thread before re-raising.
            if self.parent_dir is not None:
                shutil.rmtree(self.parent_dir, ignore_errors=True)
            BaseTestCase.tearDown(self)
            raise

    def _create_cgi_tree(self):
        # Create the temp directory, the CGI scripts, and chdir into it.
        self.parent_dir = tempfile.mkdtemp()
        self.cgi_dir = os.path.join(self.parent_dir, 'cgi-bin')
        os.mkdir(self.cgi_dir)

        # The shebang line should be pure ASCII: use symlink if possible.
        # See issue #7668.
        if hasattr(os, 'symlink'):
            self.pythonexe = os.path.join(self.parent_dir, 'python')
            os.symlink(sys.executable, self.pythonexe)
        else:
            self.pythonexe = sys.executable

        self.file1_path = self._write_cgi_script('file1.py', cgi_file1)
        self.file2_path = self._write_cgi_script('file2.py', cgi_file2)

        # Changing directory is the last step, so a failure above leaves
        # the working directory untouched.
        self.cwd = os.getcwd()
        os.chdir(self.parent_dir)

    def _write_cgi_script(self, name, template):
        # Write one executable CGI script into cgi-bin and return its path.
        path = os.path.join(self.cgi_dir, name)
        with open(path, 'w') as script:
            script.write(template % self.pythonexe)
        os.chmod(path, 0o777)
        return path

    def tearDown(self):
        try:
            os.chdir(self.cwd)
            # Only remove the interpreter path when it is our own symlink.
            if self.pythonexe != sys.executable:
                os.remove(self.pythonexe)
            os.remove(self.file1_path)
            os.remove(self.file2_path)
            os.rmdir(self.cgi_dir)
            os.rmdir(self.parent_dir)
        finally:
            BaseTestCase.tearDown(self)

    def test_headers_and_content(self):
        res = self.request('/cgi-bin/file1.py')
        self.assertEqual(
            ('Hello World\n', 'text/html', 200),
            (res.read(), res.getheader('Content-type'), res.status))

    def test_post(self):
        params = urllib.urlencode(
            {'spam': 1, 'eggs': 'python', 'bacon': 123456})
        headers = {'Content-type': 'application/x-www-form-urlencoded'}
        res = self.request('/cgi-bin/file2.py', 'POST', params, headers)

        self.assertEqual(res.read(), '1, python, 123456\n')

    def test_invaliduri(self):
        res = self.request('/cgi-bin/invalid')
        res.read()
        self.assertEqual(res.status, 404)

    def test_authorization(self):
        headers = {
            'Authorization': 'Basic %s' % base64.b64encode('username:pass'),
        }
        res = self.request('/cgi-bin/file1.py', 'GET', headers=headers)
        self.assertEqual(
            ('Hello World\n', 'text/html', 200),
            (res.read(), res.getheader('Content-type'), res.status))
350
351
def test_main(verbose=None):
    """Run all test cases of this module, then restore process state.

    The working directory and the environment variables are saved before
    the run and put back afterwards, after child processes have been reaped.
    """
    saved_cwd = os.getcwd()
    saved_env = dict(os.environ)
    test_cases = (
        BaseHTTPServerTestCase,
        SimpleHTTPServerTestCase,
        CGIHTTPServerTestCase,
    )
    try:
        test_support.run_unittest(*test_cases)
    finally:
        test_support.reap_children()
        # Reset the environment to exactly the saved snapshot.
        os.environ.clear()
        os.environ.update(saved_env)
        os.chdir(saved_cwd)
365
# Run the whole suite when this module is executed as a script.
if __name__ == "__main__":
    test_main()