blob: a3bc99fcf15d762c59da2f94df72da92262d3b74 [file] [log] [blame]
Phillip J. Eby5cf565d2006-06-09 16:40:18 +00001from __future__ import nested_scopes # Backward compat for 2.1
Collin Winterc2898c52007-04-25 17:29:52 +00002from unittest import TestCase
Phillip J. Eby5cf565d2006-06-09 16:40:18 +00003from wsgiref.util import setup_testing_defaults
4from wsgiref.headers import Headers
5from wsgiref.handlers import BaseHandler, BaseCGIHandler
6from wsgiref import util
7from wsgiref.validate import validator
8from wsgiref.simple_server import WSGIServer, WSGIRequestHandler, demo_app
9from wsgiref.simple_server import make_server
10from StringIO import StringIO
Georg Brandle152a772008-05-24 18:31:28 +000011from SocketServer import BaseServer
Antoine Pitroub3c169b2009-11-03 16:41:20 +000012import os
13import re
14import sys
Phillip J. Eby5cf565d2006-06-09 16:40:18 +000015
Collin Winterc2898c52007-04-25 17:29:52 +000016from test import test_support
Phillip J. Eby5cf565d2006-06-09 16:40:18 +000017
class MockServer(WSGIServer):
    """Non-socket HTTP server used as a test double.

    The constructor calls ``BaseServer.__init__`` directly instead of the
    inherited constructor, then runs the overridden ``server_bind`` below,
    which only records the address and builds the base environ.
    """

    def __init__(self, server_address, RequestHandlerClass):
        """Store the address/handler class and perform the fake bind."""
        BaseServer.__init__(self, server_address, RequestHandlerClass)
        self.server_bind()

    def server_bind(self):
        """Take server name and port straight from ``server_address``."""
        self.server_name, self.server_port = self.server_address
        self.setup_environ()
30
31
class MockHandler(WSGIRequestHandler):
    """Non-socket HTTP handler used as a test double.

    The "request" handed to this handler is expected to be a two-item
    sequence of file-like objects: (input stream, output stream).
    """

    def setup(self):
        """Use the request pair directly as the connection and its files."""
        self.connection = streams = self.request
        reader, writer = streams
        self.rfile = reader
        self.wfile = writer

    def finish(self):
        """Do nothing; the streams are left untouched for the caller."""
        return None
40
41
42
43
44
def hello_app(environ,start_response):
    """Minimal WSGI application: fixed headers and a fixed text body.

    'environ' is accepted but not consulted.  Returns a one-item list
    holding the response body.
    """
    status = "200 OK"
    response_headers = [
        ('Content-Type','text/plain'),
        # A constant date keeps the full response text predictable.
        ('Date','Mon, 05 Jun 2006 18:49:54 GMT'),
    ]
    start_response(status, response_headers)
    return ["Hello, world!"]
51
def run_amock(app=hello_app, data="GET / HTTP/1.0\n\n"):
    """Serve one request held in 'data' through the mock server/handler.

    sys.stderr is swapped for an in-memory buffer while the request is
    processed and is always restored afterwards.

    Returns a (response_text, stderr_text) tuple.
    """
    server = make_server("", 80, app, MockServer, MockHandler)

    request_in = StringIO(data)
    response_out = StringIO()
    captured_err = StringIO()

    saved_stderr = sys.stderr
    sys.stderr = captured_err
    try:
        server.finish_request((request_in, response_out), ("127.0.0.1",8888))
    finally:
        # Restore the real stream even if handling the request blew up.
        sys.stderr = saved_stderr

    return response_out.getvalue(), captured_err.getvalue()
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
def compare_generic_iter(make_it,match):
    """Check a generic 2.1/2.2+ iterator against the iterable 'match'.

    'make_it' must be a function returning a fresh iterator each time it is
    called, because the iterator may be exercised twice:

    1. through __getitem__, indexing 0, 1, 2, ... and expecting IndexError
       immediately after the last item of 'match';
    2. through iter()/.next(), but only when the interpreter provides
       iter and StopIteration (Python 2.2+).

    Raises AssertionError when an item differs or extra items remain.
    """

    # Pass 1: sequence-style access.
    indexed = make_it()
    position = 0
    for expected in match:
        if not indexed[position]==expected:
            raise AssertionError
        position += 1

    ran_out = False
    try:
        indexed[position]
    except IndexError:
        ran_out = True
    if not ran_out:
        raise AssertionError("Too many items from __getitem__",indexed)

    try:
        iter, StopIteration
    except NameError:
        # No iterator protocol in this interpreter; nothing more to test.
        return

    # Pass 2: only test iter mode under 2.2+
    stepped = make_it()
    if iter(stepped) is not stepped:
        raise AssertionError
    for expected in match:
        if not stepped.next()==expected:
            raise AssertionError

    finished = False
    try:
        stepped.next()
    except StopIteration:
        finished = True
    if not finished:
        raise AssertionError("Too many items from .next()",stepped)
