blob: db821398439bbe41b802c44ce9b5e3d9390f45bc [file] [log] [blame]
Phillip J. Eby5cf565d2006-06-09 16:40:18 +00001from __future__ import nested_scopes # Backward compat for 2.1
Collin Winterc2898c52007-04-25 17:29:52 +00002from unittest import TestCase
Phillip J. Eby5cf565d2006-06-09 16:40:18 +00003from wsgiref.util import setup_testing_defaults
4from wsgiref.headers import Headers
5from wsgiref.handlers import BaseHandler, BaseCGIHandler
6from wsgiref import util
7from wsgiref.validate import validator
8from wsgiref.simple_server import WSGIServer, WSGIRequestHandler, demo_app
9from wsgiref.simple_server import make_server
10from StringIO import StringIO
Georg Brandle152a772008-05-24 18:31:28 +000011from SocketServer import BaseServer
Phillip J. Eby5cf565d2006-06-09 16:40:18 +000012import re, sys
13
Collin Winterc2898c52007-04-25 17:29:52 +000014from test import test_support
Phillip J. Eby5cf565d2006-06-09 16:40:18 +000015
class MockServer(WSGIServer):
    """WSGI server for tests that is set up without the usual constructor.

    ``BaseServer.__init__`` is invoked directly instead of the constructor
    inherited from ``WSGIServer``, and ``server_bind`` is overridden so that
    it only records the address and builds the base environ.
    """

    def __init__(self, server_address, RequestHandlerClass):
        # Go straight to BaseServer; the intermediate constructors are
        # intentionally not run.
        BaseServer.__init__(self, server_address, RequestHandlerClass)
        self.server_bind()

    def server_bind(self):
        """Store the (host, port) pair and initialise the WSGI environ."""
        self.server_name, self.server_port = self.server_address
        self.setup_environ()
28
29
class MockHandler(WSGIRequestHandler):
    """Request handler whose "request" is a pair of file-like objects."""

    def setup(self):
        """Unpack ``self.request`` as ``(rfile, wfile)``."""
        streams = self.request
        self.connection = streams
        self.rfile, self.wfile = streams

    def finish(self):
        """No-op: the streams are left exactly as the handler left them."""
38
39
40
41
42
def hello_app(environ, start_response):
    """Minimal WSGI application that always answers with a fixed greeting.

    The status, the two headers (including a constant ``Date``) and the body
    never vary, so callers can compare the complete response exactly.
    """
    status = "200 OK"
    headers = [
        ('Content-Type', 'text/plain'),
        ('Date', 'Mon, 05 Jun 2006 18:49:54 GMT'),
    ]
    start_response(status, headers)
    return ["Hello, world!"]
49
def run_amock(app=hello_app, data="GET / HTTP/1.0\n\n"):
    """Feed one raw HTTP request to *app* through the mock server classes.

    Returns an ``(output, errors)`` pair of strings: what was written to the
    handler's output stream, and what was written to ``sys.stderr`` while the
    request was being handled.
    """
    server = make_server("", 80, app, MockServer, MockHandler)
    request_in = StringIO(data)
    response_out = StringIO()
    captured_err = StringIO()

    saved_stderr = sys.stderr
    sys.stderr = captured_err
    try:
        server.finish_request((request_in, response_out), ("127.0.0.1", 8888))
    finally:
        # Put the real stderr back even if handling the request raised.
        sys.stderr = saved_stderr

    return response_out.getvalue(), captured_err.getvalue()
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
def compare_generic_iter(make_it, match):
    """Utility to compare a generic 2.1/2.2+ iterator with an iterable

    'make_it' must be a function returning a fresh iterator each time it is
    called, because the iterator may be exercised twice: first through
    __getitem__, and then -- when running under Python 2.2+, i.e. when the
    iter() and StopIteration builtins exist -- through iter()/next().

    Raises AssertionError if an item differs from the corresponding item of
    'match', or if the iterator still has items once 'match' is exhausted."""

    # Pass 1: old-style sequence protocol, indexing upward from zero.
    candidate = make_it()
    index = 0
    for expected in match:
        if not candidate[index] == expected:
            raise AssertionError
        index += 1
    try:
        candidate[index]
    except IndexError:
        pass
    else:
        raise AssertionError("Too many items from __getitem__", candidate)

    # Pass 2: iterator protocol -- only test iter mode under 2.2+.
    try:
        iter, StopIteration
    except NameError:
        return

    candidate = make_it()
    if iter(candidate) is not candidate:
        raise AssertionError
    for expected in match:
        if not candidate.next() == expected:
            raise AssertionError
    try:
        candidate.next()
    except StopIteration:
        pass
    else:
        raise AssertionError("Too many items from .next()", candidate)
