blob: 1ec271b81a82798f9e0d855058d74bcd9a7edb07 [file] [log] [blame]
Phillip J. Eby5cf565d2006-06-09 16:40:18 +00001from __future__ import nested_scopes # Backward compat for 2.1
2from unittest import TestSuite, TestCase, makeSuite
3from wsgiref.util import setup_testing_defaults
4from wsgiref.headers import Headers
5from wsgiref.handlers import BaseHandler, BaseCGIHandler
6from wsgiref import util
7from wsgiref.validate import validator
8from wsgiref.simple_server import WSGIServer, WSGIRequestHandler, demo_app
9from wsgiref.simple_server import make_server
10from StringIO import StringIO
11from SocketServer import BaseServer
12import re, sys
13
14
class MockServer(WSGIServer):
    """WSGI server stand-in that never opens a real socket"""

    def __init__(self, server_address, RequestHandlerClass):
        # Call BaseServer directly, skipping the intermediate base classes'
        # constructors, then do our own socket-free "bind".
        BaseServer.__init__(self, server_address, RequestHandlerClass)
        self.server_bind()

    def server_bind(self):
        """Record host/port from 'server_address' and build the environ."""
        self.server_name, self.server_port = self.server_address
        self.setup_environ()
27
28
class MockHandler(WSGIRequestHandler):
    """Request handler that talks to a pair of file objects, not a socket"""

    def setup(self):
        # The "request" handed to us is an (input file, output file) pair.
        self.connection = self.request
        streams = self.connection
        self.rfile, self.wfile = streams

    def finish(self):
        # Deliberately does nothing, so the files are left as they are.
        return None
37
38
39
40
41
def hello_app(environ, start_response):
    """Minimal WSGI app: fixed headers and a constant plain-text body"""
    # The Date header is pinned so callers can compare output byte-for-byte.
    response_headers = [
        ('Content-Type', 'text/plain'),
        ('Date', 'Mon, 05 Jun 2006 18:49:54 GMT'),
    ]
    start_response("200 OK", response_headers)
    return ["Hello, world!"]
48
def run_amock(app=hello_app, data="GET / HTTP/1.0\n\n"):
    """Feed 'data' to 'app' through the mock server; return (output, errors)

    'output' is everything written to the response stream and 'errors' is
    everything written to sys.stderr while the request was being handled.
    """
    server = make_server("", 80, app, MockServer, MockHandler)

    request_in = StringIO(data)
    response_out = StringIO()
    captured_err = StringIO()

    # Redirect sys.stderr for the duration of the request only; the
    # 'finally' restores it even if handling the request blows up.
    saved_stderr = sys.stderr
    sys.stderr = captured_err
    try:
        server.finish_request((request_in, response_out), ("127.0.0.1", 8888))
    finally:
        sys.stderr = saved_stderr

    return response_out.getvalue(), captured_err.getvalue()
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
def compare_generic_iter(make_it, match):
    """Check an old-style (2.1) / new-style (2.2+) iterator against 'match'

    'make_it' is a zero-argument function returning a fresh object to test;
    it is called once per pass, since the object may be consumed twice.
    The first pass indexes the object with 0, 1, 2, ... via __getitem__.
    The second pass, run only where iter() and StopIteration exist, checks
    that iter() returns the object itself and then drives it with .next().

    Raises AssertionError on any mismatch or surplus item.
    """
    # Pass 1: sequence-style access.
    subject = make_it()
    index = 0
    for expected in match:
        if not subject[index] == expected:
            raise AssertionError
        index += 1
    try:
        subject[index]
    except IndexError:
        pass
    else:
        raise AssertionError("Too many items from __getitem__", subject)

    try:
        iter, StopIteration
    except NameError:
        # No iterator protocol on this interpreter (pre-2.2): nothing more
        # to check.
        return

    # Pass 2: iterator-style access, on a fresh object.
    subject = make_it()
    if not iter(subject) is subject:
        raise AssertionError
    for expected in match:
        if not subject.next() == expected:
            raise AssertionError
    try:
        subject.next()
    except StopIteration:
        pass
    else:
        raise AssertionError("Too many items from .next()", subject)