Phillip J. Eby5cf565d2006-06-09 16:40:18 +0000121
122
123
124
125
126
class IntegrationTests(TestCase):
    """Round-trip tests: a request goes through run_amock() and the raw
    response text (and captured stderr) is inspected."""

    def check_hello(self, out, has_length=True):
        """Assert that 'out' is exactly the response expected from hello_app.

        'has_length' says whether a Content-Length header is expected.
        """
        if has_length:
            length_header = "Content-Length: 13\r\n"
        else:
            length_header = ""
        python_version = sys.version.split()[0]
        expected = "".join([
            "HTTP/1.0 200 OK\r\n",
            "Server: WSGIServer/0.1 Python/", python_version, "\r\n",
            "Content-Type: text/plain\r\n",
            "Date: Mon, 05 Jun 2006 18:49:54 GMT\r\n",
            length_header,
            "\r\n",
            "Hello, world!",
        ])
        self.assertEqual(out, expected)

    def test_plain_hello(self):
        """The default app served as-is yields the full hello response."""
        response, _errors = run_amock()
        self.check_hello(response)

    def test_validated_hello(self):
        """The hello app still works when wrapped in the validator."""
        response, _errors = run_amock(validator(hello_app))
        # the middleware doesn't support len(), so content-length isn't there
        self.check_hello(response, has_length=False)

    def test_simple_validation_error(self):
        """Passing headers as a tuple is reported by the validator."""
        def bad_app(environ,start_response):
            # Deliberately wrong: headers must be a list, not a tuple.
            start_response("200 OK", ('Content-Type','text/plain'))
            return ["Hello, world!"]

        response, errors = run_amock(validator(bad_app))

        generic_failure = (
            "A server error occurred.  Please contact the administrator."
        )
        self.assertTrue(response.endswith(generic_failure))

        # The second-to-last line of the captured stderr is compared
        # against the validator's message.
        reported = errors.splitlines()[-2]
        self.assertEqual(
            reported,
            "AssertionError: Headers (('Content-Type', 'text/plain')) must"
            " be of type list: <type 'tuple'>"
        )
