blob: 855ad46ba0301880e3e2867caac7abedee2c55ed [file] [log] [blame]
Antoine Pitrouc747d3a2009-11-09 16:47:50 +00001"""
2Various tests for synchronization primitives.
3"""
4
5import sys
6import time
7from thread import start_new_thread, get_ident
8import threading
9import unittest
10
11from test import test_support as support
12
13
14def _wait():
15 # A crude wait/yield function not relying on synchronization primitives.
16 time.sleep(0.01)
17
18class Bunch(object):
19 """
20 A bunch of threads.
21 """
22 def __init__(self, f, n, wait_before_exit=False):
23 """
24 Construct a bunch of `n` threads running the same function `f`.
25 If `wait_before_exit` is True, the threads won't terminate until
26 do_finish() is called.
27 """
28 self.f = f
29 self.n = n
30 self.started = []
31 self.finished = []
32 self._can_exit = not wait_before_exit
33 def task():
34 tid = get_ident()
35 self.started.append(tid)
36 try:
37 f()
38 finally:
39 self.finished.append(tid)
40 while not self._can_exit:
41 _wait()
42 for i in range(n):
43 start_new_thread(task, ())
44
45 def wait_for_started(self):
46 while len(self.started) < self.n:
47 _wait()
48
49 def wait_for_finished(self):
50 while len(self.finished) < self.n:
51 _wait()
52
53 def do_finish(self):
54 self._can_exit = True
55
56
57class BaseTestCase(unittest.TestCase):
58 def setUp(self):
59 self._threads = support.threading_setup()
60
61 def tearDown(self):
62 support.threading_cleanup(*self._threads)
63 support.reap_children()
64
65
66class BaseLockTests(BaseTestCase):
67 """
68 Tests for both recursive and non-recursive locks.
69 """
70
71 def test_constructor(self):
72 lock = self.locktype()
73 del lock
74
75 def test_acquire_destroy(self):
76 lock = self.locktype()
77 lock.acquire()
78 del lock
79
80 def test_acquire_release(self):
81 lock = self.locktype()
82 lock.acquire()
83 lock.release()
84 del lock
85
86 def test_try_acquire(self):
87 lock = self.locktype()
88 self.assertTrue(lock.acquire(False))
89 lock.release()
90
91 def test_try_acquire_contended(self):
92 lock = self.locktype()
93 lock.acquire()
94 result = []
95 def f():
96 result.append(lock.acquire(False))
97 Bunch(f, 1).wait_for_finished()
98 self.assertFalse(result[0])
99 lock.release()
100
101 def test_acquire_contended(self):
102 lock = self.locktype()
103 lock.acquire()
104 N = 5
105 def f():
106 lock.acquire()
107 lock.release()
108
109 b = Bunch(f, N)
110 b.wait_for_started()
111 _wait()
112 self.assertEqual(len(b.finished), 0)
113 lock.release()
114 b.wait_for_finished()
115 self.assertEqual(len(b.finished), N)
116
117 def test_with(self):
118 lock = self.locktype()
119 def f():
120 lock.acquire()
121 lock.release()
122 def _with(err=None):
123 with lock:
124 if err is not None:
125 raise err
126 _with()
127 # Check the lock is unacquired
128 Bunch(f, 1).wait_for_finished()
129 self.assertRaises(TypeError, _with, TypeError)
130 # Check the lock is unacquired
131 Bunch(f, 1).wait_for_finished()
132
133 def test_thread_leak(self):
134 # The lock shouldn't leak a Thread instance when used from a foreign
135 # (non-threading) thread.
136 lock = self.locktype()
137 def f():
138 lock.acquire()
139 lock.release()
140 n = len(threading.enumerate())
141 # We run many threads in the hope that existing threads ids won't
142 # be recycled.
143 Bunch(f, 15).wait_for_finished()
144 self.assertEqual(n, len(threading.enumerate()))
145
146
147class LockTests(BaseLockTests):
148 """
149 Tests for non-recursive, weak locks
150 (which can be acquired and released from different threads).
151 """
152 def test_reacquire(self):
153 # Lock needs to be released before re-acquiring.
154 lock = self.locktype()
155 phase = []
156 def f():
157 lock.acquire()
158 phase.append(None)
159 lock.acquire()
160 phase.append(None)
161 start_new_thread(f, ())
162 while len(phase) == 0:
163 _wait()
164 _wait()
165 self.assertEqual(len(phase), 1)
166 lock.release()
167 while len(phase) == 1:
168 _wait()
169 self.assertEqual(len(phase), 2)
170
171 def test_different_thread(self):
172 # Lock can be released from a different thread.
173 lock = self.locktype()
174 lock.acquire()
175 def f():
176 lock.release()
177 b = Bunch(f, 1)
178 b.wait_for_finished()
179 lock.acquire()
180 lock.release()
181
182
183class RLockTests(BaseLockTests):
184 """
185 Tests for recursive locks.
186 """
187 def test_reacquire(self):
188 lock = self.locktype()
189 lock.acquire()
190 lock.acquire()
191 lock.release()
192 lock.acquire()
193 lock.release()
194 lock.release()
195
196 def test_release_unacquired(self):
197 # Cannot release an unacquired lock
198 lock = self.locktype()
199 self.assertRaises(RuntimeError, lock.release)
200 lock.acquire()
201 lock.acquire()
202 lock.release()
203 lock.acquire()
204 lock.release()
205 lock.release()
206 self.assertRaises(RuntimeError, lock.release)
207
208 def test_different_thread(self):
209 # Cannot release from a different thread
210 lock = self.locktype()
211 def f():
212 lock.acquire()
213 b = Bunch(f, 1, True)
214 try:
215 self.assertRaises(RuntimeError, lock.release)
216 finally:
217 b.do_finish()
218
219 def test__is_owned(self):
220 lock = self.locktype()
221 self.assertFalse(lock._is_owned())
222 lock.acquire()
223 self.assertTrue(lock._is_owned())
224 lock.acquire()
225 self.assertTrue(lock._is_owned())
226 result = []
227 def f():
228 result.append(lock._is_owned())
229 Bunch(f, 1).wait_for_finished()
230 self.assertFalse(result[0])
231 lock.release()
232 self.assertTrue(lock._is_owned())
233 lock.release()
234 self.assertFalse(lock._is_owned())
235
236
237class EventTests(BaseTestCase):
238 """
239 Tests for Event objects.
240 """
241
242 def test_is_set(self):
243 evt = self.eventtype()
244 self.assertFalse(evt.is_set())
245 evt.set()
246 self.assertTrue(evt.is_set())
247 evt.set()
248 self.assertTrue(evt.is_set())
249 evt.clear()
250 self.assertFalse(evt.is_set())
251 evt.clear()
252 self.assertFalse(evt.is_set())
253
254 def _check_notify(self, evt):
255 # All threads get notified
256 N = 5
257 results1 = []
258 results2 = []
259 def f():
260 evt.wait()
261 results1.append(evt.is_set())
262 evt.wait()
263 results2.append(evt.is_set())
264 b = Bunch(f, N)
265 b.wait_for_started()
266 _wait()
267 self.assertEqual(len(results1), 0)
268 evt.set()
269 b.wait_for_finished()
270 self.assertEqual(results1, [True] * N)
271 self.assertEqual(results2, [True] * N)
272
273 def test_notify(self):
274 evt = self.eventtype()
275 self._check_notify(evt)
276 # Another time, after an explicit clear()
277 evt.set()
278 evt.clear()
279 self._check_notify(evt)
280
281 def test_timeout(self):
282 evt = self.eventtype()
283 results1 = []
284 results2 = []
285 N = 5
286 def f():
287 evt.wait(0.0)
288 results1.append(evt.is_set())
289 t1 = time.time()
290 evt.wait(0.2)
291 r = evt.is_set()
292 t2 = time.time()
293 results2.append((r, t2 - t1))
294 Bunch(f, N).wait_for_finished()
295 self.assertEqual(results1, [False] * N)
296 for r, dt in results2:
297 self.assertFalse(r)
298 self.assertTrue(dt >= 0.2, dt)
299 # The event is set
300 results1 = []
301 results2 = []
302 evt.set()
303 Bunch(f, N).wait_for_finished()
304 self.assertEqual(results1, [True] * N)
305 for r, dt in results2:
306 self.assertTrue(r)
307
308
309class ConditionTests(BaseTestCase):
310 """
311 Tests for condition variables.
312 """
313
314 def test_acquire(self):
315 cond = self.condtype()
316 # Be default we have an RLock: the condition can be acquired multiple
317 # times.
318 cond.acquire()
319 cond.acquire()
320 cond.release()
321 cond.release()
322 lock = threading.Lock()
323 cond = self.condtype(lock)
324 cond.acquire()
325 self.assertFalse(lock.acquire(False))
326 cond.release()
327 self.assertTrue(lock.acquire(False))
328 self.assertFalse(cond.acquire(False))
329 lock.release()
330 with cond:
331 self.assertFalse(lock.acquire(False))
332
333 def test_unacquired_wait(self):
334 cond = self.condtype()
335 self.assertRaises(RuntimeError, cond.wait)
336
337 def test_unacquired_notify(self):
338 cond = self.condtype()
339 self.assertRaises(RuntimeError, cond.notify)
340
341 def _check_notify(self, cond):
342 N = 5
343 results1 = []
344 results2 = []
345 phase_num = 0
346 def f():
347 cond.acquire()
348 cond.wait()
349 cond.release()
350 results1.append(phase_num)
351 cond.acquire()
352 cond.wait()
353 cond.release()
354 results2.append(phase_num)
355 b = Bunch(f, N)
356 b.wait_for_started()
357 _wait()
358 self.assertEqual(results1, [])
359 # Notify 3 threads at first
360 cond.acquire()
361 cond.notify(3)
362 _wait()
363 phase_num = 1
364 cond.release()
365 while len(results1) < 3:
366 _wait()
367 self.assertEqual(results1, [1] * 3)
368 self.assertEqual(results2, [])
369 # Notify 5 threads: they might be in their first or second wait
370 cond.acquire()
371 cond.notify(5)
372 _wait()
373 phase_num = 2
374 cond.release()
375 while len(results1) + len(results2) < 8:
376 _wait()
377 self.assertEqual(results1, [1] * 3 + [2] * 2)
378 self.assertEqual(results2, [2] * 3)
379 # Notify all threads: they are all in their second wait
380 cond.acquire()
381 cond.notify_all()
382 _wait()
383 phase_num = 3
384 cond.release()
385 while len(results2) < 5:
386 _wait()
387 self.assertEqual(results1, [1] * 3 + [2] * 2)
388 self.assertEqual(results2, [2] * 3 + [3] * 2)
389 b.wait_for_finished()
390
391 def test_notify(self):
392 cond = self.condtype()
393 self._check_notify(cond)
394 # A second time, to check internal state is still ok.
395 self._check_notify(cond)
396
397 def test_timeout(self):
398 cond = self.condtype()
399 results = []
400 N = 5
401 def f():
402 cond.acquire()
403 t1 = time.time()
404 cond.wait(0.2)
405 t2 = time.time()
406 cond.release()
407 results.append(t2 - t1)
408 Bunch(f, N).wait_for_finished()
409 self.assertEqual(len(results), 5)
410 for dt in results:
411 self.assertTrue(dt >= 0.2, dt)
412
413
414class BaseSemaphoreTests(BaseTestCase):
415 """
416 Common tests for {bounded, unbounded} semaphore objects.
417 """
418
419 def test_constructor(self):
420 self.assertRaises(ValueError, self.semtype, value = -1)
421 self.assertRaises(ValueError, self.semtype, value = -sys.maxint)
422
423 def test_acquire(self):
424 sem = self.semtype(1)
425 sem.acquire()
426 sem.release()
427 sem = self.semtype(2)
428 sem.acquire()
429 sem.acquire()
430 sem.release()
431 sem.release()
432
433 def test_acquire_destroy(self):
434 sem = self.semtype()
435 sem.acquire()
436 del sem
437
438 def test_acquire_contended(self):
439 sem = self.semtype(7)
440 sem.acquire()
441 N = 10
442 results1 = []
443 results2 = []
444 phase_num = 0
445 def f():
446 sem.acquire()
447 results1.append(phase_num)
448 sem.acquire()
449 results2.append(phase_num)
450 b = Bunch(f, 10)
451 b.wait_for_started()
452 while len(results1) + len(results2) < 6:
453 _wait()
454 self.assertEqual(results1 + results2, [0] * 6)
455 phase_num = 1
456 for i in range(7):
457 sem.release()
458 while len(results1) + len(results2) < 13:
459 _wait()
460 self.assertEqual(sorted(results1 + results2), [0] * 6 + [1] * 7)
461 phase_num = 2
462 for i in range(6):
463 sem.release()
464 while len(results1) + len(results2) < 19:
465 _wait()
466 self.assertEqual(sorted(results1 + results2), [0] * 6 + [1] * 7 + [2] * 6)
467 # The semaphore is still locked
468 self.assertFalse(sem.acquire(False))
469 # Final release, to let the last thread finish
470 sem.release()
471 b.wait_for_finished()
472
473 def test_try_acquire(self):
474 sem = self.semtype(2)
475 self.assertTrue(sem.acquire(False))
476 self.assertTrue(sem.acquire(False))
477 self.assertFalse(sem.acquire(False))
478 sem.release()
479 self.assertTrue(sem.acquire(False))
480
481 def test_try_acquire_contended(self):
482 sem = self.semtype(4)
483 sem.acquire()
484 results = []
485 def f():
486 results.append(sem.acquire(False))
487 results.append(sem.acquire(False))
488 Bunch(f, 5).wait_for_finished()
489 # There can be a thread switch between acquiring the semaphore and
490 # appending the result, therefore results will not necessarily be
491 # ordered.
492 self.assertEqual(sorted(results), [False] * 7 + [True] * 3 )
493
494 def test_default_value(self):
495 # The default initial value is 1.
496 sem = self.semtype()
497 sem.acquire()
498 def f():
499 sem.acquire()
500 sem.release()
501 b = Bunch(f, 1)
502 b.wait_for_started()
503 _wait()
504 self.assertFalse(b.finished)
505 sem.release()
506 b.wait_for_finished()
507
508 def test_with(self):
509 sem = self.semtype(2)
510 def _with(err=None):
511 with sem:
512 self.assertTrue(sem.acquire(False))
513 sem.release()
514 with sem:
515 self.assertFalse(sem.acquire(False))
516 if err:
517 raise err
518 _with()
519 self.assertTrue(sem.acquire(False))
520 sem.release()
521 self.assertRaises(TypeError, _with, TypeError)
522 self.assertTrue(sem.acquire(False))
523 sem.release()
524
525class SemaphoreTests(BaseSemaphoreTests):
526 """
527 Tests for unbounded semaphores.
528 """
529
530 def test_release_unacquired(self):
531 # Unbounded releases are allowed and increment the semaphore's value
532 sem = self.semtype(1)
533 sem.release()
534 sem.acquire()
535 sem.acquire()
536 sem.release()
537
538
539class BoundedSemaphoreTests(BaseSemaphoreTests):
540 """
541 Tests for bounded semaphores.
542 """
543
544 def test_release_unacquired(self):
545 # Cannot go past the initial value
546 sem = self.semtype()
547 self.assertRaises(ValueError, sem.release)
548 sem.acquire()
549 sem.release()
550 self.assertRaises(ValueError, sem.release)