blob: c8d126230d671969f120e0d5189225149ca55e05 [file] [log] [blame]
Phillip J. Eby5cf565d2006-06-09 16:40:18 +00001from __future__ import nested_scopes # Backward compat for 2.1
2from unittest import TestSuite, TestCase, makeSuite
3from wsgiref.util import setup_testing_defaults
4from wsgiref.headers import Headers
5from wsgiref.handlers import BaseHandler, BaseCGIHandler
6from wsgiref import util
7from wsgiref.validate import validator
8from wsgiref.simple_server import WSGIServer, WSGIRequestHandler, demo_app
9from wsgiref.simple_server import make_server
10from StringIO import StringIO
11from SocketServer import BaseServer
12import re, sys
13
14
class MockServer(WSGIServer):
    """WSGI server used without a real socket"""

    def __init__(self, server_address, RequestHandlerClass):
        # Call BaseServer's constructor directly (not the inherited
        # constructor chain) and then perform the mock "bind" ourselves.
        BaseServer.__init__(self, server_address, RequestHandlerClass)
        self.server_bind()

    def server_bind(self):
        """Record the configured (host, port) and build the base environ"""
        self.server_name, self.server_port = self.server_address
        self.setup_environ()
27
28
class MockHandler(WSGIRequestHandler):
    """Request handler whose "connection" is a pair of file-like objects"""

    def setup(self):
        """Use the request itself as the connection and as (rfile, wfile)"""
        streams = self.request
        self.connection = streams
        self.rfile, self.wfile = streams

    def finish(self):
        """Do nothing when the request is finished"""
37
38
39
40
41
def hello_app(environ, start_response):
    """Minimal WSGI application: a fixed plain-text greeting

    'environ' is not consulted.  'start_response' is called once with a
    "200 OK" status and a constant header list; the body is returned as a
    one-element list."""
    response_headers = [
        ('Content-Type', 'text/plain'),
        # A constant timestamp, not the current time
        ('Date', 'Mon, 05 Jun 2006 18:49:54 GMT'),
    ]
    start_response("200 OK", response_headers)
    return ["Hello, world!"]
48
def run_amock(app=hello_app, data="GET / HTTP/1.0\n\n"):
    """Push one raw request through a mock server and collect the results

    'app' is the WSGI application to serve and 'data' the raw request text.
    Returns a 2-tuple of strings: everything written to the response stream,
    and everything written to sys.stderr while the request was handled."""
    server = make_server("", 80, app, MockServer, MockHandler)

    request_in = StringIO(data)
    response_out = StringIO()
    captured_err = StringIO()

    # Swap sys.stderr for the duration of the request; always restore it.
    saved_stderr = sys.stderr
    sys.stderr = captured_err
    try:
        server.finish_request((request_in, response_out), ("127.0.0.1", 8888))
    finally:
        sys.stderr = saved_stderr

    return response_out.getvalue(), captured_err.getvalue()
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
def compare_generic_iter(test, make_it, match):
    """Utility to compare a generic 2.1/2.2+ iterator with an iterable

    'test' supplies the assertion methods (assertEqual, assert_ and
    assertRaises).  'make_it' must be a function returning a fresh iterator
    on every call, because the iterator may be exercised twice: always via
    __getitem__, and additionally via iter()/next() when running under
    Python 2.2+.  'match' is the expected sequence of items; it is iterated
    once per pass."""

    # Pass 1: sequence protocol -- index upward from zero.
    seq = make_it()
    count = 0
    for expected in match:
        test.assertEqual(seq[count], expected)
        count += 1

    # One index past the expected items must raise IndexError.
    try:
        seq[count]
    except IndexError:
        pass
    else:
        raise AssertionError("Too many items from __getitem__", seq)

    # Pass 2: iterator protocol, only if the interpreter has it (2.2+).
    try:
        iter, StopIteration
    except NameError:
        return

    fresh = make_it()
    test.assert_(iter(fresh) is fresh)
    for expected in match:
        test.assertEqual(fresh.next(), expected)
    test.assertRaises(StopIteration, fresh.next)