Phillip J. Eby5cf565d2006-06-09 16:40:18 +0000119
120
121
122
123
124
class IntegrationTests(TestCase):
    """End-to-end tests: applications run through the mock server/handler."""

    def check_hello(self, out, has_length=True):
        """Assert that 'out' is exactly the response produced for hello_app.

        'has_length' tells whether a Content-Length header is expected in
        the response.
        """
        if has_length:
            length_header = "Content-Length: 13\r\n"
        else:
            length_header = ""
        python_version = sys.version.split()[0]
        self.assertEqual(out,
            "HTTP/1.0 200 OK\r\n"
            "Server: WSGIServer/0.1 Python/" + python_version + "\r\n"
            "Content-Type: text/plain\r\n"
            "Date: Mon, 05 Jun 2006 18:49:54 GMT\r\n" +
            length_header +
            "\r\n"
            "Hello, world!"
        )

    def test_plain_hello(self):
        out, err = run_amock()
        self.check_hello(out)

    def test_validated_hello(self):
        out, err = run_amock(validator(hello_app))
        # the middleware doesn't support len(), so content-length isn't there
        self.check_hello(out, has_length=False)

    def test_simple_validation_error(self):
        def bad_app(environ, start_response):
            # The headers are passed as a tuple rather than a list, which
            # the validator must reject.
            start_response("200 OK", ('Content-Type', 'text/plain'))
            return ["Hello, world!"]

        out, err = run_amock(validator(bad_app))
        # Compare against the handler's own error body rather than a copied
        # literal, so the check cannot drift from the library's text.
        self.assertTrue(out.endswith(BaseHandler.error_body))
        self.assertEqual(
            err.splitlines()[-2],
            "AssertionError: Headers (('Content-Type', 'text/plain')) must"
            " be of type list: <type 'tuple'>"
        )