162
163
164
165
166
167
class UtilityTests(TestCase):
    """Tests for the helpers in wsgiref.util (path shifting, testing
    defaults, URI reconstruction, FileWrapper, hop-by-hop detection)."""

    # -- helpers ---------------------------------------------------------

    def checkShift(self,sn_in,pi_in,part,sn_out,pi_out):
        """Shift one path segment and verify the segment and both paths.

        Returns the environ dict that was used.
        """
        environ = {'SCRIPT_NAME':sn_in,'PATH_INFO':pi_in}
        util.setup_testing_defaults(environ)
        shifted = util.shift_path_info(environ)
        self.assertEqual(shifted,part)
        self.assertEqual(environ['PATH_INFO'],pi_out)
        self.assertEqual(environ['SCRIPT_NAME'],sn_out)
        return environ

    def checkDefault(self, key, value, alt=None):
        """Verify 'key' defaults to 'value' and that a preset is kept."""
        # Check defaulting when empty
        fresh = {}
        util.setup_testing_defaults(fresh)
        if not isinstance(value,StringIO):
            self.assertEqual(fresh[key],value)
        else:
            # Stream defaults are only compared by type.
            self.assertTrue(isinstance(fresh[key],StringIO))

        # Check existing value
        preset = {key:alt}
        util.setup_testing_defaults(preset)
        self.assertTrue(preset[key] is alt)

    def checkCrossDefault(self,key,value,**kw):
        """Verify the default of 'key' given the other settings in 'kw'."""
        environ = kw
        util.setup_testing_defaults(environ)
        self.assertEqual(environ[key],value)

    def checkAppURI(self,uri,**kw):
        """Verify application_uri() for the environ described by 'kw'."""
        environ = kw
        util.setup_testing_defaults(environ)
        self.assertEqual(util.application_uri(environ),uri)

    def checkReqURI(self,uri,query=1,**kw):
        """Verify request_uri() for the environ described by 'kw'."""
        environ = kw
        util.setup_testing_defaults(environ)
        self.assertEqual(util.request_uri(environ,query),uri)

    def checkFW(self,text,size,match):
        """Verify FileWrapper chunking and that only close() closes it."""

        def make_it():
            return util.FileWrapper(StringIO(text),size)

        compare_generic_iter(make_it,match)

        wrapper = make_it()
        self.assertFalse(wrapper.filelike.closed)

        # Exhausting the wrapper must not close the underlying file.
        for _chunk in wrapper:
            pass
        self.assertFalse(wrapper.filelike.closed)

        wrapper.close()
        self.assertTrue(wrapper.filelike.closed)

    # -- tests -----------------------------------------------------------

    def testSimpleShifts(self):
        # Columns: SCRIPT_NAME in, PATH_INFO in, shifted part,
        #          SCRIPT_NAME out, PATH_INFO out
        cases = [
            ('',   '/',    '',   '/',    ''),
            ('',   '/x',   'x',  '/x',   ''),
            ('/',  '',     None, '/',    ''),
            ('/a', '/x/y', 'x',  '/a/x', '/y'),
            ('/a', '/x/',  'x',  '/a/x', '/'),
        ]
        for case in cases:
            self.checkShift(*case)

    def testNormalizedShifts(self):
        # Same column layout as in testSimpleShifts.
        cases = [
            ('/a/b', '/../y',          '..', '/a',     '/y'),
            ('',     '/../y',          '..', '',       '/y'),
            ('/a/b', '//y',            'y',  '/a/b/y', ''),
            ('/a/b', '//y/',           'y',  '/a/b/y', '/'),
            ('/a/b', '/./y',           'y',  '/a/b/y', ''),
            ('/a/b', '/./y/',          'y',  '/a/b/y', '/'),
            ('/a/b', '///./..//y/.//', '..', '/a',     '/y/'),
            ('/a/b', '///',            '',   '/a/b/',  ''),
            ('/a/b', '/.//',           '',   '/a/b/',  ''),
            ('/a/b', '/x//',           'x',  '/a/b/x', '/'),
            ('/a/b', '/.',             None, '/a/b',   ''),
        ]
        for case in cases:
            self.checkShift(*case)

    def testDefaults(self):
        expected_defaults = [
            ('SERVER_NAME','127.0.0.1'),
            ('SERVER_PORT', '80'),
            ('SERVER_PROTOCOL','HTTP/1.0'),
            ('HTTP_HOST','127.0.0.1'),
            ('REQUEST_METHOD','GET'),
            ('SCRIPT_NAME',''),
            ('PATH_INFO','/'),
            ('wsgi.version', (1,0)),
            ('wsgi.run_once', 0),
            ('wsgi.multithread', 0),
            ('wsgi.multiprocess', 0),
            ('wsgi.input', StringIO("")),
            ('wsgi.errors', StringIO()),
            ('wsgi.url_scheme','http'),
        ]
        for name, default in expected_defaults:
            self.checkDefault(name,default)

    def testCrossDefaults(self):
        # (key to inspect, expected value, preset environ entries)
        cases = [
            ('HTTP_HOST',       "foo.bar", {'SERVER_NAME': "foo.bar"}),
            ('wsgi.url_scheme', "https",   {'HTTPS': "on"}),
            ('wsgi.url_scheme', "https",   {'HTTPS': "1"}),
            ('wsgi.url_scheme', "https",   {'HTTPS': "yes"}),
            ('wsgi.url_scheme', "http",    {'HTTPS': "foo"}),
            ('SERVER_PORT',     "80",      {'HTTPS': "foo"}),
            ('SERVER_PORT',     "443",     {'HTTPS': "on"}),
        ]
        for key, expected, preset in cases:
            self.checkCrossDefault(key,expected,**preset)

    def testGuessScheme(self):
        cases = [
            ({},              "http"),
            ({'HTTPS':"foo"}, "http"),
            ({'HTTPS':"on"},  "https"),
            ({'HTTPS':"yes"}, "https"),
            ({'HTTPS':"1"},   "https"),
        ]
        for environ, scheme in cases:
            self.assertEqual(util.guess_scheme(environ), scheme)

    def testAppURIs(self):
        # (expected URI, preset environ entries)
        cases = [
            ("http://127.0.0.1/", {}),
            ("http://127.0.0.1/spam", {'SCRIPT_NAME': "/spam"}),
            ("http://spam.example.com:2071/",
                {'HTTP_HOST': "spam.example.com:2071",
                 'SERVER_PORT': "2071"}),
            ("http://spam.example.com/",
                {'SERVER_NAME': "spam.example.com"}),
            ("http://127.0.0.1/",
                {'HTTP_HOST': "127.0.0.1",
                 'SERVER_NAME': "spam.example.com"}),
            ("https://127.0.0.1/", {'HTTPS': "on"}),
            ("http://127.0.0.1:8000/",
                {'SERVER_PORT': "8000", 'HTTP_HOST': None}),
        ]
        for uri, preset in cases:
            self.checkAppURI(uri,**preset)

    def testReqURIs(self):
        # (expected URI, include-query flag, preset environ entries)
        cases = [
            ("http://127.0.0.1/", 1, {}),
            ("http://127.0.0.1/spam", 1, {'SCRIPT_NAME': "/spam"}),
            ("http://127.0.0.1/spammity/spam", 1,
                {'SCRIPT_NAME': "/spammity", 'PATH_INFO': "/spam"}),
            ("http://127.0.0.1/spammity/spam?say=ni", 1,
                {'SCRIPT_NAME': "/spammity", 'PATH_INFO': "/spam",
                 'QUERY_STRING': "say=ni"}),
            ("http://127.0.0.1/spammity/spam", 0,
                {'SCRIPT_NAME': "/spammity", 'PATH_INFO': "/spam",
                 'QUERY_STRING': "say=ni"}),
        ]
        for uri, with_query, preset in cases:
            self.checkReqURI(uri,with_query,**preset)

    def testFileWrapper(self):
        self.checkFW("xyz"*50, 120, ["xyz"*40,"xyz"*10])

    def testHopByHop(self):

        def spellings(name):
            # The header name as given plus three re-capitalisations.
            return name, name.title(), name.upper(), name.lower()

        hop_by_hop = (
            "Connection Keep-Alive Proxy-Authenticate Proxy-Authorization "
            "TE Trailers Transfer-Encoding Upgrade"
        ).split()
        for header in hop_by_hop:
            for variant in spellings(header):
                self.assertTrue(util.is_hop_by_hop(variant))

        # Not comprehensive, just a few random header names
        end_to_end = (
            "Accept Cache-Control Date Pragma Trailer Via Warning"
        ).split()
        for header in end_to_end:
            for variant in spellings(header):
                self.assertFalse(util.is_hop_by_hop(variant))
