blob: ac5ada3b99dc676e0ad0c7528bb3a5cd367d9a1e [file] [log] [blame]
Thomas Wouters0e3f5912006-08-11 14:57:12 +00001from __future__ import nested_scopes # Backward compat for 2.1
Guido van Rossumd8faa362007-04-27 19:54:29 +00002from unittest import TestCase
Thomas Wouters0e3f5912006-08-11 14:57:12 +00003from wsgiref.util import setup_testing_defaults
4from wsgiref.headers import Headers
5from wsgiref.handlers import BaseHandler, BaseCGIHandler
6from wsgiref import util
7from wsgiref.validate import validator
8from wsgiref.simple_server import WSGIServer, WSGIRequestHandler, demo_app
9from wsgiref.simple_server import make_server
Jeremy Hyltone6b59c52007-08-10 19:13:33 +000010from io import StringIO, BytesIO, BufferedReader
Alexandre Vassalottice261952008-05-12 02:31:37 +000011from socketserver import BaseServer
Thomas Wouters0e3f5912006-08-11 14:57:12 +000012import re, sys
13
Benjamin Petersonee8712c2008-05-20 21:35:26 +000014from test import support
Thomas Wouters0e3f5912006-08-11 14:57:12 +000015
class MockServer(WSGIServer):
    """Non-socket HTTP server"""

    def __init__(self, server_address, RequestHandlerClass):
        """Set up server state without creating or binding a socket.

        server_address is a (host, port) pair; RequestHandlerClass is the
        handler class that will be instantiated for each request.
        """
        # Call BaseServer.__init__ directly instead of the inherited
        # initialiser chain: socketserver.TCPServer.__init__ would create a
        # real socket, which this mock must avoid.
        BaseServer.__init__(self, server_address, RequestHandlerClass)
        self.server_bind()

    def server_bind(self):
        """Record the host and port, then build the base WSGI environ."""
        host, port = self.server_address
        self.server_name = host
        self.server_port = port
        self.setup_environ()
28
29
class MockHandler(WSGIRequestHandler):
    """Non-socket HTTP handler"""

    def setup(self):
        """Use ``self.request`` as an ``(rfile, wfile)`` pair of streams.

        No socket is wrapped here; the two objects in the request pair are
        used as the read and write streams as given.
        """
        self.connection = self.request
        self.rfile, self.wfile = self.connection

    def finish(self):
        """Do nothing, leaving both streams exactly as they are."""
        pass
38
39
40
41
42
def hello_app(environ, start_response):
    """Minimal WSGI application that always gives the same response.

    Calls start_response with a "200 OK" status and a fixed pair of headers
    (including a constant Date header, so the output is reproducible), and
    returns a one-element list holding the body text.  ``environ`` is
    accepted but not consulted.
    """
    response_headers = [
        ('Content-Type', 'text/plain'),
        ('Date', 'Mon, 05 Jun 2006 18:49:54 GMT'),
    ]
    start_response("200 OK", response_headers)
    return ["Hello, world!"]
49
def run_amock(app=hello_app, data=b"GET / HTTP/1.0\n\n"):
    """Push one raw request through ``app`` using in-memory streams.

    app  -- the WSGI application to serve (defaults to hello_app)
    data -- the raw request bytes fed to the handler

    Returns an ``(out, err)`` pair of strings: everything written to the
    response stream, and everything written to sys.stderr while the request
    was being handled.
    """
    server = make_server("", 80, app, MockServer, MockHandler)
    inp = BufferedReader(BytesIO(data))
    out = StringIO()

    # Capture sys.stderr for the duration of the request; the original
    # stream is restored in the finally clause even if handling raises.
    olderr = sys.stderr
    err = sys.stderr = StringIO()

    try:
        # The (inp, out) pair stands in for the request/connection object.
        server.finish_request((inp, out), ("127.0.0.1", 8888))
    finally:
        sys.stderr = olderr

    return out.getvalue(), err.getvalue()
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
def compare_generic_iter(make_it, match):
    """Utility to compare a generic iterator with an iterable

    The object is checked twice: first through __getitem__ with successive
    integer indexes, then through the iter()/next() protocol.  'make_it'
    must be a function returning a fresh iterator to be tested (since the
    iterator is consumed once per pass).  'match' is the iterable of
    expected items.

    Raises AssertionError if an item differs from the expected one, if the
    object yields more items than 'match' holds, or if iter(it) does not
    return the object itself.
    """
    # Pass 1: sequence-style access via __getitem__.
    it = make_it()
    n = 0
    for item in match:
        if not it[n] == item:
            raise AssertionError
        n += 1
    try:
        it[n]
    except IndexError:
        pass
    else:
        raise AssertionError("Too many items from __getitem__", it)

    # Pass 2: iterator protocol.  (The former NameError guard around
    # ``iter, StopIteration`` could never trigger on Python 3, where both
    # names are always defined, so it has been dropped.)
    it = make_it()
    if iter(it) is not it:
        raise AssertionError
    for item in match:
        if not next(it) == item:
            raise AssertionError
    try:
        next(it)
    except StopIteration:
        pass
    else:
        raise AssertionError("Too many items from .__next__()", it)
