blob: b98452dbc440b998e56e67248657bb29bdf1e4d7 [file] [log] [blame]
Thomas Wouters0e3f5912006-08-11 14:57:12 +00001from __future__ import nested_scopes # Backward compat for 2.1
Guido van Rossumd8faa362007-04-27 19:54:29 +00002from unittest import TestCase
Thomas Wouters0e3f5912006-08-11 14:57:12 +00003from wsgiref.util import setup_testing_defaults
4from wsgiref.headers import Headers
5from wsgiref.handlers import BaseHandler, BaseCGIHandler
6from wsgiref import util
7from wsgiref.validate import validator
8from wsgiref.simple_server import WSGIServer, WSGIRequestHandler, demo_app
9from wsgiref.simple_server import make_server
Jeremy Hyltone6b59c52007-08-10 19:13:33 +000010from io import StringIO, BytesIO, BufferedReader
Alexandre Vassalottice261952008-05-12 02:31:37 +000011from socketserver import BaseServer
Thomas Wouters0e3f5912006-08-11 14:57:12 +000012import re, sys
13
Benjamin Petersonee8712c2008-05-20 21:35:26 +000014from test import support
Thomas Wouters0e3f5912006-08-11 14:57:12 +000015
class MockServer(WSGIServer):
    """WSGI server stand-in that is set up without a real socket."""

    def __init__(self, server_address, RequestHandlerClass):
        # Call BaseServer.__init__ directly instead of going through the
        # WSGIServer constructor chain, then do our own socket-free "bind".
        BaseServer.__init__(self, server_address, RequestHandlerClass)
        self.server_bind()

    def server_bind(self):
        """Record the (host, port) pair and build the base WSGI environ."""
        self.server_name, self.server_port = self.server_address
        self.setup_environ()
28
29
class MockHandler(WSGIRequestHandler):
    """Request handler that works on a pair of streams, not a socket."""

    def setup(self):
        # The "request" object is an (input stream, output stream) pair.
        streams = self.request
        self.connection = streams
        self.rfile, self.wfile = streams

    def finish(self):
        """Do nothing; the streams are left exactly as they are."""
        return None
38
39
40
41
42
def hello_app(environ, start_response):
    """Trivial WSGI application: fixed headers and a one-item body."""
    response_headers = [
        ('Content-Type', 'text/plain'),
        ('Date', 'Mon, 05 Jun 2006 18:49:54 GMT'),
    ]
    start_response("200 OK", response_headers)
    return ["Hello, world!"]
49
def run_amock(app=hello_app, data=b"GET / HTTP/1.0\n\n"):
    """Push one raw request through a mock server.

    Returns a (response text, captured stderr text) tuple.
    """
    server = make_server("", 80, app, MockServer, MockHandler)
    request_in = BufferedReader(BytesIO(data))
    response_out = StringIO()

    # Swap sys.stderr for an in-memory buffer while the request runs,
    # and always put the real one back afterwards.
    saved_stderr = sys.stderr
    captured_err = StringIO()
    sys.stderr = captured_err
    try:
        server.finish_request((request_in, response_out), ("127.0.0.1", 8888))
    finally:
        sys.stderr = saved_stderr

    return response_out.getvalue(), captured_err.getvalue()
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
def compare_generic_iter(make_it, match):
    """Check an iterator against the expected sequence of items.

    'make_it' must be a function returning a fresh iterator to be tested
    (it is called twice: once to exercise __getitem__ and once to exercise
    iter()/next()).  'match' is an iterable of the items the iterator is
    expected to produce, in order.

    Raises AssertionError when an item differs, when extra items are
    produced, or when iter() does not return the iterator itself.
    """
    # Pass 1: sequence-style access through __getitem__.
    it = make_it()
    index = 0
    for expected in match:
        actual = it[index]
        if not actual == expected:
            raise AssertionError(
                "Item %d from __getitem__ is %r, expected %r"
                % (index, actual, expected)
            )
        index += 1
    try:
        it[index]
    except IndexError:
        pass
    else:
        raise AssertionError("Too many items from __getitem__", it)

    # Pass 2: the iterator protocol, on a fresh object.
    it = make_it()
    if iter(it) is not it:
        raise AssertionError("iter() did not return the iterator itself", it)
    for position, expected in enumerate(match):
        actual = next(it)
        if not actual == expected:
            raise AssertionError(
                "Item %d from .__next__() is %r, expected %r"
                % (position, actual, expected)
            )
    try:
        next(it)
    except StopIteration:
        pass
    else:
        raise AssertionError("Too many items from .__next__()", it)