Phillip J. Eby5cf565d2006-06-09 16:40:18 +0000331
332class HeaderTests(TestCase):
333
334 def testMappingInterface(self):
335 test = [('x','y')]
336 self.assertEqual(len(Headers([])),0)
337 self.assertEqual(len(Headers(test[:])),1)
338 self.assertEqual(Headers(test[:]).keys(), ['x'])
339 self.assertEqual(Headers(test[:]).values(), ['y'])
340 self.assertEqual(Headers(test[:]).items(), test)
Benjamin Peterson5c8da862009-06-30 22:57:08 +0000341 self.assertFalse(Headers(test).items() is test) # must be copy!
Phillip J. Eby5cf565d2006-06-09 16:40:18 +0000342
343 h=Headers([])
344 del h['foo'] # should not raise an error
345
346 h['Foo'] = 'bar'
347 for m in h.has_key, h.__contains__, h.get, h.get_all, h.__getitem__:
Benjamin Peterson5c8da862009-06-30 22:57:08 +0000348 self.assertTrue(m('foo'))
349 self.assertTrue(m('Foo'))
350 self.assertTrue(m('FOO'))
351 self.assertFalse(m('bar'))
Phillip J. Eby5cf565d2006-06-09 16:40:18 +0000352
353 self.assertEqual(h['foo'],'bar')
354 h['foo'] = 'baz'
355 self.assertEqual(h['FOO'],'baz')
356 self.assertEqual(h.get_all('foo'),['baz'])
357
358 self.assertEqual(h.get("foo","whee"), "baz")
359 self.assertEqual(h.get("zoo","whee"), "whee")
360 self.assertEqual(h.setdefault("foo","whee"), "baz")
361 self.assertEqual(h.setdefault("zoo","whee"), "whee")
362 self.assertEqual(h["foo"],"baz")
363 self.assertEqual(h["zoo"],"whee")
364
365 def testRequireList(self):
366 self.assertRaises(TypeError, Headers, "foo")
367
368
369 def testExtras(self):
370 h = Headers([])
371 self.assertEqual(str(h),'\r\n')
372
373 h.add_header('foo','bar',baz="spam")
374 self.assertEqual(h['foo'], 'bar; baz="spam"')
375 self.assertEqual(str(h),'foo: bar; baz="spam"\r\n\r\n')
376
377 h.add_header('Foo','bar',cheese=None)
378 self.assertEqual(h.get_all('foo'),
379 ['bar; baz="spam"', 'bar; cheese'])
380
381 self.assertEqual(str(h),
382 'foo: bar; baz="spam"\r\n'
383 'Foo: bar; cheese\r\n'
384 '\r\n'
385 )
386
387
388class ErrorHandler(BaseCGIHandler):
389 """Simple handler subclass for testing BaseHandler"""
390
Antoine Pitroub3c169b2009-11-03 16:41:20 +0000391 # BaseHandler records the OS environment at import time, but envvars
392 # might have been changed later by other tests, which trips up
393 # HandlerTests.testEnviron().
394 os_environ = dict(os.environ.items())
395
Phillip J. Eby5cf565d2006-06-09 16:40:18 +0000396 def __init__(self,**kw):
397 setup_testing_defaults(kw)
398 BaseCGIHandler.__init__(
399 self, StringIO(''), StringIO(), StringIO(), kw,
400 multithread=True, multiprocess=True
401 )
402
class TestHandler(ErrorHandler):
    """ErrorHandler variant for testing BaseHandler with error passthru:
    exceptions are re-raised to the caller rather than handled here."""

    def handle_error(self):
        """Re-raise the exception that is currently being handled."""
        # Bare raise on purpose: for testing, we want to see what's
        # happening instead of having the failure absorbed.
        raise