Thomas Wouters0e3f5912006-08-11 14:57:12 +0000121
122
123
124
125
126
class IntegrationTests(TestCase):
    """Checks that run a request through run_amock and inspect the output."""

    def check_hello(self, out, has_length=True):
        """Assert that ``out`` is the exact response expected for hello_app.

        has_length selects whether a "Content-Length: 13" header is expected
        between the Date header and the blank line.
        """
        content_length = "Content-Length: 13\r\n" if has_length else ""
        self.assertEqual(out,
            "HTTP/1.0 200 OK\r\n"
            "Server: WSGIServer/0.1 Python/" + sys.version.split()[0] + "\r\n"
            "Content-Type: text/plain\r\n"
            "Date: Mon, 05 Jun 2006 18:49:54 GMT\r\n" +
            content_length +
            "\r\n"
            "Hello, world!"
        )

    def test_plain_hello(self):
        """The default app and request give the full hello response."""
        out, err = run_amock()
        self.check_hello(out)

    def test_validated_hello(self):
        """hello_app wrapped in validator() still gives the hello response."""
        out, err = run_amock(validator(hello_app))
        # the middleware doesn't support len(), so content-length isn't there
        self.check_hello(out, has_length=False)

    def test_simple_validation_error(self):
        """Passing a tuple of headers is reported by the validator."""
        def bad_app(environ, start_response):
            # Headers are given as a bare tuple rather than a list of tuples.
            start_response("200 OK", ('Content-Type', 'text/plain'))
            return ["Hello, world!"]
        out, err = run_amock(validator(bad_app))
        self.assertTrue(out.endswith(
            "A server error occurred.  Please contact the administrator."
        ))
        # The assertion message is expected on the second-to-last line of
        # the captured stderr text.
        self.assertEqual(
            err.splitlines()[-2],
            "AssertionError: Headers (('Content-Type', 'text/plain')) must"
            " be of type list: <class 'tuple'>"
        )