Thomas Wouters0e3f5912006-08-11 14:57:12 +0000121
122
123
124
125
126
class IntegrationTests(TestCase):
    """End-to-end checks that push a request through run_amock()."""

    def check_hello(self, out, has_length=True):
        """Assert that 'out' is the exact response expected from hello_app.

        'has_length' says whether a Content-Length header is expected in
        the response.
        """
        length_header = "Content-Length: 13\r\n" if has_length else ""
        self.assertEqual(out,
            "HTTP/1.0 200 OK\r\n"
            "Server: WSGIServer/0.1 Python/" + sys.version.split()[0] + "\r\n"
            "Content-Type: text/plain\r\n"
            "Date: Mon, 05 Jun 2006 18:49:54 GMT\r\n" +
            length_header +
            "\r\n"
            "Hello, world!"
        )

    def test_plain_hello(self):
        out, err = run_amock()
        self.check_hello(out)

    def test_validated_hello(self):
        out, err = run_amock(validator(hello_app))
        # the middleware doesn't support len(), so content-length isn't there
        self.check_hello(out, has_length=False)

    def test_simple_validation_error(self):
        def bad_app(environ, start_response):
            # Headers passed as a tuple instead of a list: the validator
            # is expected to reject this.
            start_response("200 OK", ('Content-Type', 'text/plain'))
            return ["Hello, world!"]

        out, err = run_amock(validator(bad_app))
        self.assertTrue(out.endswith(
            "A server error occurred.  Please contact the administrator."
        ))
        self.assertEqual(
            err.splitlines()[-2],
            "AssertionError: Headers (('Content-Type', 'text/plain')) must"
            " be of type list: <class 'tuple'>"
        )