Phillip J. Eby5cf565d2006-06-09 16:40:18 +0000118
119
120
121
122
123
class IntegrationTests(TestCase):
    """Whole-request tests driven through run_amock()"""

    def check_hello(self, out, has_length=True):
        """Assert that 'out' is exactly the response expected from hello_app

        'has_length' says whether a Content-Length header is expected
        between the Date header and the blank line that ends the headers.
        """
        # Explicit branch instead of the fragile "x and a or b" idiom.
        if has_length:
            length_header = "Content-Length: 13\r\n"
        else:
            length_header = ""
        self.assertEqual(out,
            "HTTP/1.0 200 OK\r\n"
            "Server: WSGIServer/0.1 Python/"+sys.version.split()[0]+"\r\n"
            "Content-Type: text/plain\r\n"
            "Date: Mon, 05 Jun 2006 18:49:54 GMT\r\n" +
            length_header +
            "\r\n"
            "Hello, world!"
        )

    def test_plain_hello(self):
        out, err = run_amock()
        self.check_hello(out)

    def test_validated_hello(self):
        out, err = run_amock(validator(hello_app))
        # the middleware doesn't support len(), so content-length isn't there
        self.check_hello(out, has_length=False)

    def test_simple_validation_error(self):
        # The headers argument is a bare tuple rather than a list; the
        # assertions below expect that to surface as a server error.
        def bad_app(environ,start_response):
            start_response("200 OK", ('Content-Type','text/plain'))
            return ["Hello, world!"]
        out, err = run_amock(validator(bad_app))
        # NOTE(review): this literal has to match the server's error body
        # character for character, internal spacing included -- confirm
        # against wsgiref.handlers if this assertion ever fails.
        self.assertTrue(out.endswith(
            "A server error occurred. Please contact the administrator."
        ))
        # The second-to-last stderr line is compared against the expected
        # AssertionError message.
        self.assertEqual(
            err.splitlines()[-2],
            "AssertionError: Headers (('Content-Type', 'text/plain')) must"
            " be of type list: <type 'tuple'>"
        )
