blob: b878dc1614f27794bc43d1f7f907f5638838089e [file] [log] [blame]
Guido van Rossum1a5e21e2006-02-28 21:57:43 +00001"""Unit tests for contextlib.py, and other context managers."""
2
Guido van Rossum1a5e21e2006-02-28 21:57:43 +00003
Thomas Wouters49fd7fa2006-04-21 10:40:58 +00004import sys
Guido van Rossum1a5e21e2006-02-28 21:57:43 +00005import os
6import decimal
7import tempfile
8import unittest
9import threading
10from contextlib import * # Tests __all__
Benjamin Petersonee8712c2008-05-20 21:35:26 +000011from test import support
Guido van Rossum1a5e21e2006-02-28 21:57:43 +000012
13class ContextManagerTestCase(unittest.TestCase):
14
15 def test_contextmanager_plain(self):
16 state = []
17 @contextmanager
18 def woohoo():
19 state.append(1)
20 yield 42
21 state.append(999)
22 with woohoo() as x:
23 self.assertEqual(state, [1])
24 self.assertEqual(x, 42)
25 state.append(x)
26 self.assertEqual(state, [1, 42, 999])
27
28 def test_contextmanager_finally(self):
29 state = []
30 @contextmanager
31 def woohoo():
32 state.append(1)
33 try:
34 yield 42
35 finally:
36 state.append(999)
37 try:
38 with woohoo() as x:
39 self.assertEqual(state, [1])
40 self.assertEqual(x, 42)
41 state.append(x)
42 raise ZeroDivisionError()
43 except ZeroDivisionError:
44 pass
45 else:
46 self.fail("Expected ZeroDivisionError")
47 self.assertEqual(state, [1, 42, 999])
48
Thomas Wouters49fd7fa2006-04-21 10:40:58 +000049 def test_contextmanager_no_reraise(self):
50 @contextmanager
51 def whee():
52 yield
Thomas Wouters477c8d52006-05-27 19:21:47 +000053 ctx = whee()
Thomas Wouters49fd7fa2006-04-21 10:40:58 +000054 ctx.__enter__()
55 # Calling __exit__ should not result in an exception
56 self.failIf(ctx.__exit__(TypeError, TypeError("foo"), None))
57
58 def test_contextmanager_trap_yield_after_throw(self):
59 @contextmanager
60 def whoo():
61 try:
62 yield
63 except:
64 yield
Thomas Wouters477c8d52006-05-27 19:21:47 +000065 ctx = whoo()
Thomas Wouters49fd7fa2006-04-21 10:40:58 +000066 ctx.__enter__()
67 self.assertRaises(
68 RuntimeError, ctx.__exit__, TypeError, TypeError("foo"), None
69 )
70
Guido van Rossum1a5e21e2006-02-28 21:57:43 +000071 def test_contextmanager_except(self):
72 state = []
73 @contextmanager
74 def woohoo():
75 state.append(1)
76 try:
77 yield 42
Guido van Rossumb940e112007-01-10 16:19:56 +000078 except ZeroDivisionError as e:
Guido van Rossum1a5e21e2006-02-28 21:57:43 +000079 state.append(e.args[0])
80 self.assertEqual(state, [1, 42, 999])
81 with woohoo() as x:
82 self.assertEqual(state, [1])
83 self.assertEqual(x, 42)
84 state.append(x)
85 raise ZeroDivisionError(999)
86 self.assertEqual(state, [1, 42, 999])
87
Thomas Wouters49fd7fa2006-04-21 10:40:58 +000088 def test_contextmanager_attribs(self):
89 def attribs(**kw):
90 def decorate(func):
91 for k,v in kw.items():
92 setattr(func,k,v)
93 return func
94 return decorate
95 @contextmanager
96 @attribs(foo='bar')
97 def baz(spam):
98 """Whee!"""
99 self.assertEqual(baz.__name__,'baz')
100 self.assertEqual(baz.foo, 'bar')
101 self.assertEqual(baz.__doc__, "Whee!")
102
Guido van Rossum1a5e21e2006-02-28 21:57:43 +0000103class ClosingTestCase(unittest.TestCase):
104
105 # XXX This needs more work
106
107 def test_closing(self):
108 state = []
109 class C:
110 def close(self):
111 state.append(1)
112 x = C()
113 self.assertEqual(state, [])
114 with closing(x) as y:
115 self.assertEqual(x, y)
116 self.assertEqual(state, [1])
117
118 def test_closing_error(self):
119 state = []
120 class C:
121 def close(self):
122 state.append(1)
123 x = C()
124 self.assertEqual(state, [])
125 try:
126 with closing(x) as y:
127 self.assertEqual(x, y)
128 1/0
129 except ZeroDivisionError:
130 self.assertEqual(state, [1])
131 else:
132 self.fail("Didn't raise ZeroDivisionError")
133
134class FileContextTestCase(unittest.TestCase):
135
136 def testWithOpen(self):
137 tfn = tempfile.mktemp()
138 try:
139 f = None
140 with open(tfn, "w") as f:
141 self.failIf(f.closed)
142 f.write("Booh\n")
143 self.failUnless(f.closed)
144 f = None
145 try:
146 with open(tfn, "r") as f:
147 self.failIf(f.closed)
148 self.assertEqual(f.read(), "Booh\n")
149 1/0
150 except ZeroDivisionError:
151 self.failUnless(f.closed)
152 else:
153 self.fail("Didn't raise ZeroDivisionError")
154 finally:
155 try:
156 os.remove(tfn)
157 except os.error:
158 pass
159
160class LockContextTestCase(unittest.TestCase):
161
162 def boilerPlate(self, lock, locked):
163 self.failIf(locked())
164 with lock:
165 self.failUnless(locked())
166 self.failIf(locked())
167 try:
168 with lock:
169 self.failUnless(locked())
170 1/0
171 except ZeroDivisionError:
172 self.failIf(locked())
173 else:
174 self.fail("Didn't raise ZeroDivisionError")
175
176 def testWithLock(self):
177 lock = threading.Lock()
178 self.boilerPlate(lock, lock.locked)
179
180 def testWithRLock(self):
181 lock = threading.RLock()
182 self.boilerPlate(lock, lock._is_owned)
183
184 def testWithCondition(self):
185 lock = threading.Condition()
186 def locked():
187 return lock._is_owned()
188 self.boilerPlate(lock, locked)
189
190 def testWithSemaphore(self):
191 lock = threading.Semaphore()
192 def locked():
193 if lock.acquire(False):
194 lock.release()
195 return False
196 else:
197 return True
198 self.boilerPlate(lock, locked)
199
200 def testWithBoundedSemaphore(self):
201 lock = threading.BoundedSemaphore()
202 def locked():
203 if lock.acquire(False):
204 lock.release()
205 return False
206 else:
207 return True
208 self.boilerPlate(lock, locked)
209
Thomas Wouters49fd7fa2006-04-21 10:40:58 +0000210# This is needed to make the test actually run under regrtest.py!
211def test_main():
Benjamin Petersonee8712c2008-05-20 21:35:26 +0000212 support.run_unittest(__name__)
Thomas Wouters49fd7fa2006-04-21 10:40:58 +0000213
Guido van Rossum1a5e21e2006-02-28 21:57:43 +0000214if __name__ == "__main__":
Thomas Wouters49fd7fa2006-04-21 10:40:58 +0000215 test_main()