162
163
164
165
166
167
class UtilityTests(TestCase):
    """Tests for the helper functions in wsgiref.util."""

    def checkShift(self, sn_in, pi_in, part, sn_out, pi_out):
        """Run shift_path_info() once and verify its effect.

        Starts from SCRIPT_NAME 'sn_in' and PATH_INFO 'pi_in'; expects the
        call to return 'part' and leave SCRIPT_NAME/PATH_INFO equal to
        'sn_out'/'pi_out'.  Returns the environ that was used.
        """
        env = {'SCRIPT_NAME': sn_in, 'PATH_INFO': pi_in}
        util.setup_testing_defaults(env)
        self.assertEqual(util.shift_path_info(env), part)
        self.assertEqual(env['PATH_INFO'], pi_out)
        self.assertEqual(env['SCRIPT_NAME'], sn_out)
        return env

    def checkDefault(self, key, value, alt=None):
        """Check that 'key' defaults to 'value' and that presets survive."""
        # Check defaulting when empty
        env = {}
        util.setup_testing_defaults(env)
        if isinstance(value, StringIO):
            # Stream defaults are compared by type only.
            self.assertTrue(isinstance(env[key], StringIO))
        else:
            self.assertEqual(env[key], value)

        # Check existing value
        env = {key: alt}
        util.setup_testing_defaults(env)
        self.assertTrue(env[key] is alt)

    def checkCrossDefault(self, key, value, **kw):
        """Check that presetting the 'kw' items makes 'key' default to 'value'."""
        util.setup_testing_defaults(kw)
        self.assertEqual(kw[key], value)

    def checkAppURI(self, uri, **kw):
        """Check application_uri() for an environ preset with 'kw'."""
        util.setup_testing_defaults(kw)
        self.assertEqual(util.application_uri(kw), uri)

    def checkReqURI(self, uri, query=1, **kw):
        """Check request_uri() for an environ preset with 'kw'."""
        util.setup_testing_defaults(kw)
        self.assertEqual(util.request_uri(kw, query), uri)

    def checkFW(self, text, size, match):
        """Check FileWrapper iteration results and close() behaviour."""

        def make_it(text=text, size=size):
            return util.FileWrapper(StringIO(text), size)

        compare_generic_iter(make_it, match)

        it = make_it()
        self.assertFalse(it.filelike.closed)

        for item in it:
            pass

        # Exhausting the wrapper must not close the underlying file...
        self.assertFalse(it.filelike.closed)

        # ...but an explicit close() must.
        it.close()
        self.assertTrue(it.filelike.closed)

    def testSimpleShifts(self):
        self.checkShift('', '/', '', '/', '')
        self.checkShift('', '/x', 'x', '/x', '')
        self.checkShift('/', '', None, '/', '')
        self.checkShift('/a', '/x/y', 'x', '/a/x', '/y')
        self.checkShift('/a', '/x/', 'x', '/a/x', '/')

    def testNormalizedShifts(self):
        self.checkShift('/a/b', '/../y', '..', '/a', '/y')
        self.checkShift('', '/../y', '..', '', '/y')
        self.checkShift('/a/b', '//y', 'y', '/a/b/y', '')
        self.checkShift('/a/b', '//y/', 'y', '/a/b/y', '/')
        self.checkShift('/a/b', '/./y', 'y', '/a/b/y', '')
        self.checkShift('/a/b', '/./y/', 'y', '/a/b/y', '/')
        self.checkShift('/a/b', '///./..//y/.//', '..', '/a', '/y/')
        self.checkShift('/a/b', '///', '', '/a/b/', '')
        self.checkShift('/a/b', '/.//', '', '/a/b/', '')
        self.checkShift('/a/b', '/x//', 'x', '/a/b/x', '/')
        self.checkShift('/a/b', '/.', None, '/a/b', '')

    def testDefaults(self):
        for key, value in [
            ('SERVER_NAME', '127.0.0.1'),
            ('SERVER_PORT', '80'),
            ('SERVER_PROTOCOL', 'HTTP/1.0'),
            ('HTTP_HOST', '127.0.0.1'),
            ('REQUEST_METHOD', 'GET'),
            ('SCRIPT_NAME', ''),
            ('PATH_INFO', '/'),
            ('wsgi.version', (1, 0)),
            ('wsgi.run_once', 0),
            ('wsgi.multithread', 0),
            ('wsgi.multiprocess', 0),
            ('wsgi.input', StringIO("")),
            ('wsgi.errors', StringIO()),
            ('wsgi.url_scheme', 'http'),
        ]:
            self.checkDefault(key, value)

    def testCrossDefaults(self):
        self.checkCrossDefault('HTTP_HOST', "foo.bar", SERVER_NAME="foo.bar")
        self.checkCrossDefault('wsgi.url_scheme', "https", HTTPS="on")
        self.checkCrossDefault('wsgi.url_scheme', "https", HTTPS="1")
        self.checkCrossDefault('wsgi.url_scheme', "https", HTTPS="yes")
        self.checkCrossDefault('wsgi.url_scheme', "http", HTTPS="foo")
        self.checkCrossDefault('SERVER_PORT', "80", HTTPS="foo")
        self.checkCrossDefault('SERVER_PORT', "443", HTTPS="on")

    def testGuessScheme(self):
        self.assertEqual(util.guess_scheme({}), "http")
        self.assertEqual(util.guess_scheme({'HTTPS': "foo"}), "http")
        self.assertEqual(util.guess_scheme({'HTTPS': "on"}), "https")
        self.assertEqual(util.guess_scheme({'HTTPS': "yes"}), "https")
        self.assertEqual(util.guess_scheme({'HTTPS': "1"}), "https")

    def testAppURIs(self):
        self.checkAppURI("http://127.0.0.1/")
        self.checkAppURI("http://127.0.0.1/spam", SCRIPT_NAME="/spam")
        self.checkAppURI("http://127.0.0.1/sp%C3%A4m", SCRIPT_NAME="/späm")
        self.checkAppURI("http://spam.example.com:2071/",
            HTTP_HOST="spam.example.com:2071", SERVER_PORT="2071")
        self.checkAppURI("http://spam.example.com/",
            SERVER_NAME="spam.example.com")
        self.checkAppURI("http://127.0.0.1/",
            HTTP_HOST="127.0.0.1", SERVER_NAME="spam.example.com")
        self.checkAppURI("https://127.0.0.1/", HTTPS="on")
        self.checkAppURI("http://127.0.0.1:8000/", SERVER_PORT="8000",
            HTTP_HOST=None)

    def testReqURIs(self):
        self.checkReqURI("http://127.0.0.1/")
        self.checkReqURI("http://127.0.0.1/spam", SCRIPT_NAME="/spam")
        self.checkReqURI("http://127.0.0.1/sp%C3%A4m", SCRIPT_NAME="/späm")
        self.checkReqURI("http://127.0.0.1/spammity/spam",
            SCRIPT_NAME="/spammity", PATH_INFO="/spam")
        self.checkReqURI("http://127.0.0.1/spammity/spam?say=ni",
            SCRIPT_NAME="/spammity", PATH_INFO="/spam", QUERY_STRING="say=ni")
        self.checkReqURI("http://127.0.0.1/spammity/spam", 0,
            SCRIPT_NAME="/spammity", PATH_INFO="/spam", QUERY_STRING="say=ni")

    def testFileWrapper(self):
        self.checkFW("xyz"*50, 120, ["xyz"*40, "xyz"*10])

    def testHopByHop(self):
        for hop in (
            "Connection Keep-Alive Proxy-Authenticate Proxy-Authorization "
            "TE Trailers Transfer-Encoding Upgrade"
        ).split():
            for alt in hop, hop.title(), hop.upper(), hop.lower():
                self.assertTrue(util.is_hop_by_hop(alt))

        # Not comprehensive, just a few random header names
        for hop in (
            "Accept Cache-Control Date Pragma Trailer Via Warning"
        ).split():
            for alt in hop, hop.title(), hop.upper(), hop.lower():
                self.assertFalse(util.is_hop_by_hop(alt))