162
163
164
165
166
167
class UtilityTests(TestCase):
    """Tests for the helper functions and classes in wsgiref.util."""

    def checkShift(self, sn_in, pi_in, part, sn_out, pi_out):
        """Check one util.shift_path_info() call.

        Starting from SCRIPT_NAME=sn_in and PATH_INFO=pi_in, the call must
        return ``part`` and leave SCRIPT_NAME=sn_out and PATH_INFO=pi_out.
        Returns the environ that was used.
        """
        env = {'SCRIPT_NAME': sn_in, 'PATH_INFO': pi_in}
        util.setup_testing_defaults(env)
        self.assertEqual(util.shift_path_info(env), part)
        self.assertEqual(env['PATH_INFO'], pi_out)
        self.assertEqual(env['SCRIPT_NAME'], sn_out)
        return env

    def checkDefault(self, key, value, alt=None):
        """Check that setup_testing_defaults() fills ``key`` but never
        overwrites a value that is already present."""
        # Check defaulting when empty
        env = {}
        util.setup_testing_defaults(env)
        if isinstance(value, StringIO):
            # For StringIO defaults only the type is checked, not equality.
            self.assertTrue(isinstance(env[key], StringIO))
        else:
            self.assertEqual(env[key], value)

        # Check existing value
        env = {key: alt}
        util.setup_testing_defaults(env)
        self.assertTrue(env[key] is alt)

    def checkCrossDefault(self, key, value, **kw):
        """Check the default for ``key`` when the environ starts as ``kw``."""
        util.setup_testing_defaults(kw)
        self.assertEqual(kw[key], value)

    def checkAppURI(self, uri, **kw):
        """Check util.application_uri() for an environ seeded with ``kw``."""
        util.setup_testing_defaults(kw)
        self.assertEqual(util.application_uri(kw), uri)

    def checkReqURI(self, uri, query=1, **kw):
        """Check util.request_uri() for an environ seeded with ``kw``.

        ``query`` is passed through as request_uri()'s second argument.
        """
        util.setup_testing_defaults(kw)
        self.assertEqual(util.request_uri(kw, query), uri)

    def checkFW(self, text, size, match):
        """Check util.FileWrapper over ``text`` with block size ``size``.

        ``match`` lists the chunks the wrapper is expected to produce.
        """

        def make_it(text=text, size=size):
            return util.FileWrapper(StringIO(text), size)

        compare_generic_iter(make_it, match)

        it = make_it()
        self.assertFalse(it.filelike.closed)

        for item in it:
            pass

        # Exhausting the wrapper must not close the underlying file ...
        self.assertFalse(it.filelike.closed)

        # ... only an explicit close() does.
        it.close()
        self.assertTrue(it.filelike.closed)

    def testSimpleShifts(self):
        self.checkShift('', '/', '', '/', '')
        self.checkShift('', '/x', 'x', '/x', '')
        self.checkShift('/', '', None, '/', '')
        self.checkShift('/a', '/x/y', 'x', '/a/x', '/y')
        self.checkShift('/a', '/x/', 'x', '/a/x', '/')

    def testNormalizedShifts(self):
        self.checkShift('/a/b', '/../y', '..', '/a', '/y')
        self.checkShift('', '/../y', '..', '', '/y')
        self.checkShift('/a/b', '//y', 'y', '/a/b/y', '')
        self.checkShift('/a/b', '//y/', 'y', '/a/b/y', '/')
        self.checkShift('/a/b', '/./y', 'y', '/a/b/y', '')
        self.checkShift('/a/b', '/./y/', 'y', '/a/b/y', '/')
        self.checkShift('/a/b', '///./..//y/.//', '..', '/a', '/y/')
        self.checkShift('/a/b', '///', '', '/a/b/', '')
        self.checkShift('/a/b', '/.//', '', '/a/b/', '')
        self.checkShift('/a/b', '/x//', 'x', '/a/b/x', '/')
        self.checkShift('/a/b', '/.', None, '/a/b', '')

    def testDefaults(self):
        for key, value in [
            ('SERVER_NAME', '127.0.0.1'),
            ('SERVER_PORT', '80'),
            ('SERVER_PROTOCOL', 'HTTP/1.0'),
            ('HTTP_HOST', '127.0.0.1'),
            ('REQUEST_METHOD', 'GET'),
            ('SCRIPT_NAME', ''),
            ('PATH_INFO', '/'),
            ('wsgi.version', (1, 0)),
            ('wsgi.run_once', 0),
            ('wsgi.multithread', 0),
            ('wsgi.multiprocess', 0),
            ('wsgi.input', StringIO("")),
            ('wsgi.errors', StringIO()),
            ('wsgi.url_scheme', 'http'),
        ]:
            self.checkDefault(key, value)

    def testCrossDefaults(self):
        self.checkCrossDefault('HTTP_HOST', "foo.bar", SERVER_NAME="foo.bar")
        self.checkCrossDefault('wsgi.url_scheme', "https", HTTPS="on")
        self.checkCrossDefault('wsgi.url_scheme', "https", HTTPS="1")
        self.checkCrossDefault('wsgi.url_scheme', "https", HTTPS="yes")
        self.checkCrossDefault('wsgi.url_scheme', "http", HTTPS="foo")
        self.checkCrossDefault('SERVER_PORT', "80", HTTPS="foo")
        self.checkCrossDefault('SERVER_PORT', "443", HTTPS="on")

    def testGuessScheme(self):
        self.assertEqual(util.guess_scheme({}), "http")
        self.assertEqual(util.guess_scheme({'HTTPS': "foo"}), "http")
        self.assertEqual(util.guess_scheme({'HTTPS': "on"}), "https")
        self.assertEqual(util.guess_scheme({'HTTPS': "yes"}), "https")
        self.assertEqual(util.guess_scheme({'HTTPS': "1"}), "https")

    def testAppURIs(self):
        self.checkAppURI("http://127.0.0.1/")
        self.checkAppURI("http://127.0.0.1/spam", SCRIPT_NAME="/spam")
        self.checkAppURI("http://spam.example.com:2071/",
            HTTP_HOST="spam.example.com:2071", SERVER_PORT="2071")
        self.checkAppURI("http://spam.example.com/",
            SERVER_NAME="spam.example.com")
        self.checkAppURI("http://127.0.0.1/",
            HTTP_HOST="127.0.0.1", SERVER_NAME="spam.example.com")
        self.checkAppURI("https://127.0.0.1/", HTTPS="on")
        self.checkAppURI("http://127.0.0.1:8000/", SERVER_PORT="8000",
            HTTP_HOST=None)

    def testReqURIs(self):
        self.checkReqURI("http://127.0.0.1/")
        self.checkReqURI("http://127.0.0.1/spam", SCRIPT_NAME="/spam")
        self.checkReqURI("http://127.0.0.1/spammity/spam",
            SCRIPT_NAME="/spammity", PATH_INFO="/spam")
        self.checkReqURI("http://127.0.0.1/spammity/spam?say=ni",
            SCRIPT_NAME="/spammity", PATH_INFO="/spam", QUERY_STRING="say=ni")
        self.checkReqURI("http://127.0.0.1/spammity/spam", 0,
            SCRIPT_NAME="/spammity", PATH_INFO="/spam", QUERY_STRING="say=ni")

    def testFileWrapper(self):
        self.checkFW("xyz"*50, 120, ["xyz"*40, "xyz"*10])

    def testHopByHop(self):
        for hop in (
            "Connection Keep-Alive Proxy-Authenticate Proxy-Authorization "
            "TE Trailers Transfer-Encoding Upgrade"
        ).split():
            # The check must hold whatever the capitalisation of the name.
            for alt in hop, hop.title(), hop.upper(), hop.lower():
                self.assertTrue(util.is_hop_by_hop(alt))

        # Not comprehensive, just a few random header names
        for hop in (
            "Accept Cache-Control Date Pragma Trailer Via Warning"
        ).split():
            for alt in hop, hop.title(), hop.upper(), hop.lower():
                self.assertFalse(util.is_hop_by_hop(alt))