159
160
161
162
163
164
class UtilityTests(TestCase):
    """Tests for the helper functions in wsgiref.util"""

    def checkShift(self,sn_in,pi_in,part,sn_out,pi_out):
        """Run shift_path_info() once and check its result and side effects

        Starts from SCRIPT_NAME 'sn_in' and PATH_INFO 'pi_in', expects
        'part' to be returned and the two variables to end up as 'sn_out'
        and 'pi_out'.  Returns the environ used.
        """
        env = {'SCRIPT_NAME':sn_in,'PATH_INFO':pi_in}
        util.setup_testing_defaults(env)
        self.assertEqual(util.shift_path_info(env),part)
        self.assertEqual(env['PATH_INFO'],pi_out)
        self.assertEqual(env['SCRIPT_NAME'],sn_out)
        return env

    def checkDefault(self, key, value, alt=None):
        """Check that 'key' defaults to 'value' and is never overwritten"""
        # Check defaulting when empty
        env = {}
        util.setup_testing_defaults(env)
        if isinstance(value,StringIO):
            # Stream defaults are checked by type only.
            self.assertTrue(isinstance(env[key],StringIO))
        else:
            self.assertEqual(env[key],value)

        # Check existing value
        env = {key:alt}
        util.setup_testing_defaults(env)
        self.assertTrue(env[key] is alt)

    def checkCrossDefault(self,key,value,**kw):
        """Check the default for 'key' given the other variables in 'kw'"""
        util.setup_testing_defaults(kw)
        self.assertEqual(kw[key],value)

    def checkAppURI(self,uri,**kw):
        """Check that application_uri() yields 'uri' for environ 'kw'"""
        util.setup_testing_defaults(kw)
        self.assertEqual(util.application_uri(kw),uri)

    def checkReqURI(self,uri,query=1,**kw):
        """Check that request_uri() yields 'uri' for environ 'kw'

        'query' is passed through as request_uri()'s second argument.
        """
        util.setup_testing_defaults(kw)
        self.assertEqual(util.request_uri(kw,query),uri)

    def checkFW(self,text,size,match):
        """Check FileWrapper iteration over 'text' in blocks of 'size'

        'match' lists the expected chunks.  Also verifies that the wrapped
        file stays open until the wrapper's close() is called.
        """

        def make_it(text=text,size=size):
            return util.FileWrapper(StringIO(text),size)

        compare_generic_iter(make_it,match)

        it = make_it()
        self.assertFalse(it.filelike.closed)

        # Exhaust the wrapper; the items themselves are not of interest.
        for _chunk in it:
            pass

        # Running out of data must not close the underlying file...
        self.assertFalse(it.filelike.closed)

        # ...only an explicit close() does.
        it.close()
        self.assertTrue(it.filelike.closed)

    def testSimpleShifts(self):
        self.checkShift('','/', '', '/', '')
        self.checkShift('','/x', 'x', '/x', '')
        self.checkShift('/','', None, '/', '')
        self.checkShift('/a','/x/y', 'x', '/a/x', '/y')
        self.checkShift('/a','/x/', 'x', '/a/x', '/')

    def testNormalizedShifts(self):
        self.checkShift('/a/b', '/../y', '..', '/a', '/y')
        self.checkShift('', '/../y', '..', '', '/y')
        self.checkShift('/a/b', '//y', 'y', '/a/b/y', '')
        self.checkShift('/a/b', '//y/', 'y', '/a/b/y', '/')
        self.checkShift('/a/b', '/./y', 'y', '/a/b/y', '')
        self.checkShift('/a/b', '/./y/', 'y', '/a/b/y', '/')
        self.checkShift('/a/b', '///./..//y/.//', '..', '/a', '/y/')
        self.checkShift('/a/b', '///', '', '/a/b/', '')
        self.checkShift('/a/b', '/.//', '', '/a/b/', '')
        self.checkShift('/a/b', '/x//', 'x', '/a/b/x', '/')
        self.checkShift('/a/b', '/.', None, '/a/b', '')

    def testDefaults(self):
        for key, value in [
            ('SERVER_NAME','127.0.0.1'),
            ('SERVER_PORT', '80'),
            ('SERVER_PROTOCOL','HTTP/1.0'),
            ('HTTP_HOST','127.0.0.1'),
            ('REQUEST_METHOD','GET'),
            ('SCRIPT_NAME',''),
            ('PATH_INFO','/'),
            ('wsgi.version', (1,0)),
            ('wsgi.run_once', 0),
            ('wsgi.multithread', 0),
            ('wsgi.multiprocess', 0),
            ('wsgi.input', StringIO("")),
            ('wsgi.errors', StringIO()),
            ('wsgi.url_scheme','http'),
        ]:
            self.checkDefault(key,value)

    def testCrossDefaults(self):
        self.checkCrossDefault('HTTP_HOST',"foo.bar",SERVER_NAME="foo.bar")
        self.checkCrossDefault('wsgi.url_scheme',"https",HTTPS="on")
        self.checkCrossDefault('wsgi.url_scheme',"https",HTTPS="1")
        self.checkCrossDefault('wsgi.url_scheme',"https",HTTPS="yes")
        self.checkCrossDefault('wsgi.url_scheme',"http",HTTPS="foo")
        self.checkCrossDefault('SERVER_PORT',"80",HTTPS="foo")
        self.checkCrossDefault('SERVER_PORT',"443",HTTPS="on")

    def testGuessScheme(self):
        self.assertEqual(util.guess_scheme({}), "http")
        self.assertEqual(util.guess_scheme({'HTTPS':"foo"}), "http")
        self.assertEqual(util.guess_scheme({'HTTPS':"on"}), "https")
        self.assertEqual(util.guess_scheme({'HTTPS':"yes"}), "https")
        self.assertEqual(util.guess_scheme({'HTTPS':"1"}), "https")

    def testAppURIs(self):
        self.checkAppURI("http://127.0.0.1/")
        self.checkAppURI("http://127.0.0.1/spam", SCRIPT_NAME="/spam")
        self.checkAppURI("http://spam.example.com:2071/",
            HTTP_HOST="spam.example.com:2071", SERVER_PORT="2071")
        self.checkAppURI("http://spam.example.com/",
            SERVER_NAME="spam.example.com")
        self.checkAppURI("http://127.0.0.1/",
            HTTP_HOST="127.0.0.1", SERVER_NAME="spam.example.com")
        self.checkAppURI("https://127.0.0.1/", HTTPS="on")
        self.checkAppURI("http://127.0.0.1:8000/", SERVER_PORT="8000",
            HTTP_HOST=None)

    def testReqURIs(self):
        self.checkReqURI("http://127.0.0.1/")
        self.checkReqURI("http://127.0.0.1/spam", SCRIPT_NAME="/spam")
        self.checkReqURI("http://127.0.0.1/spammity/spam",
            SCRIPT_NAME="/spammity", PATH_INFO="/spam")
        self.checkReqURI("http://127.0.0.1/spammity/spam?say=ni",
            SCRIPT_NAME="/spammity", PATH_INFO="/spam",QUERY_STRING="say=ni")
        self.checkReqURI("http://127.0.0.1/spammity/spam", 0,
            SCRIPT_NAME="/spammity", PATH_INFO="/spam",QUERY_STRING="say=ni")

    def testFileWrapper(self):
        self.checkFW("xyz"*50, 120, ["xyz"*40,"xyz"*10])

    def testHopByHop(self):
        for hop in (
            "Connection Keep-Alive Proxy-Authenticate Proxy-Authorization "
            "TE Trailers Transfer-Encoding Upgrade"
        ).split():
            for alt in hop, hop.title(), hop.upper(), hop.lower():
                self.assertTrue(util.is_hop_by_hop(alt))

        # Not comprehensive, just a few random header names
        for hop in (
            "Accept Cache-Control Date Pragma Trailer Via Warning"
        ).split():
            for alt in hop, hop.title(), hop.upper(), hop.lower():
                self.assertFalse(util.is_hop_by_hop(alt))