333
class HeaderTests(TestCase):
    """Tests for wsgiref.headers.Headers."""

    def testMappingInterface(self):
        test = [('x', 'y')]
        self.assertEqual(len(Headers([])), 0)
        self.assertEqual(len(Headers(test[:])), 1)
        self.assertEqual(Headers(test[:]).keys(), ['x'])
        self.assertEqual(Headers(test[:]).values(), ['y'])
        self.assertEqual(Headers(test[:]).items(), test)
        self.assertFalse(Headers(test).items() is test)  # must be copy!

        h = Headers([])
        del h['foo']   # should not raise an error

        h['Foo'] = 'bar'
        # Every lookup method must treat header names case-insensitively.
        for m in h.__contains__, h.get, h.get_all, h.__getitem__:
            self.assertTrue(m('foo'))
            self.assertTrue(m('Foo'))
            self.assertTrue(m('FOO'))
            self.assertFalse(m('bar'))

        self.assertEqual(h['foo'], 'bar')
        h['foo'] = 'baz'
        self.assertEqual(h['FOO'], 'baz')
        self.assertEqual(h.get_all('foo'), ['baz'])

        self.assertEqual(h.get("foo", "whee"), "baz")
        self.assertEqual(h.get("zoo", "whee"), "whee")
        self.assertEqual(h.setdefault("foo", "whee"), "baz")
        self.assertEqual(h.setdefault("zoo", "whee"), "whee")
        self.assertEqual(h["foo"], "baz")
        self.assertEqual(h["zoo"], "whee")

    def testRequireList(self):
        self.assertRaises(TypeError, Headers, "foo")

    def testExtras(self):
        h = Headers([])
        self.assertEqual(str(h), '\r\n')

        h.add_header('foo', 'bar', baz="spam")
        self.assertEqual(h['foo'], 'bar; baz="spam"')
        self.assertEqual(str(h), 'foo: bar; baz="spam"\r\n\r\n')

        h.add_header('Foo', 'bar', cheese=None)
        self.assertEqual(h.get_all('foo'),
            ['bar; baz="spam"', 'bar; cheese'])

        self.assertEqual(str(h),
            'foo: bar; baz="spam"\r\n'
            'Foo: bar; cheese\r\n'
            '\r\n'
        )
