blob: 88d4a3d683291aca94d42871630c7db43e693db5 [file] [log] [blame]
Phillip J. Eby5cf565d2006-06-09 16:40:18 +00001from __future__ import nested_scopes # Backward compat for 2.1
Collin Winterc2898c52007-04-25 17:29:52 +00002from unittest import TestCase
Phillip J. Eby5cf565d2006-06-09 16:40:18 +00003from wsgiref.util import setup_testing_defaults
4from wsgiref.headers import Headers
5from wsgiref.handlers import BaseHandler, BaseCGIHandler
6from wsgiref import util
7from wsgiref.validate import validator
8from wsgiref.simple_server import WSGIServer, WSGIRequestHandler, demo_app
9from wsgiref.simple_server import make_server
10from StringIO import StringIO
Georg Brandle152a772008-05-24 18:31:28 +000011from SocketServer import BaseServer
Antoine Pitroub3c169b2009-11-03 16:41:20 +000012import os
13import re
14import sys
Phillip J. Eby5cf565d2006-06-09 16:40:18 +000015
Collin Winterc2898c52007-04-25 17:29:52 +000016from test import test_support
Phillip J. Eby5cf565d2006-06-09 16:40:18 +000017
18class MockServer(WSGIServer):
19 """Non-socket HTTP server"""
20
21 def __init__(self, server_address, RequestHandlerClass):
22 BaseServer.__init__(self, server_address, RequestHandlerClass)
23 self.server_bind()
24
25 def server_bind(self):
26 host, port = self.server_address
27 self.server_name = host
28 self.server_port = port
29 self.setup_environ()
30
31
32class MockHandler(WSGIRequestHandler):
33 """Non-socket HTTP handler"""
34 def setup(self):
35 self.connection = self.request
36 self.rfile, self.wfile = self.connection
37
38 def finish(self):
39 pass
40
41
Phillip J. Eby5cf565d2006-06-09 16:40:18 +000042def hello_app(environ,start_response):
43 start_response("200 OK", [
44 ('Content-Type','text/plain'),
45 ('Date','Mon, 05 Jun 2006 18:49:54 GMT')
46 ])
47 return ["Hello, world!"]
48
49def run_amock(app=hello_app, data="GET / HTTP/1.0\n\n"):
50 server = make_server("", 80, app, MockServer, MockHandler)
51 inp, out, err, olderr = StringIO(data), StringIO(), StringIO(), sys.stderr
52 sys.stderr = err
53
54 try:
55 server.finish_request((inp,out), ("127.0.0.1",8888))
56 finally:
57 sys.stderr = olderr
58
59 return out.getvalue(), err.getvalue()
60
61
Phillip J. Eby403019b2006-06-12 04:04:32 +000062def compare_generic_iter(make_it,match):
Phillip J. Eby5cf565d2006-06-09 16:40:18 +000063 """Utility to compare a generic 2.1/2.2+ iterator with an iterable
64
65 If running under Python 2.2+, this tests the iterator using iter()/next(),
66 as well as __getitem__. 'make_it' must be a function returning a fresh
67 iterator to be tested (since this may test the iterator twice)."""
68
69 it = make_it()
70 n = 0
71 for item in match:
Phillip J. Eby403019b2006-06-12 04:04:32 +000072 if not it[n]==item: raise AssertionError
Phillip J. Eby5cf565d2006-06-09 16:40:18 +000073 n+=1
74 try:
75 it[n]
76 except IndexError:
77 pass
78 else:
79 raise AssertionError("Too many items from __getitem__",it)
80
81 try:
82 iter, StopIteration
83 except NameError:
84 pass
85 else:
86 # Only test iter mode under 2.2+
87 it = make_it()
Phillip J. Eby403019b2006-06-12 04:04:32 +000088 if not iter(it) is it: raise AssertionError
Phillip J. Eby5cf565d2006-06-09 16:40:18 +000089 for item in match:
Phillip J. Eby403019b2006-06-12 04:04:32 +000090 if not it.next()==item: raise AssertionError
91 try:
92 it.next()
93 except StopIteration:
94 pass
95 else:
96 raise AssertionError("Too many items from .next()",it)
Phillip J. Eby5cf565d2006-06-09 16:40:18 +000097
98
Phillip J. Eby5cf565d2006-06-09 16:40:18 +000099class IntegrationTests(TestCase):
100
101 def check_hello(self, out, has_length=True):
102 self.assertEqual(out,
103 "HTTP/1.0 200 OK\r\n"
104 "Server: WSGIServer/0.1 Python/"+sys.version.split()[0]+"\r\n"
105 "Content-Type: text/plain\r\n"
106 "Date: Mon, 05 Jun 2006 18:49:54 GMT\r\n" +
107 (has_length and "Content-Length: 13\r\n" or "") +
108 "\r\n"
109 "Hello, world!"
110 )
111
112 def test_plain_hello(self):
113 out, err = run_amock()
114 self.check_hello(out)
115
116 def test_validated_hello(self):
117 out, err = run_amock(validator(hello_app))
118 # the middleware doesn't support len(), so content-length isn't there
119 self.check_hello(out, has_length=False)
120
121 def test_simple_validation_error(self):
122 def bad_app(environ,start_response):
123 start_response("200 OK", ('Content-Type','text/plain'))
124 return ["Hello, world!"]
125 out, err = run_amock(validator(bad_app))
Benjamin Peterson5c8da862009-06-30 22:57:08 +0000126 self.assertTrue(out.endswith(
Phillip J. Eby5cf565d2006-06-09 16:40:18 +0000127 "A server error occurred. Please contact the administrator."
128 ))
129 self.assertEqual(
130 err.splitlines()[-2],
131 "AssertionError: Headers (('Content-Type', 'text/plain')) must"
132 " be of type list: <type 'tuple'>"
133 )
134
135
Phillip J. Eby5cf565d2006-06-09 16:40:18 +0000136class UtilityTests(TestCase):
137
138 def checkShift(self,sn_in,pi_in,part,sn_out,pi_out):
139 env = {'SCRIPT_NAME':sn_in,'PATH_INFO':pi_in}
140 util.setup_testing_defaults(env)
141 self.assertEqual(util.shift_path_info(env),part)
142 self.assertEqual(env['PATH_INFO'],pi_out)
143 self.assertEqual(env['SCRIPT_NAME'],sn_out)
144 return env
145
146 def checkDefault(self, key, value, alt=None):
147 # Check defaulting when empty
148 env = {}
149 util.setup_testing_defaults(env)
Ezio Melottib0f5adc2010-01-24 16:58:36 +0000150 if isinstance(value, StringIO):
151 self.assertIsInstance(env[key], StringIO)
Phillip J. Eby5cf565d2006-06-09 16:40:18 +0000152 else:
Ezio Melottib0f5adc2010-01-24 16:58:36 +0000153 self.assertEqual(env[key], value)
Phillip J. Eby5cf565d2006-06-09 16:40:18 +0000154
155 # Check existing value
156 env = {key:alt}
157 util.setup_testing_defaults(env)
Benjamin Peterson5c8da862009-06-30 22:57:08 +0000158 self.assertTrue(env[key] is alt)
Phillip J. Eby5cf565d2006-06-09 16:40:18 +0000159
160 def checkCrossDefault(self,key,value,**kw):
161 util.setup_testing_defaults(kw)
162 self.assertEqual(kw[key],value)
163
164 def checkAppURI(self,uri,**kw):
165 util.setup_testing_defaults(kw)
166 self.assertEqual(util.application_uri(kw),uri)
167
168 def checkReqURI(self,uri,query=1,**kw):
169 util.setup_testing_defaults(kw)
170 self.assertEqual(util.request_uri(kw,query),uri)
171
Phillip J. Eby5cf565d2006-06-09 16:40:18 +0000172 def checkFW(self,text,size,match):
173
174 def make_it(text=text,size=size):
175 return util.FileWrapper(StringIO(text),size)
176
Phillip J. Eby403019b2006-06-12 04:04:32 +0000177 compare_generic_iter(make_it,match)
Phillip J. Eby5cf565d2006-06-09 16:40:18 +0000178
179 it = make_it()
Benjamin Peterson5c8da862009-06-30 22:57:08 +0000180 self.assertFalse(it.filelike.closed)
Phillip J. Eby5cf565d2006-06-09 16:40:18 +0000181
182 for item in it:
183 pass
184
Benjamin Peterson5c8da862009-06-30 22:57:08 +0000185 self.assertFalse(it.filelike.closed)
Phillip J. Eby5cf565d2006-06-09 16:40:18 +0000186
187 it.close()
Benjamin Peterson5c8da862009-06-30 22:57:08 +0000188 self.assertTrue(it.filelike.closed)
Phillip J. Eby5cf565d2006-06-09 16:40:18 +0000189
Phillip J. Eby5cf565d2006-06-09 16:40:18 +0000190 def testSimpleShifts(self):
191 self.checkShift('','/', '', '/', '')
192 self.checkShift('','/x', 'x', '/x', '')
193 self.checkShift('/','', None, '/', '')
194 self.checkShift('/a','/x/y', 'x', '/a/x', '/y')
195 self.checkShift('/a','/x/', 'x', '/a/x', '/')
196
Phillip J. Eby5cf565d2006-06-09 16:40:18 +0000197 def testNormalizedShifts(self):
198 self.checkShift('/a/b', '/../y', '..', '/a', '/y')
199 self.checkShift('', '/../y', '..', '', '/y')
200 self.checkShift('/a/b', '//y', 'y', '/a/b/y', '')
201 self.checkShift('/a/b', '//y/', 'y', '/a/b/y', '/')
202 self.checkShift('/a/b', '/./y', 'y', '/a/b/y', '')
203 self.checkShift('/a/b', '/./y/', 'y', '/a/b/y', '/')
204 self.checkShift('/a/b', '///./..//y/.//', '..', '/a', '/y/')
205 self.checkShift('/a/b', '///', '', '/a/b/', '')
206 self.checkShift('/a/b', '/.//', '', '/a/b/', '')
207 self.checkShift('/a/b', '/x//', 'x', '/a/b/x', '/')
208 self.checkShift('/a/b', '/.', None, '/a/b', '')
209
Phillip J. Eby5cf565d2006-06-09 16:40:18 +0000210 def testDefaults(self):
211 for key, value in [
212 ('SERVER_NAME','127.0.0.1'),
213 ('SERVER_PORT', '80'),
214 ('SERVER_PROTOCOL','HTTP/1.0'),
215 ('HTTP_HOST','127.0.0.1'),
216 ('REQUEST_METHOD','GET'),
217 ('SCRIPT_NAME',''),
218 ('PATH_INFO','/'),
219 ('wsgi.version', (1,0)),
220 ('wsgi.run_once', 0),
221 ('wsgi.multithread', 0),
222 ('wsgi.multiprocess', 0),
223 ('wsgi.input', StringIO("")),
224 ('wsgi.errors', StringIO()),
225 ('wsgi.url_scheme','http'),
226 ]:
227 self.checkDefault(key,value)
228
Phillip J. Eby5cf565d2006-06-09 16:40:18 +0000229 def testCrossDefaults(self):
230 self.checkCrossDefault('HTTP_HOST',"foo.bar",SERVER_NAME="foo.bar")
231 self.checkCrossDefault('wsgi.url_scheme',"https",HTTPS="on")
232 self.checkCrossDefault('wsgi.url_scheme',"https",HTTPS="1")
233 self.checkCrossDefault('wsgi.url_scheme',"https",HTTPS="yes")
234 self.checkCrossDefault('wsgi.url_scheme',"http",HTTPS="foo")
235 self.checkCrossDefault('SERVER_PORT',"80",HTTPS="foo")
236 self.checkCrossDefault('SERVER_PORT',"443",HTTPS="on")
237
Phillip J. Eby5cf565d2006-06-09 16:40:18 +0000238 def testGuessScheme(self):
239 self.assertEqual(util.guess_scheme({}), "http")
240 self.assertEqual(util.guess_scheme({'HTTPS':"foo"}), "http")
241 self.assertEqual(util.guess_scheme({'HTTPS':"on"}), "https")
242 self.assertEqual(util.guess_scheme({'HTTPS':"yes"}), "https")
243 self.assertEqual(util.guess_scheme({'HTTPS':"1"}), "https")
244
Phillip J. Eby5cf565d2006-06-09 16:40:18 +0000245 def testAppURIs(self):
246 self.checkAppURI("http://127.0.0.1/")
247 self.checkAppURI("http://127.0.0.1/spam", SCRIPT_NAME="/spam")
Serhiy Storchakae1986922014-01-12 12:11:47 +0200248 self.checkAppURI("http://127.0.0.1/sp%E4m", SCRIPT_NAME="/sp\xe4m")
Phillip J. Eby5cf565d2006-06-09 16:40:18 +0000249 self.checkAppURI("http://spam.example.com:2071/",
250 HTTP_HOST="spam.example.com:2071", SERVER_PORT="2071")
251 self.checkAppURI("http://spam.example.com/",
252 SERVER_NAME="spam.example.com")
253 self.checkAppURI("http://127.0.0.1/",
254 HTTP_HOST="127.0.0.1", SERVER_NAME="spam.example.com")
255 self.checkAppURI("https://127.0.0.1/", HTTPS="on")
256 self.checkAppURI("http://127.0.0.1:8000/", SERVER_PORT="8000",
257 HTTP_HOST=None)
258
259 def testReqURIs(self):
260 self.checkReqURI("http://127.0.0.1/")
261 self.checkReqURI("http://127.0.0.1/spam", SCRIPT_NAME="/spam")
Serhiy Storchakae1986922014-01-12 12:11:47 +0200262 self.checkReqURI("http://127.0.0.1/sp%E4m", SCRIPT_NAME="/sp\xe4m")
Phillip J. Eby5cf565d2006-06-09 16:40:18 +0000263 self.checkReqURI("http://127.0.0.1/spammity/spam",
264 SCRIPT_NAME="/spammity", PATH_INFO="/spam")
Serhiy Storchakae1986922014-01-12 12:11:47 +0200265 self.checkReqURI("http://127.0.0.1/spammity/sp%E4m",
266 SCRIPT_NAME="/spammity", PATH_INFO="/sp\xe4m")
Senthil Kumaran559395f2010-12-29 06:30:19 +0000267 self.checkReqURI("http://127.0.0.1/spammity/spam;ham",
268 SCRIPT_NAME="/spammity", PATH_INFO="/spam;ham")
269 self.checkReqURI("http://127.0.0.1/spammity/spam;cookie=1234,5678",
270 SCRIPT_NAME="/spammity", PATH_INFO="/spam;cookie=1234,5678")
Phillip J. Eby5cf565d2006-06-09 16:40:18 +0000271 self.checkReqURI("http://127.0.0.1/spammity/spam?say=ni",
272 SCRIPT_NAME="/spammity", PATH_INFO="/spam",QUERY_STRING="say=ni")
Serhiy Storchakae1986922014-01-12 12:11:47 +0200273 self.checkReqURI("http://127.0.0.1/spammity/spam?s%E4y=ni",
274 SCRIPT_NAME="/spammity", PATH_INFO="/spam",QUERY_STRING="s%E4y=ni")
Phillip J. Eby5cf565d2006-06-09 16:40:18 +0000275 self.checkReqURI("http://127.0.0.1/spammity/spam", 0,
276 SCRIPT_NAME="/spammity", PATH_INFO="/spam",QUERY_STRING="say=ni")
277
278 def testFileWrapper(self):
279 self.checkFW("xyz"*50, 120, ["xyz"*40,"xyz"*10])
280
281 def testHopByHop(self):
282 for hop in (
283 "Connection Keep-Alive Proxy-Authenticate Proxy-Authorization "
284 "TE Trailers Transfer-Encoding Upgrade"
285 ).split():
286 for alt in hop, hop.title(), hop.upper(), hop.lower():
Benjamin Peterson5c8da862009-06-30 22:57:08 +0000287 self.assertTrue(util.is_hop_by_hop(alt))
Phillip J. Eby5cf565d2006-06-09 16:40:18 +0000288
289 # Not comprehensive, just a few random header names
290 for hop in (
291 "Accept Cache-Control Date Pragma Trailer Via Warning"
292 ).split():
293 for alt in hop, hop.title(), hop.upper(), hop.lower():
Benjamin Peterson5c8da862009-06-30 22:57:08 +0000294 self.assertFalse(util.is_hop_by_hop(alt))
Phillip J. Eby5cf565d2006-06-09 16:40:18 +0000295
296class HeaderTests(TestCase):
297
298 def testMappingInterface(self):
299 test = [('x','y')]
300 self.assertEqual(len(Headers([])),0)
301 self.assertEqual(len(Headers(test[:])),1)
302 self.assertEqual(Headers(test[:]).keys(), ['x'])
303 self.assertEqual(Headers(test[:]).values(), ['y'])
304 self.assertEqual(Headers(test[:]).items(), test)
Benjamin Peterson5c8da862009-06-30 22:57:08 +0000305 self.assertFalse(Headers(test).items() is test) # must be copy!
Phillip J. Eby5cf565d2006-06-09 16:40:18 +0000306
307 h=Headers([])
308 del h['foo'] # should not raise an error
309
310 h['Foo'] = 'bar'
311 for m in h.has_key, h.__contains__, h.get, h.get_all, h.__getitem__:
Benjamin Peterson5c8da862009-06-30 22:57:08 +0000312 self.assertTrue(m('foo'))
313 self.assertTrue(m('Foo'))
314 self.assertTrue(m('FOO'))
315 self.assertFalse(m('bar'))
Phillip J. Eby5cf565d2006-06-09 16:40:18 +0000316
317 self.assertEqual(h['foo'],'bar')
318 h['foo'] = 'baz'
319 self.assertEqual(h['FOO'],'baz')
320 self.assertEqual(h.get_all('foo'),['baz'])
321
322 self.assertEqual(h.get("foo","whee"), "baz")
323 self.assertEqual(h.get("zoo","whee"), "whee")
324 self.assertEqual(h.setdefault("foo","whee"), "baz")
325 self.assertEqual(h.setdefault("zoo","whee"), "whee")
326 self.assertEqual(h["foo"],"baz")
327 self.assertEqual(h["zoo"],"whee")
328
329 def testRequireList(self):
330 self.assertRaises(TypeError, Headers, "foo")
331
332
333 def testExtras(self):
334 h = Headers([])
335 self.assertEqual(str(h),'\r\n')
336
337 h.add_header('foo','bar',baz="spam")
338 self.assertEqual(h['foo'], 'bar; baz="spam"')
339 self.assertEqual(str(h),'foo: bar; baz="spam"\r\n\r\n')
340
341 h.add_header('Foo','bar',cheese=None)
342 self.assertEqual(h.get_all('foo'),
343 ['bar; baz="spam"', 'bar; cheese'])
344
345 self.assertEqual(str(h),
346 'foo: bar; baz="spam"\r\n'
347 'Foo: bar; cheese\r\n'
348 '\r\n'
349 )
350
351
352class ErrorHandler(BaseCGIHandler):
353 """Simple handler subclass for testing BaseHandler"""
354
Antoine Pitroub3c169b2009-11-03 16:41:20 +0000355 # BaseHandler records the OS environment at import time, but envvars
356 # might have been changed later by other tests, which trips up
357 # HandlerTests.testEnviron().
358 os_environ = dict(os.environ.items())
359
Phillip J. Eby5cf565d2006-06-09 16:40:18 +0000360 def __init__(self,**kw):
361 setup_testing_defaults(kw)
362 BaseCGIHandler.__init__(
363 self, StringIO(''), StringIO(), StringIO(), kw,
364 multithread=True, multiprocess=True
365 )
366
367class TestHandler(ErrorHandler):
368 """Simple handler subclass for testing BaseHandler, w/error passthru"""
369
370 def handle_error(self):
371 raise # for testing, we want to see what's happening
372
373
Phillip J. Eby5cf565d2006-06-09 16:40:18 +0000374class HandlerTests(TestCase):
375
376 def checkEnvironAttrs(self, handler):
377 env = handler.environ
378 for attr in [
379 'version','multithread','multiprocess','run_once','file_wrapper'
380 ]:
381 if attr=='file_wrapper' and handler.wsgi_file_wrapper is None:
382 continue
383 self.assertEqual(getattr(handler,'wsgi_'+attr),env['wsgi.'+attr])
384
385 def checkOSEnviron(self,handler):
386 empty = {}; setup_testing_defaults(empty)
387 env = handler.environ
388 from os import environ
389 for k,v in environ.items():
Ezio Melottidde5b942010-02-03 05:37:26 +0000390 if k not in empty:
Phillip J. Eby5cf565d2006-06-09 16:40:18 +0000391 self.assertEqual(env[k],v)
392 for k,v in empty.items():
Ezio Melottidde5b942010-02-03 05:37:26 +0000393 self.assertIn(k, env)
Phillip J. Eby5cf565d2006-06-09 16:40:18 +0000394
395 def testEnviron(self):
396 h = TestHandler(X="Y")
397 h.setup_environ()
398 self.checkEnvironAttrs(h)
399 self.checkOSEnviron(h)
400 self.assertEqual(h.environ["X"],"Y")
401
402 def testCGIEnviron(self):
403 h = BaseCGIHandler(None,None,None,{})
404 h.setup_environ()
405 for key in 'wsgi.url_scheme', 'wsgi.input', 'wsgi.errors':
Ezio Melottidde5b942010-02-03 05:37:26 +0000406 self.assertIn(key, h.environ)
Phillip J. Eby5cf565d2006-06-09 16:40:18 +0000407
408 def testScheme(self):
409 h=TestHandler(HTTPS="on"); h.setup_environ()
410 self.assertEqual(h.environ['wsgi.url_scheme'],'https')
411 h=TestHandler(); h.setup_environ()
412 self.assertEqual(h.environ['wsgi.url_scheme'],'http')
413
Phillip J. Eby5cf565d2006-06-09 16:40:18 +0000414 def testAbstractMethods(self):
415 h = BaseHandler()
416 for name in [
417 '_flush','get_stdin','get_stderr','add_cgi_vars'
418 ]:
419 self.assertRaises(NotImplementedError, getattr(h,name))
420 self.assertRaises(NotImplementedError, h._write, "test")
421
Phillip J. Eby5cf565d2006-06-09 16:40:18 +0000422 def testContentLength(self):
423 # Demo one reason iteration is better than write()... ;)
424
425 def trivial_app1(e,s):
426 s('200 OK',[])
427 return [e['wsgi.url_scheme']]
428
429 def trivial_app2(e,s):
430 s('200 OK',[])(e['wsgi.url_scheme'])
431 return []
432
Antoine Pitrou77c1b382011-01-06 17:19:05 +0000433 def trivial_app4(e,s):
434 # Simulate a response to a HEAD request
435 s('200 OK',[('Content-Length', '12345')])
436 return []
437
Phillip J. Eby5cf565d2006-06-09 16:40:18 +0000438 h = TestHandler()
439 h.run(trivial_app1)
440 self.assertEqual(h.stdout.getvalue(),
441 "Status: 200 OK\r\n"
442 "Content-Length: 4\r\n"
443 "\r\n"
444 "http")
445
446 h = TestHandler()
447 h.run(trivial_app2)
448 self.assertEqual(h.stdout.getvalue(),
449 "Status: 200 OK\r\n"
450 "\r\n"
451 "http")
452
453
Antoine Pitrou77c1b382011-01-06 17:19:05 +0000454 h = TestHandler()
455 h.run(trivial_app4)
456 self.assertEqual(h.stdout.getvalue(),
457 b'Status: 200 OK\r\n'
458 b'Content-Length: 12345\r\n'
459 b'\r\n')
Phillip J. Eby5cf565d2006-06-09 16:40:18 +0000460
461 def testBasicErrorOutput(self):
462
463 def non_error_app(e,s):
464 s('200 OK',[])
465 return []
466
467 def error_app(e,s):
468 raise AssertionError("This should be caught by handler")
469
470 h = ErrorHandler()
471 h.run(non_error_app)
472 self.assertEqual(h.stdout.getvalue(),
473 "Status: 200 OK\r\n"
474 "Content-Length: 0\r\n"
475 "\r\n")
476 self.assertEqual(h.stderr.getvalue(),"")
477
478 h = ErrorHandler()
479 h.run(error_app)
480 self.assertEqual(h.stdout.getvalue(),
481 "Status: %s\r\n"
482 "Content-Type: text/plain\r\n"
483 "Content-Length: %d\r\n"
484 "\r\n%s" % (h.error_status,len(h.error_body),h.error_body))
485
Antoine Pitrouaf45b112010-01-04 23:28:16 +0000486 self.assertNotEqual(h.stderr.getvalue().find("AssertionError"), -1)
Phillip J. Eby5cf565d2006-06-09 16:40:18 +0000487
488 def testErrorAfterOutput(self):
489 MSG = "Some output has been sent"
490 def error_app(e,s):
491 s("200 OK",[])(MSG)
492 raise AssertionError("This should be caught by handler")
493
494 h = ErrorHandler()
495 h.run(error_app)
496 self.assertEqual(h.stdout.getvalue(),
497 "Status: 200 OK\r\n"
498 "\r\n"+MSG)
Antoine Pitrouaf45b112010-01-04 23:28:16 +0000499 self.assertNotEqual(h.stderr.getvalue().find("AssertionError"), -1)
Phillip J. Eby5cf565d2006-06-09 16:40:18 +0000500
Phillip J. Eby5cf565d2006-06-09 16:40:18 +0000501 def testHeaderFormats(self):
502
503 def non_error_app(e,s):
504 s('200 OK',[])
505 return []
506
507 stdpat = (
508 r"HTTP/%s 200 OK\r\n"
509 r"Date: \w{3}, [ 0123]\d \w{3} \d{4} \d\d:\d\d:\d\d GMT\r\n"
510 r"%s" r"Content-Length: 0\r\n" r"\r\n"
511 )
512 shortpat = (
513 "Status: 200 OK\r\n" "Content-Length: 0\r\n" "\r\n"
514 )
515
516 for ssw in "FooBar/1.0", None:
517 sw = ssw and "Server: %s\r\n" % ssw or ""
518
519 for version in "1.0", "1.1":
520 for proto in "HTTP/0.9", "HTTP/1.0", "HTTP/1.1":
521
522 h = TestHandler(SERVER_PROTOCOL=proto)
523 h.origin_server = False
524 h.http_version = version
525 h.server_software = ssw
526 h.run(non_error_app)
527 self.assertEqual(shortpat,h.stdout.getvalue())
528
529 h = TestHandler(SERVER_PROTOCOL=proto)
530 h.origin_server = True
531 h.http_version = version
532 h.server_software = ssw
533 h.run(non_error_app)
534 if proto=="HTTP/0.9":
535 self.assertEqual(h.stdout.getvalue(),"")
536 else:
Benjamin Peterson5c8da862009-06-30 22:57:08 +0000537 self.assertTrue(
Phillip J. Eby5cf565d2006-06-09 16:40:18 +0000538 re.match(stdpat%(version,sw), h.stdout.getvalue()),
539 (stdpat%(version,sw), h.stdout.getvalue())
540 )
541
Antoine Pitroue97a24d2012-10-21 14:09:05 +0200542 def testCloseOnError(self):
543 side_effects = {'close_called': False}
544 MSG = b"Some output has been sent"
545 def error_app(e,s):
546 s("200 OK",[])(MSG)
547 class CrashyIterable(object):
548 def __iter__(self):
549 while True:
550 yield b'blah'
551 raise AssertionError("This should be caught by handler")
552
553 def close(self):
554 side_effects['close_called'] = True
555 return CrashyIterable()
556
557 h = ErrorHandler()
558 h.run(error_app)
559 self.assertEqual(side_effects['close_called'], True)
560
Phillip J. Eby5cf565d2006-06-09 16:40:18 +0000561
562def test_main():
Collin Winterc2898c52007-04-25 17:29:52 +0000563 test_support.run_unittest(__name__)
Phillip J. Eby5cf565d2006-06-09 16:40:18 +0000564
565if __name__ == "__main__":
566 test_main()