328
329class HeaderTests(TestCase):
330
331 def testMappingInterface(self):
332 test = [('x','y')]
333 self.assertEqual(len(Headers([])),0)
334 self.assertEqual(len(Headers(test[:])),1)
335 self.assertEqual(Headers(test[:]).keys(), ['x'])
336 self.assertEqual(Headers(test[:]).values(), ['y'])
337 self.assertEqual(Headers(test[:]).items(), test)
338 self.failIf(Headers(test).items() is test) # must be copy!
339
340 h=Headers([])
341 del h['foo'] # should not raise an error
342
343 h['Foo'] = 'bar'
344 for m in h.has_key, h.__contains__, h.get, h.get_all, h.__getitem__:
345 self.failUnless(m('foo'))
346 self.failUnless(m('Foo'))
347 self.failUnless(m('FOO'))
348 self.failIf(m('bar'))
349
350 self.assertEqual(h['foo'],'bar')
351 h['foo'] = 'baz'
352 self.assertEqual(h['FOO'],'baz')
353 self.assertEqual(h.get_all('foo'),['baz'])
354
355 self.assertEqual(h.get("foo","whee"), "baz")
356 self.assertEqual(h.get("zoo","whee"), "whee")
357 self.assertEqual(h.setdefault("foo","whee"), "baz")
358 self.assertEqual(h.setdefault("zoo","whee"), "whee")
359 self.assertEqual(h["foo"],"baz")
360 self.assertEqual(h["zoo"],"whee")
361
362 def testRequireList(self):
363 self.assertRaises(TypeError, Headers, "foo")
364
365
366 def testExtras(self):
367 h = Headers([])
368 self.assertEqual(str(h),'\r\n')
369
370 h.add_header('foo','bar',baz="spam")
371 self.assertEqual(h['foo'], 'bar; baz="spam"')
372 self.assertEqual(str(h),'foo: bar; baz="spam"\r\n\r\n')
373
374 h.add_header('Foo','bar',cheese=None)
375 self.assertEqual(h.get_all('foo'),
376 ['bar; baz="spam"', 'bar; cheese'])
377
378 self.assertEqual(str(h),
379 'foo: bar; baz="spam"\r\n'
380 'Foo: bar; cheese\r\n'
381 '\r\n'
382 )
383
384
class ErrorHandler(BaseCGIHandler):
    """BaseCGIHandler wired to in-memory streams, for testing BaseHandler"""

    def __init__(self, **kw):
        # The keyword arguments double as the CGI environment; fill in
        # whatever the caller did not supply.
        setup_testing_defaults(kw)
        fake_stdin = StringIO('')
        fake_stdout = StringIO()
        fake_stderr = StringIO()
        BaseCGIHandler.__init__(
            self, fake_stdin, fake_stdout, fake_stderr, kw,
            multithread=True, multiprocess=True
        )