408
409
410
411
412
413
414
415
416
417
418
class HandlerTests(TestCase):
    """Tests for wsgiref.handlers, driven through the ErrorHandler and
    TestHandler subclasses defined in this module."""

    def checkEnvironAttrs(self, handler):
        """Check that each handler.wsgi_<name> equals environ['wsgi.<name>'].

        'file_wrapper' is skipped when the handler has no file wrapper.
        """
        env = handler.environ
        for attr in [
            'version','multithread','multiprocess','run_once','file_wrapper'
        ]:
            if attr=='file_wrapper' and handler.wsgi_file_wrapper is None:
                continue
            self.assertEqual(getattr(handler,'wsgi_'+attr),env['wsgi.'+attr])

    def checkOSEnviron(self,handler):
        """Check the handler environ against os.environ and the defaults.

        Every OS variable that is not also a testing default must appear
        unchanged in the handler environ, and every testing-default key
        must be present.
        """
        empty = {}
        setup_testing_defaults(empty)
        env = handler.environ
        from os import environ
        for k,v in environ.items():
            # Keys that are also testing defaults may legitimately differ.
            if k not in empty:
                self.assertEqual(env[k],v)
        for k in empty:
            self.assertTrue(k in env)

    def testEnviron(self):
        """setup_environ() merges OS environ, defaults and extra keys."""
        h = TestHandler(X="Y")
        h.setup_environ()
        self.checkEnvironAttrs(h)
        self.checkOSEnviron(h)
        self.assertEqual(h.environ["X"],"Y")

    def testCGIEnviron(self):
        """A bare BaseCGIHandler still provides the core wsgi.* keys."""
        h = BaseCGIHandler(None,None,None,{})
        h.setup_environ()
        for key in 'wsgi.url_scheme', 'wsgi.input', 'wsgi.errors':
            self.assertTrue(key in h.environ)

    def testScheme(self):
        """wsgi.url_scheme follows the HTTPS environ variable."""
        h = TestHandler(HTTPS="on")
        h.setup_environ()
        self.assertEqual(h.environ['wsgi.url_scheme'],'https')

        h = TestHandler()
        h.setup_environ()
        self.assertEqual(h.environ['wsgi.url_scheme'],'http')

    def testAbstractMethods(self):
        """BaseHandler's abstract hooks raise NotImplementedError."""
        h = BaseHandler()
        for name in [
            '_flush','get_stdin','get_stderr','add_cgi_vars'
        ]:
            self.assertRaises(NotImplementedError, getattr(h,name))
        self.assertRaises(NotImplementedError, h._write, "test")

    def testContentLength(self):
        """Content-Length is added for a one-item iterable, not write()."""
        # Demo one reason iteration is better than write()...  ;)

        def trivial_app1(e,s):
            s('200 OK',[])
            return [e['wsgi.url_scheme']]

        def trivial_app2(e,s):
            # Same body, but sent through the write() callable.
            s('200 OK',[])(e['wsgi.url_scheme'])
            return []

        h = TestHandler()
        h.run(trivial_app1)
        self.assertEqual(h.stdout.getvalue(),
            "Status: 200 OK\r\n"
            "Content-Length: 4\r\n"
            "\r\n"
            "http")

        h = TestHandler()
        h.run(trivial_app2)
        self.assertEqual(h.stdout.getvalue(),
            "Status: 200 OK\r\n"
            "\r\n"
            "http")

    def testBasicErrorOutput(self):
        """An app failing before any output yields the error response."""

        def non_error_app(e,s):
            s('200 OK',[])
            return []

        def error_app(e,s):
            raise AssertionError("This should be caught by handler")

        h = ErrorHandler()
        h.run(non_error_app)
        self.assertEqual(h.stdout.getvalue(),
            "Status: 200 OK\r\n"
            "Content-Length: 0\r\n"
            "\r\n")
        self.assertEqual(h.stderr.getvalue(),"")

        h = ErrorHandler()
        h.run(error_app)
        self.assertEqual(h.stdout.getvalue(),
            "Status: %s\r\n"
            "Content-Type: text/plain\r\n"
            "Content-Length: %d\r\n"
            "\r\n%s" % (h.error_status,len(h.error_body),h.error_body))

        # The exception name must have been written to the error stream.
        self.assertTrue("AssertionError" in h.stderr.getvalue())

    def testErrorAfterOutput(self):
        """Once output was sent, an error cannot replace the response."""
        MSG = "Some output has been sent"

        def error_app(e,s):
            s("200 OK",[])(MSG)
            raise AssertionError("This should be caught by handler")

        h = ErrorHandler()
        h.run(error_app)
        self.assertEqual(h.stdout.getvalue(),
            "Status: 200 OK\r\n"
            "\r\n"+MSG)
        self.assertTrue("AssertionError" in h.stderr.getvalue())

    def testHeaderFormats(self):
        """Response framing for origin and non-origin servers, across
        HTTP versions, request protocols and server-software settings."""

        def non_error_app(e,s):
            s('200 OK',[])
            return []

        # Regex template for origin-server output: the first %s is the
        # HTTP version, the second the optional "Server:" header line.
        stdpat = (
            r"HTTP/%s 200 OK\r\n"
            r"Date: \w{3}, [ 0123]\d \w{3} \d{4} \d\d:\d\d:\d\d GMT\r\n"
            r"%s" r"Content-Length: 0\r\n" r"\r\n"
        )
        # Exact output expected when the handler is not the origin server.
        shortpat = (
            "Status: 200 OK\r\n" "Content-Length: 0\r\n" "\r\n"
        )

        for ssw in "FooBar/1.0", None:
            if ssw:
                sw = "Server: %s\r\n" % ssw
            else:
                sw = ""

            for version in "1.0", "1.1":
                # The interpolated pieces are literal text, so escape them;
                # otherwise the "." in "1.0" would match any character.
                expected_re = stdpat % (re.escape(version), re.escape(sw))

                for proto in "HTTP/0.9", "HTTP/1.0", "HTTP/1.1":

                    h = TestHandler(SERVER_PROTOCOL=proto)
                    h.origin_server = False
                    h.http_version = version
                    h.server_software = ssw
                    h.run(non_error_app)
                    self.assertEqual(shortpat,h.stdout.getvalue())

                    h = TestHandler(SERVER_PROTOCOL=proto)
                    h.origin_server = True
                    h.http_version = version
                    h.server_software = ssw
                    h.run(non_error_app)
                    if proto=="HTTP/0.9":
                        # Nothing at all is expected for an HTTP/0.9 request.
                        self.assertEqual(h.stdout.getvalue(),"")
                    else:
                        self.assertTrue(
                            re.match(expected_re, h.stdout.getvalue()),
                            (expected_re, h.stdout.getvalue())
                        )
582
# This epilogue is needed for compatibility with the Python 2.5 regrtest module

def test_main():
    """Run every test case defined in this module via test_support."""
    this_module = __name__
    test_support.run_unittest(this_module)


# Allow running the file directly as a script.
if __name__ == "__main__":
    test_main()
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619# the above lines intentionally left blank