160
161
162
163
164
165
class UtilityTests(TestCase):
    """Tests for the helpers in wsgiref.util."""

    def checkShift(self, sn_in, pi_in, part, sn_out, pi_out):
        """Shift one path segment and verify the outcome.

        Starting with SCRIPT_NAME 'sn_in' and PATH_INFO 'pi_in',
        shift_path_info() must return 'part' and leave SCRIPT_NAME equal to
        'sn_out' and PATH_INFO equal to 'pi_out'.  Returns the environ used.
        """
        env = {'SCRIPT_NAME': sn_in, 'PATH_INFO': pi_in}
        util.setup_testing_defaults(env)
        shifted = util.shift_path_info(env)
        self.assertEqual(shifted, part)
        self.assertEqual(env['PATH_INFO'], pi_out)
        self.assertEqual(env['SCRIPT_NAME'], sn_out)
        return env

    def checkDefault(self, key, value, alt=None):
        """Verify setup_testing_defaults() for one environ key.

        The key must default to 'value' when absent (for StringIO values only
        the type is compared) and an existing value 'alt' must be kept as is.
        """
        # Check defaulting when empty
        fresh = {}
        util.setup_testing_defaults(fresh)
        if isinstance(value, StringIO):
            self.assertTrue(isinstance(fresh[key], StringIO))
        else:
            self.assertEqual(fresh[key], value)

        # Check existing value
        preset = {key: alt}
        util.setup_testing_defaults(preset)
        self.assertTrue(preset[key] is alt)

    def checkCrossDefault(self, key, value, **kw):
        """With 'kw' as the initial environ, 'key' must default to 'value'."""
        env = kw
        util.setup_testing_defaults(env)
        self.assertEqual(env[key], value)

    def checkAppURI(self, uri, **kw):
        """application_uri() of the environ built from 'kw' must be 'uri'."""
        env = kw
        util.setup_testing_defaults(env)
        self.assertEqual(util.application_uri(env), uri)

    def checkReqURI(self, uri, query=1, **kw):
        """request_uri() of the environ built from 'kw' must be 'uri'.

        'query' is passed through as request_uri()'s second argument.
        """
        env = kw
        util.setup_testing_defaults(env)
        self.assertEqual(util.request_uri(env, query), uri)

    def checkFW(self, text, size, match):
        """Check FileWrapper over 'text' with block size 'size'.

        The wrapper must yield the items of 'match', and the wrapped file
        must only be closed by an explicit close().
        """

        def new_wrapper(text=text, size=size):
            return util.FileWrapper(StringIO(text), size)

        compare_generic_iter(new_wrapper, match)

        wrapper = new_wrapper()
        self.assertFalse(wrapper.filelike.closed)

        # Exhausting the wrapper must not close the underlying file.
        for item in wrapper:
            pass
        self.assertFalse(wrapper.filelike.closed)

        wrapper.close()
        self.assertTrue(wrapper.filelike.closed)

    def testSimpleShifts(self):
        # Each row: sn_in, pi_in, part, sn_out, pi_out
        for case in [
            ('', '/', '', '/', ''),
            ('', '/x', 'x', '/x', ''),
            ('/', '', None, '/', ''),
            ('/a', '/x/y', 'x', '/a/x', '/y'),
            ('/a', '/x/', 'x', '/a/x', '/'),
        ]:
            self.checkShift(*case)

    def testNormalizedShifts(self):
        # Each row: sn_in, pi_in, part, sn_out, pi_out
        for case in [
            ('/a/b', '/../y', '..', '/a', '/y'),
            ('', '/../y', '..', '', '/y'),
            ('/a/b', '//y', 'y', '/a/b/y', ''),
            ('/a/b', '//y/', 'y', '/a/b/y', '/'),
            ('/a/b', '/./y', 'y', '/a/b/y', ''),
            ('/a/b', '/./y/', 'y', '/a/b/y', '/'),
            ('/a/b', '///./..//y/.//', '..', '/a', '/y/'),
            ('/a/b', '///', '', '/a/b/', ''),
            ('/a/b', '/.//', '', '/a/b/', ''),
            ('/a/b', '/x//', 'x', '/a/b/x', '/'),
            ('/a/b', '/.', None, '/a/b', ''),
        ]:
            self.checkShift(*case)

    def testDefaults(self):
        expected_defaults = [
            ('SERVER_NAME', '127.0.0.1'),
            ('SERVER_PORT', '80'),
            ('SERVER_PROTOCOL', 'HTTP/1.0'),
            ('HTTP_HOST', '127.0.0.1'),
            ('REQUEST_METHOD', 'GET'),
            ('SCRIPT_NAME', ''),
            ('PATH_INFO', '/'),
            ('wsgi.version', (1, 0)),
            ('wsgi.run_once', 0),
            ('wsgi.multithread', 0),
            ('wsgi.multiprocess', 0),
            ('wsgi.input', StringIO("")),
            ('wsgi.errors', StringIO()),
            ('wsgi.url_scheme', 'http'),
        ]
        for key, value in expected_defaults:
            self.checkDefault(key, value)

    def testCrossDefaults(self):
        self.checkCrossDefault('HTTP_HOST', "foo.bar", SERVER_NAME="foo.bar")
        self.checkCrossDefault('wsgi.url_scheme', "https", HTTPS="on")
        self.checkCrossDefault('wsgi.url_scheme', "https", HTTPS="1")
        self.checkCrossDefault('wsgi.url_scheme', "https", HTTPS="yes")
        self.checkCrossDefault('wsgi.url_scheme', "http", HTTPS="foo")
        self.checkCrossDefault('SERVER_PORT', "80", HTTPS="foo")
        self.checkCrossDefault('SERVER_PORT', "443", HTTPS="on")

    def testGuessScheme(self):
        for environ, scheme in [
            ({}, "http"),
            ({'HTTPS': "foo"}, "http"),
            ({'HTTPS': "on"}, "https"),
            ({'HTTPS': "yes"}, "https"),
            ({'HTTPS': "1"}, "https"),
        ]:
            self.assertEqual(util.guess_scheme(environ), scheme)

    def testAppURIs(self):
        self.checkAppURI("http://127.0.0.1/")
        self.checkAppURI("http://127.0.0.1/spam", SCRIPT_NAME="/spam")
        self.checkAppURI(
            "http://spam.example.com:2071/",
            HTTP_HOST="spam.example.com:2071", SERVER_PORT="2071"
        )
        self.checkAppURI(
            "http://spam.example.com/",
            SERVER_NAME="spam.example.com"
        )
        self.checkAppURI(
            "http://127.0.0.1/",
            HTTP_HOST="127.0.0.1", SERVER_NAME="spam.example.com"
        )
        self.checkAppURI("https://127.0.0.1/", HTTPS="on")
        self.checkAppURI(
            "http://127.0.0.1:8000/",
            SERVER_PORT="8000", HTTP_HOST=None
        )

    def testReqURIs(self):
        self.checkReqURI("http://127.0.0.1/")
        self.checkReqURI("http://127.0.0.1/spam", SCRIPT_NAME="/spam")
        self.checkReqURI(
            "http://127.0.0.1/spammity/spam",
            SCRIPT_NAME="/spammity", PATH_INFO="/spam"
        )
        self.checkReqURI(
            "http://127.0.0.1/spammity/spam?say=ni",
            SCRIPT_NAME="/spammity", PATH_INFO="/spam", QUERY_STRING="say=ni"
        )
        self.checkReqURI(
            "http://127.0.0.1/spammity/spam", 0,
            SCRIPT_NAME="/spammity", PATH_INFO="/spam", QUERY_STRING="say=ni"
        )

    def testFileWrapper(self):
        self.checkFW("xyz"*50, 120, ["xyz"*40, "xyz"*10])

    def testHopByHop(self):
        hop_by_hop = (
            "Connection Keep-Alive Proxy-Authenticate Proxy-Authorization "
            "TE Trailers Transfer-Encoding Upgrade"
        ).split()
        for name in hop_by_hop:
            for spelling in name, name.title(), name.upper(), name.lower():
                self.assertTrue(util.is_hop_by_hop(spelling))

        # Not comprehensive, just a few random header names
        end_to_end = (
            "Accept Cache-Control Date Pragma Trailer Via Warning"
        ).split()
        for name in end_to_end:
            for spelling in name, name.title(), name.upper(), name.lower():
                self.assertFalse(util.is_hop_by_hop(spelling))
