blob: b893bcf05c97a17ae7f3558874ed5b504520b4c0 [file] [log] [blame]
Thomas Wouters0e3f5912006-08-11 14:57:12 +00001from __future__ import nested_scopes # Backward compat for 2.1
Guido van Rossumd8faa362007-04-27 19:54:29 +00002from unittest import TestCase
Thomas Wouters0e3f5912006-08-11 14:57:12 +00003from wsgiref.util import setup_testing_defaults
4from wsgiref.headers import Headers
5from wsgiref.handlers import BaseHandler, BaseCGIHandler
6from wsgiref import util
7from wsgiref.validate import validator
8from wsgiref.simple_server import WSGIServer, WSGIRequestHandler, demo_app
9from wsgiref.simple_server import make_server
Jeremy Hyltone6b59c52007-08-10 19:13:33 +000010from io import StringIO, BytesIO, BufferedReader
Alexandre Vassalottice261952008-05-12 02:31:37 +000011from socketserver import BaseServer
Thomas Wouters0e3f5912006-08-11 14:57:12 +000012import re, sys
13
Benjamin Petersonee8712c2008-05-20 21:35:26 +000014from test import support
Thomas Wouters0e3f5912006-08-11 14:57:12 +000015
16class MockServer(WSGIServer):
17 """Non-socket HTTP server"""
18
19 def __init__(self, server_address, RequestHandlerClass):
20 BaseServer.__init__(self, server_address, RequestHandlerClass)
21 self.server_bind()
22
23 def server_bind(self):
24 host, port = self.server_address
25 self.server_name = host
26 self.server_port = port
27 self.setup_environ()
28
29
30class MockHandler(WSGIRequestHandler):
31 """Non-socket HTTP handler"""
32 def setup(self):
33 self.connection = self.request
34 self.rfile, self.wfile = self.connection
35
36 def finish(self):
37 pass
38
39
40
41
42
43def hello_app(environ,start_response):
44 start_response("200 OK", [
45 ('Content-Type','text/plain'),
46 ('Date','Mon, 05 Jun 2006 18:49:54 GMT')
47 ])
48 return ["Hello, world!"]
49
Guido van Rossum6a10e022007-08-08 17:01:45 +000050def run_amock(app=hello_app, data=b"GET / HTTP/1.0\n\n"):
Thomas Wouters0e3f5912006-08-11 14:57:12 +000051 server = make_server("", 80, app, MockServer, MockHandler)
Jeremy Hyltone6b59c52007-08-10 19:13:33 +000052 inp = BufferedReader(BytesIO(data))
Antoine Pitrou38a66ad2009-01-03 18:41:49 +000053 out = BytesIO()
Jeremy Hyltone6b59c52007-08-10 19:13:33 +000054 olderr = sys.stderr
55 err = sys.stderr = StringIO()
Thomas Wouters0e3f5912006-08-11 14:57:12 +000056
57 try:
Jeremy Hyltone6b59c52007-08-10 19:13:33 +000058 server.finish_request((inp, out), ("127.0.0.1",8888))
Thomas Wouters0e3f5912006-08-11 14:57:12 +000059 finally:
60 sys.stderr = olderr
61
62 return out.getvalue(), err.getvalue()
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86def compare_generic_iter(make_it,match):
87 """Utility to compare a generic 2.1/2.2+ iterator with an iterable
88
89 If running under Python 2.2+, this tests the iterator using iter()/next(),
90 as well as __getitem__. 'make_it' must be a function returning a fresh
91 iterator to be tested (since this may test the iterator twice)."""
92
93 it = make_it()
94 n = 0
95 for item in match:
96 if not it[n]==item: raise AssertionError
97 n+=1
98 try:
99 it[n]
100 except IndexError:
101 pass
102 else:
103 raise AssertionError("Too many items from __getitem__",it)
104
105 try:
106 iter, StopIteration
107 except NameError:
108 pass
109 else:
110 # Only test iter mode under 2.2+
111 it = make_it()
112 if not iter(it) is it: raise AssertionError
113 for item in match:
Georg Brandla18af4e2007-04-21 15:47:16 +0000114 if not next(it) == item: raise AssertionError
Thomas Wouters0e3f5912006-08-11 14:57:12 +0000115 try:
Georg Brandla18af4e2007-04-21 15:47:16 +0000116 next(it)
Thomas Wouters0e3f5912006-08-11 14:57:12 +0000117 except StopIteration:
118 pass
119 else:
Georg Brandla18af4e2007-04-21 15:47:16 +0000120 raise AssertionError("Too many items from .__next__()", it)
Thomas Wouters0e3f5912006-08-11 14:57:12 +0000121
122
123
124
125
126
127class IntegrationTests(TestCase):
128
129 def check_hello(self, out, has_length=True):
130 self.assertEqual(out,
Antoine Pitrou38a66ad2009-01-03 18:41:49 +0000131 ("HTTP/1.0 200 OK\r\n"
Thomas Wouters0e3f5912006-08-11 14:57:12 +0000132 "Server: WSGIServer/0.1 Python/"+sys.version.split()[0]+"\r\n"
133 "Content-Type: text/plain\r\n"
134 "Date: Mon, 05 Jun 2006 18:49:54 GMT\r\n" +
135 (has_length and "Content-Length: 13\r\n" or "") +
136 "\r\n"
Antoine Pitrou38a66ad2009-01-03 18:41:49 +0000137 "Hello, world!").encode("iso-8859-1")
Thomas Wouters0e3f5912006-08-11 14:57:12 +0000138 )
139
140 def test_plain_hello(self):
141 out, err = run_amock()
142 self.check_hello(out)
143
144 def test_validated_hello(self):
145 out, err = run_amock(validator(hello_app))
146 # the middleware doesn't support len(), so content-length isn't there
147 self.check_hello(out, has_length=False)
148
149 def test_simple_validation_error(self):
150 def bad_app(environ,start_response):
151 start_response("200 OK", ('Content-Type','text/plain'))
152 return ["Hello, world!"]
153 out, err = run_amock(validator(bad_app))
154 self.failUnless(out.endswith(
Antoine Pitrou38a66ad2009-01-03 18:41:49 +0000155 b"A server error occurred. Please contact the administrator."
Thomas Wouters0e3f5912006-08-11 14:57:12 +0000156 ))
157 self.assertEqual(
158 err.splitlines()[-2],
159 "AssertionError: Headers (('Content-Type', 'text/plain')) must"
Martin v. Löwis250ad612008-04-07 05:43:42 +0000160 " be of type list: <class 'tuple'>"
Thomas Wouters0e3f5912006-08-11 14:57:12 +0000161 )
162
Antoine Pitrou38a66ad2009-01-03 18:41:49 +0000163 def test_wsgi_input(self):
164 def bad_app(e,s):
165 e["wsgi.input"].read()
166 s(b"200 OK", [(b"Content-Type", b"text/plain; charset=utf-8")])
167 return [b"data"]
168 out, err = run_amock(validator(bad_app))
169 self.failUnless(out.endswith(
170 b"A server error occurred. Please contact the administrator."
171 ))
172 self.assertEqual(
173 err.splitlines()[-2], "AssertionError"
174 )
Thomas Wouters0e3f5912006-08-11 14:57:12 +0000175
Antoine Pitrou38a66ad2009-01-03 18:41:49 +0000176 def test_bytes_validation(self):
177 def app(e, s):
178 s(b"200 OK", [
179 (b"Content-Type", b"text/plain; charset=utf-8"),
180 ("Date", "Wed, 24 Dec 2008 13:29:32 GMT"),
181 ])
182 return [b"data"]
183 out, err = run_amock(validator(app))
184 self.failUnless(err.endswith('"GET / HTTP/1.0" 200 4\n'))
185 self.assertEqual(
186 b"HTTP/1.0 200 OK\r\n"
187 b"Server: WSGIServer/0.1 Python/3.1a0\r\n"
188 b"Content-Type: text/plain; charset=utf-8\r\n"
189 b"Date: Wed, 24 Dec 2008 13:29:32 GMT\r\n"
190 b"\r\n"
191 b"data",
192 out)
Thomas Wouters0e3f5912006-08-11 14:57:12 +0000193
194
195
196
197class UtilityTests(TestCase):
198
199 def checkShift(self,sn_in,pi_in,part,sn_out,pi_out):
200 env = {'SCRIPT_NAME':sn_in,'PATH_INFO':pi_in}
201 util.setup_testing_defaults(env)
202 self.assertEqual(util.shift_path_info(env),part)
203 self.assertEqual(env['PATH_INFO'],pi_out)
204 self.assertEqual(env['SCRIPT_NAME'],sn_out)
205 return env
206
207 def checkDefault(self, key, value, alt=None):
208 # Check defaulting when empty
209 env = {}
210 util.setup_testing_defaults(env)
211 if isinstance(value,StringIO):
212 self.failUnless(isinstance(env[key],StringIO))
Antoine Pitrou38a66ad2009-01-03 18:41:49 +0000213 elif isinstance(value,BytesIO):
214 self.failUnless(isinstance(env[key],BytesIO))
Thomas Wouters0e3f5912006-08-11 14:57:12 +0000215 else:
216 self.assertEqual(env[key],value)
217
218 # Check existing value
219 env = {key:alt}
220 util.setup_testing_defaults(env)
221 self.failUnless(env[key] is alt)
222
223 def checkCrossDefault(self,key,value,**kw):
224 util.setup_testing_defaults(kw)
225 self.assertEqual(kw[key],value)
226
227 def checkAppURI(self,uri,**kw):
228 util.setup_testing_defaults(kw)
229 self.assertEqual(util.application_uri(kw),uri)
230
231 def checkReqURI(self,uri,query=1,**kw):
232 util.setup_testing_defaults(kw)
233 self.assertEqual(util.request_uri(kw,query),uri)
234
235
236
237
238
239
240 def checkFW(self,text,size,match):
241
242 def make_it(text=text,size=size):
243 return util.FileWrapper(StringIO(text),size)
244
245 compare_generic_iter(make_it,match)
246
247 it = make_it()
248 self.failIf(it.filelike.closed)
249
250 for item in it:
251 pass
252
253 self.failIf(it.filelike.closed)
254
255 it.close()
256 self.failUnless(it.filelike.closed)
257
258
259 def testSimpleShifts(self):
260 self.checkShift('','/', '', '/', '')
261 self.checkShift('','/x', 'x', '/x', '')
262 self.checkShift('/','', None, '/', '')
263 self.checkShift('/a','/x/y', 'x', '/a/x', '/y')
264 self.checkShift('/a','/x/', 'x', '/a/x', '/')
265
266
267 def testNormalizedShifts(self):
268 self.checkShift('/a/b', '/../y', '..', '/a', '/y')
269 self.checkShift('', '/../y', '..', '', '/y')
270 self.checkShift('/a/b', '//y', 'y', '/a/b/y', '')
271 self.checkShift('/a/b', '//y/', 'y', '/a/b/y', '/')
272 self.checkShift('/a/b', '/./y', 'y', '/a/b/y', '')
273 self.checkShift('/a/b', '/./y/', 'y', '/a/b/y', '/')
274 self.checkShift('/a/b', '///./..//y/.//', '..', '/a', '/y/')
275 self.checkShift('/a/b', '///', '', '/a/b/', '')
276 self.checkShift('/a/b', '/.//', '', '/a/b/', '')
277 self.checkShift('/a/b', '/x//', 'x', '/a/b/x', '/')
278 self.checkShift('/a/b', '/.', None, '/a/b', '')
279
280
281 def testDefaults(self):
282 for key, value in [
283 ('SERVER_NAME','127.0.0.1'),
284 ('SERVER_PORT', '80'),
285 ('SERVER_PROTOCOL','HTTP/1.0'),
286 ('HTTP_HOST','127.0.0.1'),
287 ('REQUEST_METHOD','GET'),
288 ('SCRIPT_NAME',''),
289 ('PATH_INFO','/'),
290 ('wsgi.version', (1,0)),
291 ('wsgi.run_once', 0),
292 ('wsgi.multithread', 0),
293 ('wsgi.multiprocess', 0),
Antoine Pitrou38a66ad2009-01-03 18:41:49 +0000294 ('wsgi.input', BytesIO()),
Thomas Wouters0e3f5912006-08-11 14:57:12 +0000295 ('wsgi.errors', StringIO()),
296 ('wsgi.url_scheme','http'),
297 ]:
298 self.checkDefault(key,value)
299
300
301 def testCrossDefaults(self):
302 self.checkCrossDefault('HTTP_HOST',"foo.bar",SERVER_NAME="foo.bar")
303 self.checkCrossDefault('wsgi.url_scheme',"https",HTTPS="on")
304 self.checkCrossDefault('wsgi.url_scheme',"https",HTTPS="1")
305 self.checkCrossDefault('wsgi.url_scheme',"https",HTTPS="yes")
306 self.checkCrossDefault('wsgi.url_scheme',"http",HTTPS="foo")
307 self.checkCrossDefault('SERVER_PORT',"80",HTTPS="foo")
308 self.checkCrossDefault('SERVER_PORT',"443",HTTPS="on")
309
310
311 def testGuessScheme(self):
312 self.assertEqual(util.guess_scheme({}), "http")
313 self.assertEqual(util.guess_scheme({'HTTPS':"foo"}), "http")
314 self.assertEqual(util.guess_scheme({'HTTPS':"on"}), "https")
315 self.assertEqual(util.guess_scheme({'HTTPS':"yes"}), "https")
316 self.assertEqual(util.guess_scheme({'HTTPS':"1"}), "https")
317
318
319
320
321
322 def testAppURIs(self):
323 self.checkAppURI("http://127.0.0.1/")
324 self.checkAppURI("http://127.0.0.1/spam", SCRIPT_NAME="/spam")
Guido van Rossum52dbbb92008-08-18 21:44:30 +0000325 self.checkAppURI("http://127.0.0.1/sp%C3%A4m", SCRIPT_NAME="/späm")
Thomas Wouters0e3f5912006-08-11 14:57:12 +0000326 self.checkAppURI("http://spam.example.com:2071/",
327 HTTP_HOST="spam.example.com:2071", SERVER_PORT="2071")
328 self.checkAppURI("http://spam.example.com/",
329 SERVER_NAME="spam.example.com")
330 self.checkAppURI("http://127.0.0.1/",
331 HTTP_HOST="127.0.0.1", SERVER_NAME="spam.example.com")
332 self.checkAppURI("https://127.0.0.1/", HTTPS="on")
333 self.checkAppURI("http://127.0.0.1:8000/", SERVER_PORT="8000",
334 HTTP_HOST=None)
335
336 def testReqURIs(self):
337 self.checkReqURI("http://127.0.0.1/")
338 self.checkReqURI("http://127.0.0.1/spam", SCRIPT_NAME="/spam")
Guido van Rossum52dbbb92008-08-18 21:44:30 +0000339 self.checkReqURI("http://127.0.0.1/sp%C3%A4m", SCRIPT_NAME="/späm")
Thomas Wouters0e3f5912006-08-11 14:57:12 +0000340 self.checkReqURI("http://127.0.0.1/spammity/spam",
341 SCRIPT_NAME="/spammity", PATH_INFO="/spam")
342 self.checkReqURI("http://127.0.0.1/spammity/spam?say=ni",
343 SCRIPT_NAME="/spammity", PATH_INFO="/spam",QUERY_STRING="say=ni")
344 self.checkReqURI("http://127.0.0.1/spammity/spam", 0,
345 SCRIPT_NAME="/spammity", PATH_INFO="/spam",QUERY_STRING="say=ni")
346
347 def testFileWrapper(self):
348 self.checkFW("xyz"*50, 120, ["xyz"*40,"xyz"*10])
349
350 def testHopByHop(self):
351 for hop in (
352 "Connection Keep-Alive Proxy-Authenticate Proxy-Authorization "
353 "TE Trailers Transfer-Encoding Upgrade"
354 ).split():
355 for alt in hop, hop.title(), hop.upper(), hop.lower():
356 self.failUnless(util.is_hop_by_hop(alt))
357
358 # Not comprehensive, just a few random header names
359 for hop in (
360 "Accept Cache-Control Date Pragma Trailer Via Warning"
361 ).split():
362 for alt in hop, hop.title(), hop.upper(), hop.lower():
363 self.failIf(util.is_hop_by_hop(alt))
364
365class HeaderTests(TestCase):
366
367 def testMappingInterface(self):
368 test = [('x','y')]
369 self.assertEqual(len(Headers([])),0)
370 self.assertEqual(len(Headers(test[:])),1)
371 self.assertEqual(Headers(test[:]).keys(), ['x'])
372 self.assertEqual(Headers(test[:]).values(), ['y'])
373 self.assertEqual(Headers(test[:]).items(), test)
374 self.failIf(Headers(test).items() is test) # must be copy!
375
376 h=Headers([])
377 del h['foo'] # should not raise an error
378
379 h['Foo'] = 'bar'
Guido van Rossume2b70bc2006-08-18 22:13:04 +0000380 for m in h.__contains__, h.get, h.get_all, h.__getitem__:
Thomas Wouters0e3f5912006-08-11 14:57:12 +0000381 self.failUnless(m('foo'))
382 self.failUnless(m('Foo'))
383 self.failUnless(m('FOO'))
384 self.failIf(m('bar'))
385
386 self.assertEqual(h['foo'],'bar')
387 h['foo'] = 'baz'
388 self.assertEqual(h['FOO'],'baz')
389 self.assertEqual(h.get_all('foo'),['baz'])
390
391 self.assertEqual(h.get("foo","whee"), "baz")
392 self.assertEqual(h.get("zoo","whee"), "whee")
393 self.assertEqual(h.setdefault("foo","whee"), "baz")
394 self.assertEqual(h.setdefault("zoo","whee"), "whee")
395 self.assertEqual(h["foo"],"baz")
396 self.assertEqual(h["zoo"],"whee")
397
398 def testRequireList(self):
399 self.assertRaises(TypeError, Headers, "foo")
400
401
402 def testExtras(self):
403 h = Headers([])
404 self.assertEqual(str(h),'\r\n')
405
406 h.add_header('foo','bar',baz="spam")
407 self.assertEqual(h['foo'], 'bar; baz="spam"')
408 self.assertEqual(str(h),'foo: bar; baz="spam"\r\n\r\n')
409
410 h.add_header('Foo','bar',cheese=None)
411 self.assertEqual(h.get_all('foo'),
412 ['bar; baz="spam"', 'bar; cheese'])
413
414 self.assertEqual(str(h),
415 'foo: bar; baz="spam"\r\n'
416 'Foo: bar; cheese\r\n'
417 '\r\n'
418 )
419
Antoine Pitrou38a66ad2009-01-03 18:41:49 +0000420 def testBytes(self):
421 h = Headers([
422 (b"Content-Type", b"text/plain; charset=utf-8"),
423 ])
424 self.assertEqual("text/plain; charset=utf-8", h.get("Content-Type"))
425
426 h[b"Foo"] = bytes(b"bar")
427 self.assertEqual("bar", h.get("Foo"))
428
429 h.setdefault(b"Bar", b"foo")
430 self.assertEqual("foo", h.get("Bar"))
431
432 h.add_header(b'content-disposition', b'attachment',
433 filename=b'bud.gif')
434 self.assertEqual('attachment; filename="bud.gif"',
435 h.get("content-disposition"))
436
Thomas Wouters0e3f5912006-08-11 14:57:12 +0000437
438class ErrorHandler(BaseCGIHandler):
439 """Simple handler subclass for testing BaseHandler"""
440
441 def __init__(self,**kw):
442 setup_testing_defaults(kw)
443 BaseCGIHandler.__init__(
Antoine Pitrou38a66ad2009-01-03 18:41:49 +0000444 self, BytesIO(), BytesIO(), StringIO(), kw,
Thomas Wouters0e3f5912006-08-11 14:57:12 +0000445 multithread=True, multiprocess=True
446 )
447
448class TestHandler(ErrorHandler):
449 """Simple handler subclass for testing BaseHandler, w/error passthru"""
450
451 def handle_error(self):
452 raise # for testing, we want to see what's happening
453
454
455
456
457
458
459
460
461
462
463
464class HandlerTests(TestCase):
465
466 def checkEnvironAttrs(self, handler):
467 env = handler.environ
468 for attr in [
469 'version','multithread','multiprocess','run_once','file_wrapper'
470 ]:
471 if attr=='file_wrapper' and handler.wsgi_file_wrapper is None:
472 continue
473 self.assertEqual(getattr(handler,'wsgi_'+attr),env['wsgi.'+attr])
474
475 def checkOSEnviron(self,handler):
476 empty = {}; setup_testing_defaults(empty)
477 env = handler.environ
478 from os import environ
479 for k,v in environ.items():
Guido van Rossume2b70bc2006-08-18 22:13:04 +0000480 if k not in empty:
Thomas Wouters0e3f5912006-08-11 14:57:12 +0000481 self.assertEqual(env[k],v)
482 for k,v in empty.items():
Guido van Rossume2b70bc2006-08-18 22:13:04 +0000483 self.failUnless(k in env)
Thomas Wouters0e3f5912006-08-11 14:57:12 +0000484
485 def testEnviron(self):
486 h = TestHandler(X="Y")
487 h.setup_environ()
488 self.checkEnvironAttrs(h)
489 self.checkOSEnviron(h)
490 self.assertEqual(h.environ["X"],"Y")
491
492 def testCGIEnviron(self):
493 h = BaseCGIHandler(None,None,None,{})
494 h.setup_environ()
495 for key in 'wsgi.url_scheme', 'wsgi.input', 'wsgi.errors':
Guido van Rossume2b70bc2006-08-18 22:13:04 +0000496 self.assert_(key in h.environ)
Thomas Wouters0e3f5912006-08-11 14:57:12 +0000497
498 def testScheme(self):
499 h=TestHandler(HTTPS="on"); h.setup_environ()
500 self.assertEqual(h.environ['wsgi.url_scheme'],'https')
501 h=TestHandler(); h.setup_environ()
502 self.assertEqual(h.environ['wsgi.url_scheme'],'http')
503
504
505 def testAbstractMethods(self):
506 h = BaseHandler()
507 for name in [
508 '_flush','get_stdin','get_stderr','add_cgi_vars'
509 ]:
510 self.assertRaises(NotImplementedError, getattr(h,name))
511 self.assertRaises(NotImplementedError, h._write, "test")
512
513
514 def testContentLength(self):
515 # Demo one reason iteration is better than write()... ;)
516
517 def trivial_app1(e,s):
518 s('200 OK',[])
519 return [e['wsgi.url_scheme']]
520
521 def trivial_app2(e,s):
522 s('200 OK',[])(e['wsgi.url_scheme'])
523 return []
524
Antoine Pitrou38a66ad2009-01-03 18:41:49 +0000525 def trivial_app3(e,s):
526 s('200 OK',[])
527 return ['\u0442\u0435\u0441\u0442'.encode("utf-8")]
528
Thomas Wouters0e3f5912006-08-11 14:57:12 +0000529 h = TestHandler()
530 h.run(trivial_app1)
531 self.assertEqual(h.stdout.getvalue(),
Antoine Pitrou38a66ad2009-01-03 18:41:49 +0000532 ("Status: 200 OK\r\n"
Thomas Wouters0e3f5912006-08-11 14:57:12 +0000533 "Content-Length: 4\r\n"
534 "\r\n"
Antoine Pitrou38a66ad2009-01-03 18:41:49 +0000535 "http").encode("iso-8859-1"))
Thomas Wouters0e3f5912006-08-11 14:57:12 +0000536
537 h = TestHandler()
538 h.run(trivial_app2)
539 self.assertEqual(h.stdout.getvalue(),
Antoine Pitrou38a66ad2009-01-03 18:41:49 +0000540 ("Status: 200 OK\r\n"
Thomas Wouters0e3f5912006-08-11 14:57:12 +0000541 "\r\n"
Antoine Pitrou38a66ad2009-01-03 18:41:49 +0000542 "http").encode("iso-8859-1"))
Thomas Wouters0e3f5912006-08-11 14:57:12 +0000543
Antoine Pitrou38a66ad2009-01-03 18:41:49 +0000544 h = TestHandler()
545 h.run(trivial_app3)
546 self.assertEqual(h.stdout.getvalue(),
547 b'Status: 200 OK\r\n'
548 b'Content-Length: 8\r\n'
549 b'\r\n'
550 b'\xd1\x82\xd0\xb5\xd1\x81\xd1\x82')
Thomas Wouters0e3f5912006-08-11 14:57:12 +0000551
552
553
554
555
556
557 def testBasicErrorOutput(self):
558
559 def non_error_app(e,s):
560 s('200 OK',[])
561 return []
562
563 def error_app(e,s):
564 raise AssertionError("This should be caught by handler")
565
566 h = ErrorHandler()
567 h.run(non_error_app)
568 self.assertEqual(h.stdout.getvalue(),
Antoine Pitrou38a66ad2009-01-03 18:41:49 +0000569 ("Status: 200 OK\r\n"
Thomas Wouters0e3f5912006-08-11 14:57:12 +0000570 "Content-Length: 0\r\n"
Antoine Pitrou38a66ad2009-01-03 18:41:49 +0000571 "\r\n").encode("iso-8859-1"))
Thomas Wouters0e3f5912006-08-11 14:57:12 +0000572 self.assertEqual(h.stderr.getvalue(),"")
573
574 h = ErrorHandler()
575 h.run(error_app)
576 self.assertEqual(h.stdout.getvalue(),
Antoine Pitrou38a66ad2009-01-03 18:41:49 +0000577 ("Status: %s\r\n"
Thomas Wouters0e3f5912006-08-11 14:57:12 +0000578 "Content-Type: text/plain\r\n"
579 "Content-Length: %d\r\n"
Antoine Pitrou38a66ad2009-01-03 18:41:49 +0000580 "\r\n%s" % (h.error_status,len(h.error_body),h.error_body)
581 ).encode("iso-8859-1"))
Thomas Wouters0e3f5912006-08-11 14:57:12 +0000582
Guido van Rossumb053cd82006-08-24 03:53:23 +0000583 self.failUnless("AssertionError" in h.stderr.getvalue())
Thomas Wouters0e3f5912006-08-11 14:57:12 +0000584
585 def testErrorAfterOutput(self):
586 MSG = "Some output has been sent"
587 def error_app(e,s):
588 s("200 OK",[])(MSG)
589 raise AssertionError("This should be caught by handler")
590
591 h = ErrorHandler()
592 h.run(error_app)
593 self.assertEqual(h.stdout.getvalue(),
Antoine Pitrou38a66ad2009-01-03 18:41:49 +0000594 ("Status: 200 OK\r\n"
595 "\r\n"+MSG).encode("iso-8859-1"))
Guido van Rossumb053cd82006-08-24 03:53:23 +0000596 self.failUnless("AssertionError" in h.stderr.getvalue())
Thomas Wouters0e3f5912006-08-11 14:57:12 +0000597
598
599 def testHeaderFormats(self):
600
601 def non_error_app(e,s):
602 s('200 OK',[])
603 return []
604
605 stdpat = (
606 r"HTTP/%s 200 OK\r\n"
607 r"Date: \w{3}, [ 0123]\d \w{3} \d{4} \d\d:\d\d:\d\d GMT\r\n"
608 r"%s" r"Content-Length: 0\r\n" r"\r\n"
609 )
610 shortpat = (
611 "Status: 200 OK\r\n" "Content-Length: 0\r\n" "\r\n"
Antoine Pitrou38a66ad2009-01-03 18:41:49 +0000612 ).encode("iso-8859-1")
Thomas Wouters0e3f5912006-08-11 14:57:12 +0000613
614 for ssw in "FooBar/1.0", None:
615 sw = ssw and "Server: %s\r\n" % ssw or ""
616
617 for version in "1.0", "1.1":
618 for proto in "HTTP/0.9", "HTTP/1.0", "HTTP/1.1":
619
620 h = TestHandler(SERVER_PROTOCOL=proto)
621 h.origin_server = False
622 h.http_version = version
623 h.server_software = ssw
624 h.run(non_error_app)
625 self.assertEqual(shortpat,h.stdout.getvalue())
626
627 h = TestHandler(SERVER_PROTOCOL=proto)
628 h.origin_server = True
629 h.http_version = version
630 h.server_software = ssw
631 h.run(non_error_app)
632 if proto=="HTTP/0.9":
Antoine Pitrou38a66ad2009-01-03 18:41:49 +0000633 self.assertEqual(h.stdout.getvalue(),b"")
Thomas Wouters0e3f5912006-08-11 14:57:12 +0000634 else:
635 self.failUnless(
Antoine Pitrou38a66ad2009-01-03 18:41:49 +0000636 re.match((stdpat%(version,sw)).encode("iso-8859-1"),
637 h.stdout.getvalue()),
638 ((stdpat%(version,sw)).encode("iso-8859-1"),
639 h.stdout.getvalue())
Thomas Wouters0e3f5912006-08-11 14:57:12 +0000640 )
641
Antoine Pitrou38a66ad2009-01-03 18:41:49 +0000642 def testBytesData(self):
643 def app(e, s):
644 s(b"200 OK", [
645 (b"Content-Type", b"text/plain; charset=utf-8"),
646 ])
647 return [b"data"]
648
649 h = TestHandler()
650 h.run(app)
651 self.assertEqual(b"Status: 200 OK\r\n"
652 b"Content-Type: text/plain; charset=utf-8\r\n"
653 b"Content-Length: 4\r\n"
654 b"\r\n"
655 b"data",
656 h.stdout.getvalue())
657
Thomas Wouters0e3f5912006-08-11 14:57:12 +0000658# This epilogue is needed for compatibility with the Python 2.5 regrtest module
659
660def test_main():
Benjamin Petersonee8712c2008-05-20 21:35:26 +0000661 support.run_unittest(__name__)
Thomas Wouters0e3f5912006-08-11 14:57:12 +0000662
663if __name__ == "__main__":
664 test_main()
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694# the above lines intentionally left blank