Phillip J. Eby5cf565d2006-06-09 16:40:18 +0000113
114
115
116
117
118
119class IntegrationTests(TestCase):
120
121 def check_hello(self, out, has_length=True):
122 self.assertEqual(out,
123 "HTTP/1.0 200 OK\r\n"
124 "Server: WSGIServer/0.1 Python/"+sys.version.split()[0]+"\r\n"
125 "Content-Type: text/plain\r\n"
126 "Date: Mon, 05 Jun 2006 18:49:54 GMT\r\n" +
127 (has_length and "Content-Length: 13\r\n" or "") +
128 "\r\n"
129 "Hello, world!"
130 )
131
132 def test_plain_hello(self):
133 out, err = run_amock()
134 self.check_hello(out)
135
136 def test_validated_hello(self):
137 out, err = run_amock(validator(hello_app))
138 # the middleware doesn't support len(), so content-length isn't there
139 self.check_hello(out, has_length=False)
140
141 def test_simple_validation_error(self):
142 def bad_app(environ,start_response):
143 start_response("200 OK", ('Content-Type','text/plain'))
144 return ["Hello, world!"]
145 out, err = run_amock(validator(bad_app))
146 self.failUnless(out.endswith(
147 "A server error occurred. Please contact the administrator."
148 ))
149 self.assertEqual(
150 err.splitlines()[-2],
151 "AssertionError: Headers (('Content-Type', 'text/plain')) must"
152 " be of type list: <type 'tuple'>"
153 )
154
155
156
157
158
159
160class UtilityTests(TestCase):
161
162 def checkShift(self,sn_in,pi_in,part,sn_out,pi_out):
163 env = {'SCRIPT_NAME':sn_in,'PATH_INFO':pi_in}
164 util.setup_testing_defaults(env)
165 self.assertEqual(util.shift_path_info(env),part)
166 self.assertEqual(env['PATH_INFO'],pi_out)
167 self.assertEqual(env['SCRIPT_NAME'],sn_out)
168 return env
169
170 def checkDefault(self, key, value, alt=None):
171 # Check defaulting when empty
172 env = {}
173 util.setup_testing_defaults(env)
174 if isinstance(value,StringIO):
175 self.failUnless(isinstance(env[key],StringIO))
176 else:
177 self.assertEqual(env[key],value)
178
179 # Check existing value
180 env = {key:alt}
181 util.setup_testing_defaults(env)
182 self.failUnless(env[key] is alt)
183
184 def checkCrossDefault(self,key,value,**kw):
185 util.setup_testing_defaults(kw)
186 self.assertEqual(kw[key],value)
187
188 def checkAppURI(self,uri,**kw):
189 util.setup_testing_defaults(kw)
190 self.assertEqual(util.application_uri(kw),uri)
191
192 def checkReqURI(self,uri,query=1,**kw):
193 util.setup_testing_defaults(kw)
194 self.assertEqual(util.request_uri(kw,query),uri)
195
196
197
198
199
200
201 def checkFW(self,text,size,match):
202
203 def make_it(text=text,size=size):
204 return util.FileWrapper(StringIO(text),size)
205
Tim Peters06524b62006-06-11 20:52:59 +0000206 compare_generic_iter(self, make_it, match)
Phillip J. Eby5cf565d2006-06-09 16:40:18 +0000207
208 it = make_it()
209 self.failIf(it.filelike.closed)
210
211 for item in it:
212 pass
213
214 self.failIf(it.filelike.closed)
215
216 it.close()
217 self.failUnless(it.filelike.closed)
218
219
220 def testSimpleShifts(self):
221 self.checkShift('','/', '', '/', '')
222 self.checkShift('','/x', 'x', '/x', '')
223 self.checkShift('/','', None, '/', '')
224 self.checkShift('/a','/x/y', 'x', '/a/x', '/y')
225 self.checkShift('/a','/x/', 'x', '/a/x', '/')
226
227
228 def testNormalizedShifts(self):
229 self.checkShift('/a/b', '/../y', '..', '/a', '/y')
230 self.checkShift('', '/../y', '..', '', '/y')
231 self.checkShift('/a/b', '//y', 'y', '/a/b/y', '')
232 self.checkShift('/a/b', '//y/', 'y', '/a/b/y', '/')
233 self.checkShift('/a/b', '/./y', 'y', '/a/b/y', '')
234 self.checkShift('/a/b', '/./y/', 'y', '/a/b/y', '/')
235 self.checkShift('/a/b', '///./..//y/.//', '..', '/a', '/y/')
236 self.checkShift('/a/b', '///', '', '/a/b/', '')
237 self.checkShift('/a/b', '/.//', '', '/a/b/', '')
238 self.checkShift('/a/b', '/x//', 'x', '/a/b/x', '/')
239 self.checkShift('/a/b', '/.', None, '/a/b', '')
240
241
242 def testDefaults(self):
243 for key, value in [
244 ('SERVER_NAME','127.0.0.1'),
245 ('SERVER_PORT', '80'),
246 ('SERVER_PROTOCOL','HTTP/1.0'),
247 ('HTTP_HOST','127.0.0.1'),
248 ('REQUEST_METHOD','GET'),
249 ('SCRIPT_NAME',''),
250 ('PATH_INFO','/'),
251 ('wsgi.version', (1,0)),
252 ('wsgi.run_once', 0),
253 ('wsgi.multithread', 0),
254 ('wsgi.multiprocess', 0),
255 ('wsgi.input', StringIO("")),
256 ('wsgi.errors', StringIO()),
257 ('wsgi.url_scheme','http'),
258 ]:
259 self.checkDefault(key,value)
260
261
262 def testCrossDefaults(self):
263 self.checkCrossDefault('HTTP_HOST',"foo.bar",SERVER_NAME="foo.bar")
264 self.checkCrossDefault('wsgi.url_scheme',"https",HTTPS="on")
265 self.checkCrossDefault('wsgi.url_scheme',"https",HTTPS="1")
266 self.checkCrossDefault('wsgi.url_scheme',"https",HTTPS="yes")
267 self.checkCrossDefault('wsgi.url_scheme',"http",HTTPS="foo")
268 self.checkCrossDefault('SERVER_PORT',"80",HTTPS="foo")
269 self.checkCrossDefault('SERVER_PORT',"443",HTTPS="on")
270
271
272 def testGuessScheme(self):
273 self.assertEqual(util.guess_scheme({}), "http")
274 self.assertEqual(util.guess_scheme({'HTTPS':"foo"}), "http")
275 self.assertEqual(util.guess_scheme({'HTTPS':"on"}), "https")
276 self.assertEqual(util.guess_scheme({'HTTPS':"yes"}), "https")
277 self.assertEqual(util.guess_scheme({'HTTPS':"1"}), "https")
278
279
280
281
282
283 def testAppURIs(self):
284 self.checkAppURI("http://127.0.0.1/")
285 self.checkAppURI("http://127.0.0.1/spam", SCRIPT_NAME="/spam")
286 self.checkAppURI("http://spam.example.com:2071/",
287 HTTP_HOST="spam.example.com:2071", SERVER_PORT="2071")
288 self.checkAppURI("http://spam.example.com/",
289 SERVER_NAME="spam.example.com")
290 self.checkAppURI("http://127.0.0.1/",
291 HTTP_HOST="127.0.0.1", SERVER_NAME="spam.example.com")
292 self.checkAppURI("https://127.0.0.1/", HTTPS="on")
293 self.checkAppURI("http://127.0.0.1:8000/", SERVER_PORT="8000",
294 HTTP_HOST=None)
295
296 def testReqURIs(self):
297 self.checkReqURI("http://127.0.0.1/")
298 self.checkReqURI("http://127.0.0.1/spam", SCRIPT_NAME="/spam")
299 self.checkReqURI("http://127.0.0.1/spammity/spam",
300 SCRIPT_NAME="/spammity", PATH_INFO="/spam")
301 self.checkReqURI("http://127.0.0.1/spammity/spam?say=ni",
302 SCRIPT_NAME="/spammity", PATH_INFO="/spam",QUERY_STRING="say=ni")
303 self.checkReqURI("http://127.0.0.1/spammity/spam", 0,
304 SCRIPT_NAME="/spammity", PATH_INFO="/spam",QUERY_STRING="say=ni")
305
306 def testFileWrapper(self):
307 self.checkFW("xyz"*50, 120, ["xyz"*40,"xyz"*10])
308
309 def testHopByHop(self):
310 for hop in (
311 "Connection Keep-Alive Proxy-Authenticate Proxy-Authorization "
312 "TE Trailers Transfer-Encoding Upgrade"
313 ).split():
314 for alt in hop, hop.title(), hop.upper(), hop.lower():
315 self.failUnless(util.is_hop_by_hop(alt))
316
317 # Not comprehensive, just a few random header names
318 for hop in (
319 "Accept Cache-Control Date Pragma Trailer Via Warning"
320 ).split():
321 for alt in hop, hop.title(), hop.upper(), hop.lower():
322 self.failIf(util.is_hop_by_hop(alt))
323
324class HeaderTests(TestCase):
325
326 def testMappingInterface(self):
327 test = [('x','y')]
328 self.assertEqual(len(Headers([])),0)
329 self.assertEqual(len(Headers(test[:])),1)
330 self.assertEqual(Headers(test[:]).keys(), ['x'])
331 self.assertEqual(Headers(test[:]).values(), ['y'])
332 self.assertEqual(Headers(test[:]).items(), test)
333 self.failIf(Headers(test).items() is test) # must be copy!
334
335 h=Headers([])
336 del h['foo'] # should not raise an error
337
338 h['Foo'] = 'bar'
339 for m in h.has_key, h.__contains__, h.get, h.get_all, h.__getitem__:
340 self.failUnless(m('foo'))
341 self.failUnless(m('Foo'))
342 self.failUnless(m('FOO'))
343 self.failIf(m('bar'))
344
345 self.assertEqual(h['foo'],'bar')
346 h['foo'] = 'baz'
347 self.assertEqual(h['FOO'],'baz')
348 self.assertEqual(h.get_all('foo'),['baz'])
349
350 self.assertEqual(h.get("foo","whee"), "baz")
351 self.assertEqual(h.get("zoo","whee"), "whee")
352 self.assertEqual(h.setdefault("foo","whee"), "baz")
353 self.assertEqual(h.setdefault("zoo","whee"), "whee")
354 self.assertEqual(h["foo"],"baz")
355 self.assertEqual(h["zoo"],"whee")
356
357 def testRequireList(self):
358 self.assertRaises(TypeError, Headers, "foo")
359
360
361 def testExtras(self):
362 h = Headers([])
363 self.assertEqual(str(h),'\r\n')
364
365 h.add_header('foo','bar',baz="spam")
366 self.assertEqual(h['foo'], 'bar; baz="spam"')
367 self.assertEqual(str(h),'foo: bar; baz="spam"\r\n\r\n')
368
369 h.add_header('Foo','bar',cheese=None)
370 self.assertEqual(h.get_all('foo'),
371 ['bar; baz="spam"', 'bar; cheese'])
372
373 self.assertEqual(str(h),
374 'foo: bar; baz="spam"\r\n'
375 'Foo: bar; cheese\r\n'
376 '\r\n'
377 )
378
379
class ErrorHandler(BaseCGIHandler):
    """BaseCGIHandler on in-memory streams, for testing BaseHandler"""

    def __init__(self, **kw):
        # The keyword arguments become the CGI environ, after the standard
        # testing defaults have been filled in.
        setup_testing_defaults(kw)
        stdin, stdout, stderr = StringIO(''), StringIO(), StringIO()
        BaseCGIHandler.__init__(
            self, stdin, stdout, stderr, kw,
            multithread=True, multiprocess=True
        )