Phillip J. Eby5cf565d2006-06-09 16:40:18 +0000329
class HeaderTests(TestCase):
    """Tests for wsgiref.headers.Headers."""

    def testMappingInterface(self):
        pairs = [('x', 'y')]
        self.assertEqual(len(Headers([])), 0)
        self.assertEqual(len(Headers(pairs[:])), 1)
        self.assertEqual(Headers(pairs[:]).keys(), ['x'])
        self.assertEqual(Headers(pairs[:]).values(), ['y'])
        self.assertEqual(Headers(pairs[:]).items(), pairs)
        self.assertFalse(Headers(pairs).items() is pairs)  # must be copy!

        h = Headers([])
        del h['foo']   # should not raise an error

        h['Foo'] = 'bar'
        # Every lookup method must find the header regardless of case.
        lookups = h.has_key, h.__contains__, h.get, h.get_all, h.__getitem__
        for lookup in lookups:
            for spelling in 'foo', 'Foo', 'FOO':
                self.assertTrue(lookup(spelling))
            self.assertFalse(lookup('bar'))

        self.assertEqual(h['foo'], 'bar')
        h['foo'] = 'baz'
        self.assertEqual(h['FOO'], 'baz')
        self.assertEqual(h.get_all('foo'), ['baz'])

        self.assertEqual(h.get("foo", "whee"), "baz")
        self.assertEqual(h.get("zoo", "whee"), "whee")
        self.assertEqual(h.setdefault("foo", "whee"), "baz")
        self.assertEqual(h.setdefault("zoo", "whee"), "whee")
        self.assertEqual(h["foo"], "baz")
        self.assertEqual(h["zoo"], "whee")

    def testRequireList(self):
        self.assertRaises(TypeError, Headers, "foo")

    def testExtras(self):
        h = Headers([])
        self.assertEqual(str(h), '\r\n')

        h.add_header('foo', 'bar', baz="spam")
        self.assertEqual(h['foo'], 'bar; baz="spam"')
        self.assertEqual(str(h), 'foo: bar; baz="spam"\r\n\r\n')

        h.add_header('Foo', 'bar', cheese=None)
        self.assertEqual(
            h.get_all('foo'),
            ['bar; baz="spam"', 'bar; cheese']
        )

        expected_text = (
            'foo: bar; baz="spam"\r\n'
            'Foo: bar; cheese\r\n'
            '\r\n'
        )
        self.assertEqual(str(h), expected_text)