388
389
class ErrorHandler(BaseCGIHandler):
    """BaseCGIHandler fed from in-memory streams, for testing BaseHandler"""

    def __init__(self, **kw):
        # The keyword arguments double as the CGI environ; fill in whatever
        # standard keys the caller did not supply.
        setup_testing_defaults(kw)
        stdin = StringIO('')
        stdout = StringIO()
        stderr = StringIO()
        BaseCGIHandler.__init__(
            self, stdin, stdout, stderr, kw,
            multithread=True, multiprocess=True
        )
399
class TestHandler(ErrorHandler):
    """ErrorHandler variant whose handle_error() passes errors through"""

    def handle_error(self):
        # A bare raise re-raises the exception currently being handled,
        # so a failing test shows what actually went wrong.
        raise
405
406
407
408
409
410
411
412
413
414
415
class HandlerTests(TestCase):
    """Tests for the handler classes in wsgiref.handlers."""

    def checkEnvironAttrs(self, handler):
        """Check that each wsgi_* attribute matches its 'wsgi.*' environ key."""
        env = handler.environ
        for attr in [
            'version', 'multithread', 'multiprocess', 'run_once',
            'file_wrapper'
        ]:
            if attr == 'file_wrapper' and handler.wsgi_file_wrapper is None:
                continue
            self.assertEqual(
                getattr(handler, 'wsgi_' + attr), env['wsgi.' + attr]
            )

    def checkOSEnviron(self, handler):
        """Check the handler environ against os.environ and the test defaults."""
        empty = {}
        setup_testing_defaults(empty)
        env = handler.environ
        from os import environ
        # OS variables that are not overridden by a testing default must
        # show up unchanged.
        for k, v in environ.items():
            if k not in empty:
                self.assertEqual(env[k], v)
        # Every testing default must be present, whatever its value.
        for k in empty:
            self.assertTrue(k in env)

    def testEnviron(self):
        h = TestHandler(X="Y")
        h.setup_environ()
        self.checkEnvironAttrs(h)
        self.checkOSEnviron(h)
        self.assertEqual(h.environ["X"], "Y")

    def testCGIEnviron(self):
        h = BaseCGIHandler(None, None, None, {})
        h.setup_environ()
        for key in 'wsgi.url_scheme', 'wsgi.input', 'wsgi.errors':
            self.assertTrue(key in h.environ)

    def testScheme(self):
        h = TestHandler(HTTPS="on")
        h.setup_environ()
        self.assertEqual(h.environ['wsgi.url_scheme'], 'https')
        h = TestHandler()
        h.setup_environ()
        self.assertEqual(h.environ['wsgi.url_scheme'], 'http')

    def testAbstractMethods(self):
        h = BaseHandler()
        for name in [
            '_flush', 'get_stdin', 'get_stderr', 'add_cgi_vars'
        ]:
            self.assertRaises(NotImplementedError, getattr(h, name))
        self.assertRaises(NotImplementedError, h._write, "test")

    def testContentLength(self):
        # Demo one reason iteration is better than write()...  ;)

        def trivial_app1(e, s):
            s('200 OK', [])
            return [e['wsgi.url_scheme']]

        def trivial_app2(e, s):
            s('200 OK', [])(e['wsgi.url_scheme'])
            return []

        h = TestHandler()
        h.run(trivial_app1)
        self.assertEqual(h.stdout.getvalue(),
            "Status: 200 OK\r\n"
            "Content-Length: 4\r\n"
            "\r\n"
            "http")

        h = TestHandler()
        h.run(trivial_app2)
        self.assertEqual(h.stdout.getvalue(),
            "Status: 200 OK\r\n"
            "\r\n"
            "http")

    def testBasicErrorOutput(self):

        def non_error_app(e, s):
            s('200 OK', [])
            return []

        def error_app(e, s):
            raise AssertionError("This should be caught by handler")

        h = ErrorHandler()
        h.run(non_error_app)
        self.assertEqual(h.stdout.getvalue(),
            "Status: 200 OK\r\n"
            "Content-Length: 0\r\n"
            "\r\n")
        self.assertEqual(h.stderr.getvalue(), "")

        h = ErrorHandler()
        h.run(error_app)
        self.assertEqual(h.stdout.getvalue(),
            "Status: %s\r\n"
            "Content-Type: text/plain\r\n"
            "Content-Length: %d\r\n"
            "\r\n%s" % (h.error_status, len(h.error_body), h.error_body))

        self.assertTrue("AssertionError" in h.stderr.getvalue())

    def testErrorAfterOutput(self):
        MSG = "Some output has been sent"

        def error_app(e, s):
            s("200 OK", [])(MSG)
            raise AssertionError("This should be caught by handler")

        h = ErrorHandler()
        h.run(error_app)
        self.assertEqual(h.stdout.getvalue(),
            "Status: 200 OK\r\n"
            "\r\n" + MSG)
        self.assertTrue("AssertionError" in h.stderr.getvalue())

    def testHeaderFormats(self):

        def non_error_app(e, s):
            s('200 OK', [])
            return []

        # Regex template; the two %s slots take the HTTP version and the
        # optional "Server:" header line.
        stdpat = (
            r"HTTP/%s 200 OK\r\n"
            r"Date: \w{3}, [ 0123]\d \w{3} \d{4} \d\d:\d\d:\d\d GMT\r\n"
            r"%s" r"Content-Length: 0\r\n" r"\r\n"
        )
        shortpat = (
            "Status: 200 OK\r\n" "Content-Length: 0\r\n" "\r\n"
        )

        for ssw in "FooBar/1.0", None:
            sw = ("Server: %s\r\n" % ssw) if ssw else ""

            for version in "1.0", "1.1":
                for proto in "HTTP/0.9", "HTTP/1.0", "HTTP/1.1":

                    h = TestHandler(SERVER_PROTOCOL=proto)
                    h.origin_server = False
                    h.http_version = version
                    h.server_software = ssw
                    h.run(non_error_app)
                    self.assertEqual(shortpat, h.stdout.getvalue())

                    h = TestHandler(SERVER_PROTOCOL=proto)
                    h.origin_server = True
                    h.http_version = version
                    h.server_software = ssw
                    h.run(non_error_app)
                    if proto == "HTTP/0.9":
                        self.assertEqual(h.stdout.getvalue(), "")
                    else:
                        self.assertTrue(
                            re.match(stdpat % (version, sw),
                                     h.stdout.getvalue()),
                            (stdpat % (version, sw), h.stdout.getvalue())
                        )
579
580# This epilogue is needed for compatibility with the Python 2.5 regrtest module
581
def test_main():
    """Entry point for regrtest: run every test case in this module."""
    support.run_unittest(__name__)


if __name__ == "__main__":
    test_main()
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616# the above lines intentionally left blank