394
class TestHandler(ErrorHandler):
    """ErrorHandler variant that lets exceptions propagate to the test"""

    def handle_error(self):
        # Re-raise whatever is currently being handled instead of reporting
        # it, so a failure shows up directly in the calling test.
        raise
400
401
402
403
404
405
406
407
408
409
410
class HandlerTests(TestCase):
    """Tests for the handler classes in wsgiref.handlers"""

    def checkEnvironAttrs(self, handler):
        """Check each handler 'wsgi_*' attribute against its environ entry"""
        env = handler.environ
        for attr in [
            'version','multithread','multiprocess','run_once','file_wrapper'
        ]:
            # A handler without a file wrapper is not compared for that key.
            if attr=='file_wrapper' and handler.wsgi_file_wrapper is None:
                continue
            self.assertEqual(getattr(handler,'wsgi_'+attr),env['wsgi.'+attr])

    def checkOSEnviron(self,handler):
        """Check that the handler environ covers os.environ and the defaults

        Every os.environ variable that is not one of the testing defaults
        must appear unchanged; every testing default must at least exist.
        """
        empty = {}
        setup_testing_defaults(empty)
        env = handler.environ
        from os import environ
        for k,v in environ.items():
            if k not in empty:
                self.assertEqual(env[k],v)
        for k in empty:
            self.assertTrue(k in env)

    def testEnviron(self):
        h = TestHandler(X="Y")
        h.setup_environ()
        self.checkEnvironAttrs(h)
        self.checkOSEnviron(h)
        self.assertEqual(h.environ["X"],"Y")

    def testCGIEnviron(self):
        h = BaseCGIHandler(None,None,None,{})
        h.setup_environ()
        for key in 'wsgi.url_scheme', 'wsgi.input', 'wsgi.errors':
            self.assertTrue(key in h.environ)

    def testScheme(self):
        h=TestHandler(HTTPS="on")
        h.setup_environ()
        self.assertEqual(h.environ['wsgi.url_scheme'],'https')
        h=TestHandler()
        h.setup_environ()
        self.assertEqual(h.environ['wsgi.url_scheme'],'http')

    def testAbstractMethods(self):
        h = BaseHandler()
        for name in [
            '_flush','get_stdin','get_stderr','add_cgi_vars'
        ]:
            self.assertRaises(NotImplementedError, getattr(h,name))
        self.assertRaises(NotImplementedError, h._write, "test")

    def testContentLength(self):
        # Demo one reason iteration is better than write()... ;)

        def trivial_app1(e,s):
            s('200 OK',[])
            return [e['wsgi.url_scheme']]

        def trivial_app2(e,s):
            s('200 OK',[])(e['wsgi.url_scheme'])
            return []

        # Body returned as an iterable: a Content-Length header is expected.
        h = TestHandler()
        h.run(trivial_app1)
        self.assertEqual(h.stdout.getvalue(),
            "Status: 200 OK\r\n"
            "Content-Length: 4\r\n"
            "\r\n"
            "http")

        # Body sent through write(): no Content-Length header is expected.
        h = TestHandler()
        h.run(trivial_app2)
        self.assertEqual(h.stdout.getvalue(),
            "Status: 200 OK\r\n"
            "\r\n"
            "http")

    def testBasicErrorOutput(self):

        def non_error_app(e,s):
            s('200 OK',[])
            return []

        def error_app(e,s):
            raise AssertionError("This should be caught by handler")

        h = ErrorHandler()
        h.run(non_error_app)
        self.assertEqual(h.stdout.getvalue(),
            "Status: 200 OK\r\n"
            "Content-Length: 0\r\n"
            "\r\n")
        self.assertEqual(h.stderr.getvalue(),"")

        h = ErrorHandler()
        h.run(error_app)
        self.assertEqual(h.stdout.getvalue(),
            "Status: %s\r\n"
            "Content-Type: text/plain\r\n"
            "Content-Length: %d\r\n"
            "\r\n%s" % (h.error_status,len(h.error_body),h.error_body))

        self.assertTrue("AssertionError" in h.stderr.getvalue())

    def testErrorAfterOutput(self):
        MSG = "Some output has been sent"
        def error_app(e,s):
            s("200 OK",[])(MSG)
            raise AssertionError("This should be caught by handler")

        # Once output has been sent, the expected stdout is just the
        # original response; the error shows up on stderr only.
        h = ErrorHandler()
        h.run(error_app)
        self.assertEqual(h.stdout.getvalue(),
            "Status: 200 OK\r\n"
            "\r\n"+MSG)
        self.assertTrue("AssertionError" in h.stderr.getvalue())

    def testHeaderFormats(self):

        def non_error_app(e,s):
            s('200 OK',[])
            return []

        # Regex template for origin-server output; the two %s slots take
        # the HTTP version and the optional Server header line.
        stdpat = (
            r"HTTP/%s 200 OK\r\n"
            r"Date: \w{3}, [ 0123]\d \w{3} \d{4} \d\d:\d\d:\d\d GMT\r\n"
            r"%s" r"Content-Length: 0\r\n" r"\r\n"
        )
        # Exact output expected when the handler is not the origin server.
        shortpat = (
            "Status: 200 OK\r\n" "Content-Length: 0\r\n" "\r\n"
        )

        for ssw in "FooBar/1.0", None:
            if ssw:
                sw = "Server: %s\r\n" % ssw
            else:
                sw = ""

            for version in "1.0", "1.1":
                for proto in "HTTP/0.9", "HTTP/1.0", "HTTP/1.1":

                    h = TestHandler(SERVER_PROTOCOL=proto)
                    h.origin_server = False
                    h.http_version = version
                    h.server_software = ssw
                    h.run(non_error_app)
                    self.assertEqual(shortpat,h.stdout.getvalue())

                    h = TestHandler(SERVER_PROTOCOL=proto)
                    h.origin_server = True
                    h.http_version = version
                    h.server_software = ssw
                    h.run(non_error_app)
                    if proto=="HTTP/0.9":
                        # Nothing at all is expected for an HTTP/0.9 client.
                        self.assertEqual(h.stdout.getvalue(),"")
                    else:
                        self.assertTrue(
                            re.match(stdpat%(version,sw), h.stdout.getvalue()),
                            (stdpat%(version,sw), h.stdout.getvalue())
                        )
574
575# This epilogue is needed for compatibility with the Python 2.5 regrtest module
576
def test_main():
    """Load every test defined in this module and hand it to run_suite()"""
    from test.test_support import run_suite
    import unittest
    this_module = sys.modules[__name__]
    suite = unittest.defaultTestLoader.loadTestsFromModule(this_module)
    run_suite(suite)

if __name__ == "__main__":
    test_main()
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615# the above lines intentionally left blank