331
class HeaderTests(TestCase):
    """Tests for wsgiref.headers.Headers."""

    def testMappingInterface(self):
        """Headers behaves like a mapping with case-insensitive names."""
        test = [('x', 'y')]
        self.assertEqual(len(Headers([])), 0)
        self.assertEqual(len(Headers(test[:])), 1)
        self.assertEqual(Headers(test[:]).keys(), ['x'])
        self.assertEqual(Headers(test[:]).values(), ['y'])
        self.assertEqual(Headers(test[:]).items(), test)
        self.assertFalse(Headers(test).items() is test)  # must be copy!

        h = Headers([])
        del h['foo']   # should not raise an error

        h['Foo'] = 'bar'
        # Every lookup style must find the header under any capitalisation
        # and must give a false result for a name that was never set.
        for m in h.__contains__, h.get, h.get_all, h.__getitem__:
            self.assertTrue(m('foo'))
            self.assertTrue(m('Foo'))
            self.assertTrue(m('FOO'))
            self.assertFalse(m('bar'))

        self.assertEqual(h['foo'], 'bar')
        h['foo'] = 'baz'
        self.assertEqual(h['FOO'], 'baz')
        self.assertEqual(h.get_all('foo'), ['baz'])

        self.assertEqual(h.get("foo", "whee"), "baz")
        self.assertEqual(h.get("zoo", "whee"), "whee")
        self.assertEqual(h.setdefault("foo", "whee"), "baz")
        self.assertEqual(h.setdefault("zoo", "whee"), "whee")
        self.assertEqual(h["foo"], "baz")
        self.assertEqual(h["zoo"], "whee")

    def testRequireList(self):
        """Constructing Headers from a non-list raises TypeError."""
        self.assertRaises(TypeError, Headers, "foo")

    def testExtras(self):
        """Check str() formatting and add_header() parameter handling."""
        h = Headers([])
        self.assertEqual(str(h), '\r\n')

        h.add_header('foo', 'bar', baz="spam")
        self.assertEqual(h['foo'], 'bar; baz="spam"')
        self.assertEqual(str(h), 'foo: bar; baz="spam"\r\n\r\n')

        # A parameter whose value is None is expected to appear as a bare
        # name with no '=' part.
        h.add_header('Foo', 'bar', cheese=None)
        self.assertEqual(h.get_all('foo'),
            ['bar; baz="spam"', 'bar; cheese'])

        self.assertEqual(str(h),
            'foo: bar; baz="spam"\r\n'
            'Foo: bar; cheese\r\n'
            '\r\n'
        )