384
385
class ErrorHandler(BaseCGIHandler):
    """Simple handler subclass for testing BaseHandler

    The handler is wired to in-memory streams.  Keyword arguments form the
    environ, after setup_testing_defaults() has filled in missing keys."""

    def __init__(self, **kw):
        setup_testing_defaults(kw)
        stdin, stdout, stderr = StringIO(''), StringIO(), StringIO()
        BaseCGIHandler.__init__(
            self, stdin, stdout, stderr, kw,
            multithread=True, multiprocess=True
        )
395
class TestHandler(ErrorHandler):
    """Simple handler subclass for testing BaseHandler, w/error passthru"""

    def handle_error(self):
        # A bare ``raise`` re-raises the exception currently being handled,
        # so a failure surfaces in the calling test.
        raise   # for testing, we want to see what's happening
401
402
403
404
405
406
407
408
409
410
411
class HandlerTests(TestCase):
    """Tests for wsgiref.handlers.BaseHandler and BaseCGIHandler."""

    def checkEnvironAttrs(self, handler):
        """Each wsgi_* attribute of 'handler' must equal its environ entry.

        'file_wrapper' is skipped when the handler has none configured.
        """
        env = handler.environ
        for attr in [
            'version', 'multithread', 'multiprocess', 'run_once',
            'file_wrapper'
        ]:
            if attr == 'file_wrapper' and handler.wsgi_file_wrapper is None:
                continue
            self.assertEqual(
                getattr(handler, 'wsgi_' + attr), env['wsgi.' + attr]
            )

    def checkOSEnviron(self, handler):
        """The handler environ must combine os.environ and the defaults.

        Every os.environ entry that is not one of the testing defaults must
        appear unchanged, and every testing-default key must be present.
        """
        empty = {}
        setup_testing_defaults(empty)
        env = handler.environ
        from os import environ
        for k, v in environ.items():
            if k not in empty:
                self.assertEqual(env[k], v)
        for k in empty:
            self.assertTrue(k in env)

    def testEnviron(self):
        h = TestHandler(X="Y")
        h.setup_environ()
        self.checkEnvironAttrs(h)
        self.checkOSEnviron(h)
        self.assertEqual(h.environ["X"], "Y")

    def testCGIEnviron(self):
        h = BaseCGIHandler(None, None, None, {})
        h.setup_environ()
        for key in 'wsgi.url_scheme', 'wsgi.input', 'wsgi.errors':
            self.assertTrue(key in h.environ)

    def testScheme(self):
        h = TestHandler(HTTPS="on")
        h.setup_environ()
        self.assertEqual(h.environ['wsgi.url_scheme'], 'https')
        h = TestHandler()
        h.setup_environ()
        self.assertEqual(h.environ['wsgi.url_scheme'], 'http')

    def testAbstractMethods(self):
        h = BaseHandler()
        for name in [
            '_flush', 'get_stdin', 'get_stderr', 'add_cgi_vars'
        ]:
            self.assertRaises(NotImplementedError, getattr(h, name))
        self.assertRaises(NotImplementedError, h._write, "test")

    def testContentLength(self):
        # Demo one reason iteration is better than write()... ;)

        def trivial_app1(e, s):
            s('200 OK', [])
            return [e['wsgi.url_scheme']]

        def trivial_app2(e, s):
            s('200 OK', [])(e['wsgi.url_scheme'])
            return []

        h = TestHandler()
        h.run(trivial_app1)
        self.assertEqual(h.stdout.getvalue(),
            "Status: 200 OK\r\n"
            "Content-Length: 4\r\n"
            "\r\n"
            "http")

        h = TestHandler()
        h.run(trivial_app2)
        self.assertEqual(h.stdout.getvalue(),
            "Status: 200 OK\r\n"
            "\r\n"
            "http")

    def testBasicErrorOutput(self):

        def non_error_app(e, s):
            s('200 OK', [])
            return []

        def error_app(e, s):
            raise AssertionError("This should be caught by handler")

        h = ErrorHandler()
        h.run(non_error_app)
        self.assertEqual(h.stdout.getvalue(),
            "Status: 200 OK\r\n"
            "Content-Length: 0\r\n"
            "\r\n")
        self.assertEqual(h.stderr.getvalue(), "")

        h = ErrorHandler()
        h.run(error_app)
        self.assertEqual(h.stdout.getvalue(),
            "Status: %s\r\n"
            "Content-Type: text/plain\r\n"
            "Content-Length: %d\r\n"
            "\r\n%s" % (h.error_status, len(h.error_body), h.error_body))

        # The exception must have been reported on the error stream.
        self.assertTrue("AssertionError" in h.stderr.getvalue())

    def testErrorAfterOutput(self):
        MSG = "Some output has been sent"

        def error_app(e, s):
            s("200 OK", [])(MSG)
            raise AssertionError("This should be caught by handler")

        h = ErrorHandler()
        h.run(error_app)
        self.assertEqual(h.stdout.getvalue(),
            "Status: 200 OK\r\n"
            "\r\n" + MSG)
        # The exception must have been reported on the error stream.
        self.assertTrue("AssertionError" in h.stderr.getvalue())

    def testHeaderFormats(self):

        def non_error_app(e, s):
            s('200 OK', [])
            return []

        def run_handler(proto, version, ssw, origin_server):
            # Run non_error_app through a freshly configured handler and
            # return everything it wrote to stdout.
            h = TestHandler(SERVER_PROTOCOL=proto)
            h.origin_server = origin_server
            h.http_version = version
            h.server_software = ssw
            h.run(non_error_app)
            return h.stdout.getvalue()

        stdpat = (
            r"HTTP/%s 200 OK\r\n"
            r"Date: \w{3}, [ 0123]\d \w{3} \d{4} \d\d:\d\d:\d\d GMT\r\n"
            r"%s" r"Content-Length: 0\r\n" r"\r\n"
        )
        shortpat = (
            "Status: 200 OK\r\n" "Content-Length: 0\r\n" "\r\n"
        )

        for ssw in "FooBar/1.0", None:
            if ssw:
                sw = "Server: %s\r\n" % ssw
            else:
                sw = ""

            for version in "1.0", "1.1":
                for proto in "HTTP/0.9", "HTTP/1.0", "HTTP/1.1":

                    # Not an origin server: the short CGI-style header form
                    # is expected regardless of protocol and version.
                    output = run_handler(proto, version, ssw, False)
                    self.assertEqual(shortpat, output)

                    # Origin server: a full status line plus Date and
                    # optional Server headers, except for HTTP/0.9 requests,
                    # where nothing is expected on stdout.
                    output = run_handler(proto, version, ssw, True)
                    if proto == "HTTP/0.9":
                        self.assertEqual(output, "")
                    else:
                        pattern = stdpat % (version, sw)
                        self.assertTrue(
                            re.match(pattern, output),
                            (pattern, output)
                        )
575
# This epilogue is needed for compatibility with the Python 2.5 regrtest module
577
def test_main():
    """Run this module's tests by passing its name to test_support."""
    test_support.run_unittest(__name__)
Phillip J. Eby5cf565d2006-06-09 16:40:18 +0000580
# Run the tests when this file is executed directly as a script.
if __name__ == "__main__":
    test_main()
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
# the above lines intentionally left blank