389
class TestHandler(ErrorHandler):
    """ErrorHandler variant for testing BaseHandler, with error passthru"""

    def handle_error(self):
        # Bare raise: re-raise the exception currently being handled, so
        # that tests get to see what's happening.
        raise
395
396
397
398
399
400
401
402
403
404
405
406class HandlerTests(TestCase):
407
408 def checkEnvironAttrs(self, handler):
409 env = handler.environ
410 for attr in [
411 'version','multithread','multiprocess','run_once','file_wrapper'
412 ]:
413 if attr=='file_wrapper' and handler.wsgi_file_wrapper is None:
414 continue
415 self.assertEqual(getattr(handler,'wsgi_'+attr),env['wsgi.'+attr])
416
417 def checkOSEnviron(self,handler):
418 empty = {}; setup_testing_defaults(empty)
419 env = handler.environ
420 from os import environ
421 for k,v in environ.items():
422 if not empty.has_key(k):
423 self.assertEqual(env[k],v)
424 for k,v in empty.items():
425 self.failUnless(env.has_key(k))
426
427 def testEnviron(self):
428 h = TestHandler(X="Y")
429 h.setup_environ()
430 self.checkEnvironAttrs(h)
431 self.checkOSEnviron(h)
432 self.assertEqual(h.environ["X"],"Y")
433
434 def testCGIEnviron(self):
435 h = BaseCGIHandler(None,None,None,{})
436 h.setup_environ()
437 for key in 'wsgi.url_scheme', 'wsgi.input', 'wsgi.errors':
Tim Peters06524b62006-06-11 20:52:59 +0000438 self.assert_(h.environ.has_key(key))
Phillip J. Eby5cf565d2006-06-09 16:40:18 +0000439
440 def testScheme(self):
441 h=TestHandler(HTTPS="on"); h.setup_environ()
442 self.assertEqual(h.environ['wsgi.url_scheme'],'https')
443 h=TestHandler(); h.setup_environ()
444 self.assertEqual(h.environ['wsgi.url_scheme'],'http')
445
446
447 def testAbstractMethods(self):
448 h = BaseHandler()
449 for name in [
450 '_flush','get_stdin','get_stderr','add_cgi_vars'
451 ]:
452 self.assertRaises(NotImplementedError, getattr(h,name))
453 self.assertRaises(NotImplementedError, h._write, "test")
454
455
456 def testContentLength(self):
457 # Demo one reason iteration is better than write()... ;)
458
459 def trivial_app1(e,s):
460 s('200 OK',[])
461 return [e['wsgi.url_scheme']]
462
463 def trivial_app2(e,s):
464 s('200 OK',[])(e['wsgi.url_scheme'])
465 return []
466
467 h = TestHandler()
468 h.run(trivial_app1)
469 self.assertEqual(h.stdout.getvalue(),
470 "Status: 200 OK\r\n"
471 "Content-Length: 4\r\n"
472 "\r\n"
473 "http")
474
475 h = TestHandler()
476 h.run(trivial_app2)
477 self.assertEqual(h.stdout.getvalue(),
478 "Status: 200 OK\r\n"
479 "\r\n"
480 "http")
481
482
483
484
485
486
487
488 def testBasicErrorOutput(self):
489
490 def non_error_app(e,s):
491 s('200 OK',[])
492 return []
493
494 def error_app(e,s):
495 raise AssertionError("This should be caught by handler")
496
497 h = ErrorHandler()
498 h.run(non_error_app)
499 self.assertEqual(h.stdout.getvalue(),
500 "Status: 200 OK\r\n"
501 "Content-Length: 0\r\n"
502 "\r\n")
503 self.assertEqual(h.stderr.getvalue(),"")
504
505 h = ErrorHandler()
506 h.run(error_app)
507 self.assertEqual(h.stdout.getvalue(),
508 "Status: %s\r\n"
509 "Content-Type: text/plain\r\n"
510 "Content-Length: %d\r\n"
511 "\r\n%s" % (h.error_status,len(h.error_body),h.error_body))
512
513 self.failUnless(h.stderr.getvalue().find("AssertionError")<>-1)
514
515 def testErrorAfterOutput(self):
516 MSG = "Some output has been sent"
517 def error_app(e,s):
518 s("200 OK",[])(MSG)
519 raise AssertionError("This should be caught by handler")
520
521 h = ErrorHandler()
522 h.run(error_app)
523 self.assertEqual(h.stdout.getvalue(),
524 "Status: 200 OK\r\n"
525 "\r\n"+MSG)
526 self.failUnless(h.stderr.getvalue().find("AssertionError")<>-1)
527
528
529 def testHeaderFormats(self):
530
531 def non_error_app(e,s):
532 s('200 OK',[])
533 return []
534
535 stdpat = (
536 r"HTTP/%s 200 OK\r\n"
537 r"Date: \w{3}, [ 0123]\d \w{3} \d{4} \d\d:\d\d:\d\d GMT\r\n"
538 r"%s" r"Content-Length: 0\r\n" r"\r\n"
539 )
540 shortpat = (
541 "Status: 200 OK\r\n" "Content-Length: 0\r\n" "\r\n"
542 )
543
544 for ssw in "FooBar/1.0", None:
545 sw = ssw and "Server: %s\r\n" % ssw or ""
546
547 for version in "1.0", "1.1":
548 for proto in "HTTP/0.9", "HTTP/1.0", "HTTP/1.1":
549
550 h = TestHandler(SERVER_PROTOCOL=proto)
551 h.origin_server = False
552 h.http_version = version
553 h.server_software = ssw
554 h.run(non_error_app)
555 self.assertEqual(shortpat,h.stdout.getvalue())
556
557 h = TestHandler(SERVER_PROTOCOL=proto)
558 h.origin_server = True
559 h.http_version = version
560 h.server_software = ssw
561 h.run(non_error_app)
562 if proto=="HTTP/0.9":
563 self.assertEqual(h.stdout.getvalue(),"")
564 else:
565 self.failUnless(
566 re.match(stdpat%(version,sw), h.stdout.getvalue()),
567 (stdpat%(version,sw), h.stdout.getvalue())
568 )
569
570# This epilogue is needed for compatibility with the Python 2.5 regrtest module
571
def test_main():
    """Run every test case defined in this module via regrtest's run_suite"""
    import unittest
    from test.test_support import run_suite

    this_module = sys.modules[__name__]
    suite = unittest.defaultTestLoader.loadTestsFromModule(this_module)
    run_suite(suite)


if __name__ == "__main__":
    test_main()
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610# the above lines intentionally left blank