386
387
class ErrorHandler(BaseCGIHandler):
    """Simple handler subclass for testing BaseHandler"""

    def __init__(self, **kw):
        """Build a handler whose environ is ``kw`` plus the testing defaults.

        Fresh StringIO objects are used for stdin (empty), stdout and stderr,
        and both the multithread and multiprocess flags are set to True.
        """
        # Fills in any WSGI/CGI keys the caller did not supply (in place).
        setup_testing_defaults(kw)
        BaseCGIHandler.__init__(
            self, StringIO(''), StringIO(), StringIO(), kw,
            multithread=True, multiprocess=True
        )
397
class TestHandler(ErrorHandler):
    """Simple handler subclass for testing BaseHandler, w/error passthru"""

    def handle_error(self):
        """Re-raise the exception currently being handled.

        The bare ``raise`` relies on this method being called while an
        exception is active.
        """
        raise   # for testing, we want to see what's happening
403
404
405
406
407
408
409
410
411
412
413
class HandlerTests(TestCase):
    """Tests for the handler classes in wsgiref.handlers."""

    def checkEnvironAttrs(self, handler):
        """Check that each ``wsgi_*`` attribute matches its ``wsgi.*`` key."""
        env = handler.environ
        for attr in [
            'version', 'multithread', 'multiprocess', 'run_once',
            'file_wrapper'
        ]:
            # The file_wrapper comparison is skipped when the handler has
            # no wrapper configured.
            if attr == 'file_wrapper' and handler.wsgi_file_wrapper is None:
                continue
            self.assertEqual(getattr(handler, 'wsgi_' + attr),
                             env['wsgi.' + attr])

    def checkOSEnviron(self, handler):
        """Check the handler environ against os.environ and the defaults."""
        empty = {}
        setup_testing_defaults(empty)
        env = handler.environ
        from os import environ
        # OS variables that are not among the testing defaults must come
        # through unchanged ...
        for k, v in environ.items():
            if k not in empty:
                self.assertEqual(env[k], v)
        # ... and every default key must be present.
        for k in empty:
            self.assertTrue(k in env)

    def testEnviron(self):
        h = TestHandler(X="Y")
        h.setup_environ()
        self.checkEnvironAttrs(h)
        self.checkOSEnviron(h)
        self.assertEqual(h.environ["X"], "Y")

    def testCGIEnviron(self):
        h = BaseCGIHandler(None, None, None, {})
        h.setup_environ()
        for key in 'wsgi.url_scheme', 'wsgi.input', 'wsgi.errors':
            self.assertTrue(key in h.environ)

    def testScheme(self):
        h = TestHandler(HTTPS="on")
        h.setup_environ()
        self.assertEqual(h.environ['wsgi.url_scheme'], 'https')
        h = TestHandler()
        h.setup_environ()
        self.assertEqual(h.environ['wsgi.url_scheme'], 'http')

    def testAbstractMethods(self):
        h = BaseHandler()
        for name in [
            '_flush', 'get_stdin', 'get_stderr', 'add_cgi_vars'
        ]:
            self.assertRaises(NotImplementedError, getattr(h, name))
        self.assertRaises(NotImplementedError, h._write, "test")

    def testContentLength(self):
        # Demo one reason iteration is better than write()... ;)

        def trivial_app1(e, s):
            # Returns the body as an iterable.
            s('200 OK', [])
            return [e['wsgi.url_scheme']]

        def trivial_app2(e, s):
            # Sends the body through the callable returned by s().
            s('200 OK', [])(e['wsgi.url_scheme'])
            return []

        h = TestHandler()
        h.run(trivial_app1)
        self.assertEqual(h.stdout.getvalue(),
            "Status: 200 OK\r\n"
            "Content-Length: 4\r\n"
            "\r\n"
            "http")

        # No Content-Length header is expected in the write() variant.
        h = TestHandler()
        h.run(trivial_app2)
        self.assertEqual(h.stdout.getvalue(),
            "Status: 200 OK\r\n"
            "\r\n"
            "http")

    def testBasicErrorOutput(self):

        def non_error_app(e, s):
            s('200 OK', [])
            return []

        def error_app(e, s):
            raise AssertionError("This should be caught by handler")

        h = ErrorHandler()
        h.run(non_error_app)
        self.assertEqual(h.stdout.getvalue(),
            "Status: 200 OK\r\n"
            "Content-Length: 0\r\n"
            "\r\n")
        self.assertEqual(h.stderr.getvalue(), "")

        h = ErrorHandler()
        h.run(error_app)
        self.assertEqual(h.stdout.getvalue(),
            "Status: %s\r\n"
            "Content-Type: text/plain\r\n"
            "Content-Length: %d\r\n"
            "\r\n%s" % (h.error_status, len(h.error_body), h.error_body))

        self.assertTrue("AssertionError" in h.stderr.getvalue())

    def testErrorAfterOutput(self):
        MSG = "Some output has been sent"

        def error_app(e, s):
            s("200 OK", [])(MSG)
            raise AssertionError("This should be caught by handler")

        h = ErrorHandler()
        h.run(error_app)
        # The output already sent stays as it is; the error shows up only
        # on stderr.
        self.assertEqual(h.stdout.getvalue(),
            "Status: 200 OK\r\n"
            "\r\n" + MSG)
        self.assertTrue("AssertionError" in h.stderr.getvalue())

    def testHeaderFormats(self):

        def non_error_app(e, s):
            s('200 OK', [])
            return []

        # Regex template: HTTP version, then an optional Server header line.
        stdpat = (
            r"HTTP/%s 200 OK\r\n"
            r"Date: \w{3}, [ 0123]\d \w{3} \d{4} \d\d:\d\d:\d\d GMT\r\n"
            r"%s" r"Content-Length: 0\r\n" r"\r\n"
        )
        shortpat = (
            "Status: 200 OK\r\n" "Content-Length: 0\r\n" "\r\n"
        )

        for ssw in "FooBar/1.0", None:
            sw = ("Server: %s\r\n" % ssw) if ssw else ""

            for version in "1.0", "1.1":
                for proto in "HTTP/0.9", "HTTP/1.0", "HTTP/1.1":

                    # With origin_server off, the short "Status:" form is
                    # expected for every combination.
                    h = TestHandler(SERVER_PROTOCOL=proto)
                    h.origin_server = False
                    h.http_version = version
                    h.server_software = ssw
                    h.run(non_error_app)
                    self.assertEqual(shortpat, h.stdout.getvalue())

                    h = TestHandler(SERVER_PROTOCOL=proto)
                    h.origin_server = True
                    h.http_version = version
                    h.server_software = ssw
                    h.run(non_error_app)
                    if proto == "HTTP/0.9":
                        # Nothing at all is expected for an HTTP/0.9 client.
                        self.assertEqual(h.stdout.getvalue(), "")
                    else:
                        self.assertTrue(
                            re.match(stdpat % (version, sw),
                                     h.stdout.getvalue()),
                            (stdpat % (version, sw), h.stdout.getvalue())
                        )
577
# This epilogue is needed for compatibility with the Python 2.5 regrtest module
579
def test_main():
    """Hand this module's name to support.run_unittest() to run its tests."""
    support.run_unittest(__name__)


if __name__ == "__main__":
    test_main()
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
# the above lines intentionally left blank