blob: 136f1c37c62b9f2220098e066db373986032fb70 [file] [log] [blame]
Antoine Pitrou557934f2009-11-06 22:41:14 +00001"""
2Various tests for synchronization primitives.
3"""
4
5import sys
6import time
Victor Stinner2a129742011-05-30 23:02:52 +02007from _thread import start_new_thread, TIMEOUT_MAX
Antoine Pitrou557934f2009-11-06 22:41:14 +00008import threading
9import unittest
10
11from test import support
12
13
def _wait():
    # Crude yield point: nap briefly so that other threads get a chance to
    # run.  Deliberately built on a plain sleep rather than on any
    # synchronization primitive.
    pause = 0.01
    time.sleep(pause)
17
class Bunch(object):
    """
    A group of threads that all run the same callable.
    """
    def __init__(self, f, n, wait_before_exit=False):
        """
        Start `n` threads, each of which calls `f` once.
        When `wait_before_exit` is True, a thread lingers after `f` has
        returned and only terminates once do_finish() has been called.
        """
        self.f = f
        self.n = n
        self.started = []
        self.finished = []
        self._can_exit = not wait_before_exit

        def worker():
            ident = threading.get_ident()
            self.started.append(ident)
            try:
                f()
            finally:
                # Recorded whether or not f() raised, so that
                # wait_for_finished() still sees this thread.
                self.finished.append(ident)
                while True:
                    if self._can_exit:
                        break
                    _wait()

        for _ in range(n):
            start_new_thread(worker, ())

    def wait_for_started(self):
        # Poll until every thread has recorded itself in `started`.
        while True:
            if len(self.started) >= self.n:
                return
            _wait()

    def wait_for_finished(self):
        # Poll until every thread has recorded itself in `finished`, i.e.
        # until each call to `f` has returned (or raised).
        while True:
            if len(self.finished) >= self.n:
                return
            _wait()

    def do_finish(self):
        # Let threads created with wait_before_exit=True terminate.
        self._can_exit = True
55
56
class BaseTestCase(unittest.TestCase):
    def setUp(self):
        # Keep whatever threading_setup() returns; tearDown() hands it back
        # to threading_cleanup().
        self._threads = support.threading_setup()

    def tearDown(self):
        saved = self._threads
        support.threading_cleanup(*saved)
        support.reap_children()

    def assertTimeout(self, actual, expected):
        # Sleeping and clock readings can both be imprecise, so an exact
        # comparison against the expected duration would fail now and then
        # (especially under Windows).  Accept anything from 60% of the
        # expected value upwards...
        lower_bound = expected * 0.6
        self.assertGreaterEqual(actual, lower_bound)
        # ...as long as nothing insane happened (10x or more too long).
        upper_bound = expected * 10.0
        self.assertLess(actual, upper_bound)
72
Antoine Pitrou557934f2009-11-06 22:41:14 +000073
class BaseLockTests(BaseTestCase):
    """
    Tests for both recursive and non-recursive locks.

    Concrete subclasses supply `locktype`, a callable returning a new lock
    object.
    """

    def test_constructor(self):
        lock = self.locktype()
        del lock

    def test_repr(self):
        lock = self.locktype()
        self.assertRegex(repr(lock), r"<unlocked .* object (.*)?at .*>")
        del lock

    def test_locked_repr(self):
        lock = self.locktype()
        lock.acquire()
        self.assertRegex(repr(lock), r"<locked .* object (.*)?at .*>")
        del lock

    def test_acquire_destroy(self):
        # Destroying a lock that is still held must be harmless.
        lock = self.locktype()
        lock.acquire()
        del lock

    def test_acquire_release(self):
        lock = self.locktype()
        lock.acquire()
        lock.release()
        del lock

    def test_try_acquire(self):
        # A non-blocking acquire of a free lock succeeds.
        lock = self.locktype()
        self.assertTrue(lock.acquire(False))
        lock.release()

    def test_try_acquire_contended(self):
        # A non-blocking acquire from another thread fails while the main
        # thread holds the lock.
        lock = self.locktype()
        lock.acquire()
        result = []
        def f():
            result.append(lock.acquire(False))
        Bunch(f, 1).wait_for_finished()
        self.assertFalse(result[0])
        lock.release()

    def test_acquire_contended(self):
        # Blocking acquires from other threads only complete once the main
        # thread releases the lock.
        lock = self.locktype()
        lock.acquire()
        N = 5
        def f():
            lock.acquire()
            lock.release()

        b = Bunch(f, N)
        b.wait_for_started()
        _wait()
        self.assertEqual(len(b.finished), 0)
        lock.release()
        b.wait_for_finished()
        self.assertEqual(len(b.finished), N)

    def test_with(self):
        # The lock works as a context manager and is released on exit,
        # whether the body returns normally or raises.
        lock = self.locktype()
        def f():
            lock.acquire()
            lock.release()
        def _with(err=None):
            with lock:
                if err is not None:
                    raise err
        _with()
        # Check the lock is unacquired
        Bunch(f, 1).wait_for_finished()
        self.assertRaises(TypeError, _with, TypeError)
        # Check the lock is unacquired
        Bunch(f, 1).wait_for_finished()

    def test_thread_leak(self):
        # The lock shouldn't leak a Thread instance when used from a foreign
        # (non-threading) thread.
        lock = self.locktype()
        def f():
            lock.acquire()
            lock.release()
        n = len(threading.enumerate())
        # We run many threads in the hope that existing threads ids won't
        # be recycled.
        Bunch(f, 15).wait_for_finished()
        if len(threading.enumerate()) != n:
            # There is a small window during which a Thread instance's
            # target function has finished running, but the Thread is still
            # alive and registered.  Avoid spurious failures by waiting a
            # bit more (seen on a buildbot).
            time.sleep(0.4)
            self.assertEqual(n, len(threading.enumerate()))

    def test_timeout(self):
        lock = self.locktype()
        # Can't set timeout if not blocking
        self.assertRaises(ValueError, lock.acquire, 0, 1)
        # Invalid timeout values
        self.assertRaises(ValueError, lock.acquire, timeout=-100)
        self.assertRaises(OverflowError, lock.acquire, timeout=1e100)
        self.assertRaises(OverflowError, lock.acquire, timeout=TIMEOUT_MAX + 1)
        # TIMEOUT_MAX is ok
        lock.acquire(timeout=TIMEOUT_MAX)
        lock.release()
        # Intervals are measured with the monotonic clock so that a system
        # clock adjustment during the test cannot distort them.
        t1 = time.monotonic()
        self.assertTrue(lock.acquire(timeout=5))
        t2 = time.monotonic()
        # Just a sanity test that it didn't actually wait for the timeout.
        self.assertLess(t2 - t1, 5)
        results = []
        def f():
            # The main thread still holds the lock, so this acquire is
            # expected to time out.
            t1 = time.monotonic()
            results.append(lock.acquire(timeout=0.5))
            t2 = time.monotonic()
            results.append(t2 - t1)
        Bunch(f, 1).wait_for_finished()
        self.assertFalse(results[0])
        self.assertTimeout(results[1], 0.5)
196
Antoine Pitrou557934f2009-11-06 22:41:14 +0000197
class LockTests(BaseLockTests):
    """
    Tests for plain (non-recursive) locks, i.e. "weak" locks that one
    thread may acquire and a different thread may release.
    """
    def test_reacquire(self):
        # A second acquire() blocks until somebody releases the lock.
        lock = self.locktype()
        steps = []

        def worker():
            lock.acquire()
            steps.append(None)
            # Blocks here until the main thread releases the lock.
            lock.acquire()
            steps.append(None)

        start_new_thread(worker, ())
        while not steps:
            _wait()
        # Give the worker time to reach (and block in) its second acquire.
        _wait()
        self.assertEqual(len(steps), 1)
        lock.release()
        while len(steps) == 1:
            _wait()
        self.assertEqual(len(steps), 2)

    def test_different_thread(self):
        # A thread other than the acquiring one may release the lock.
        lock = self.locktype()
        lock.acquire()
        releaser = Bunch(lock.release, 1)
        releaser.wait_for_finished()
        # The lock is free again, so this must not block.
        lock.acquire()
        lock.release()

    def test_state_after_timeout(self):
        # Issue #11618: the lock must be left in a sane state after an
        # acquire with a (non-zero) timeout has timed out.
        lock = self.locktype()
        lock.acquire()
        timed_out = lock.acquire(timeout=0.01)
        self.assertFalse(timed_out)
        lock.release()
        self.assertFalse(lock.locked())
        reacquired = lock.acquire(blocking=False)
        self.assertTrue(reacquired)
242
Antoine Pitrou557934f2009-11-06 22:41:14 +0000243
class RLockTests(BaseLockTests):
    """
    Tests for recursive locks.
    """
    def test_reacquire(self):
        # The owning thread may acquire the lock again without blocking;
        # every acquire is paired with one release.
        lock = self.locktype()
        lock.acquire()
        lock.acquire()
        lock.release()
        lock.acquire()
        lock.release()
        lock.release()

    def test_release_unacquired(self):
        # Cannot release an unacquired lock
        lock = self.locktype()
        self.assertRaises(RuntimeError, lock.release)
        lock.acquire()
        lock.acquire()
        lock.release()
        lock.acquire()
        lock.release()
        lock.release()
        # Fully released again: one more release must fail.
        self.assertRaises(RuntimeError, lock.release)

    def test_release_save_unacquired(self):
        # Cannot _release_save an unacquired lock
        lock = self.locktype()
        self.assertRaises(RuntimeError, lock._release_save)
        lock.acquire()
        lock.acquire()
        lock.release()
        lock.acquire()
        lock.release()
        lock.release()
        # Fully released again: _release_save must fail once more.
        self.assertRaises(RuntimeError, lock._release_save)

    def test_different_thread(self):
        # Cannot release from a different thread
        lock = self.locktype()
        def f():
            lock.acquire()
        # wait_before_exit=True keeps the owning thread alive while the
        # main thread attempts the release.
        b = Bunch(f, 1, True)
        try:
            # `finished` is only filled in after f() has returned, i.e. once
            # the worker really owns the lock.  Without this wait the
            # release below could run before the worker's acquire and would
            # merely re-test "release of an unacquired lock".
            b.wait_for_finished()
            self.assertRaises(RuntimeError, lock.release)
        finally:
            b.do_finish()

    def test__is_owned(self):
        # _is_owned() is true only in the thread holding the lock, and
        # stays true until the outermost release.
        lock = self.locktype()
        self.assertFalse(lock._is_owned())
        lock.acquire()
        self.assertTrue(lock._is_owned())
        lock.acquire()
        self.assertTrue(lock._is_owned())
        result = []
        def f():
            result.append(lock._is_owned())
        Bunch(f, 1).wait_for_finished()
        self.assertFalse(result[0])
        lock.release()
        self.assertTrue(lock._is_owned())
        lock.release()
        self.assertFalse(lock._is_owned())
308
309
class EventTests(BaseTestCase):
    """
    Tests for Event objects.

    Concrete subclasses supply `eventtype`, a callable returning a new
    event object.
    """

    def test_is_set(self):
        # set() and clear() are both idempotent.
        evt = self.eventtype()
        self.assertFalse(evt.is_set())
        evt.set()
        self.assertTrue(evt.is_set())
        evt.set()
        self.assertTrue(evt.is_set())
        evt.clear()
        self.assertFalse(evt.is_set())
        evt.clear()
        self.assertFalse(evt.is_set())

    def _check_notify(self, evt):
        # All threads get notified
        N = 5
        results1 = []
        results2 = []
        def f():
            results1.append(evt.wait())
            results2.append(evt.wait())
        b = Bunch(f, N)
        b.wait_for_started()
        _wait()
        # Nobody may get past the first wait() before the event is set.
        self.assertEqual(len(results1), 0)
        evt.set()
        b.wait_for_finished()
        self.assertEqual(results1, [True] * N)
        self.assertEqual(results2, [True] * N)

    def test_notify(self):
        evt = self.eventtype()
        self._check_notify(evt)
        # Another time, after an explicit clear()
        evt.set()
        evt.clear()
        self._check_notify(evt)

    def test_timeout(self):
        evt = self.eventtype()
        results1 = []
        results2 = []
        N = 5
        def f():
            results1.append(evt.wait(0.0))
            # Measure with the monotonic clock so that a system clock
            # adjustment during the wait cannot distort the interval.
            t1 = time.monotonic()
            r = evt.wait(0.5)
            t2 = time.monotonic()
            results2.append((r, t2 - t1))
        Bunch(f, N).wait_for_finished()
        self.assertEqual(results1, [False] * N)
        for r, dt in results2:
            self.assertFalse(r)
            self.assertTimeout(dt, 0.5)
        # The event is set
        results1 = []
        results2 = []
        evt.set()
        Bunch(f, N).wait_for_finished()
        self.assertEqual(results1, [True] * N)
        for r, dt in results2:
            self.assertTrue(r)

    def test_set_and_clear(self):
        # Issue #13502: check that wait() returns true even when the event is
        # cleared before the waiting thread is woken up.
        evt = self.eventtype()
        results = []
        N = 5
        def f():
            results.append(evt.wait(1))
        b = Bunch(f, N)
        b.wait_for_started()
        time.sleep(0.5)
        evt.set()
        evt.clear()
        b.wait_for_finished()
        self.assertEqual(results, [True] * N)
392
Antoine Pitrou557934f2009-11-06 22:41:14 +0000393
class ConditionTests(BaseTestCase):
    """
    Tests for condition variables.

    Concrete subclasses supply `condtype`, a callable returning a new
    condition object (optionally wrapping a lock passed as argument).
    """

    def test_acquire(self):
        cond = self.condtype()
        # By default we have an RLock: the condition can be acquired multiple
        # times.
        cond.acquire()
        cond.acquire()
        cond.release()
        cond.release()
        # With an explicit plain Lock, the condition and the lock share
        # the same locked/unlocked state.
        lock = threading.Lock()
        cond = self.condtype(lock)
        cond.acquire()
        self.assertFalse(lock.acquire(False))
        cond.release()
        self.assertTrue(lock.acquire(False))
        self.assertFalse(cond.acquire(False))
        lock.release()
        with cond:
            self.assertFalse(lock.acquire(False))

    def test_unacquired_wait(self):
        # wait() without holding the condition's lock is an error.
        cond = self.condtype()
        self.assertRaises(RuntimeError, cond.wait)

    def test_unacquired_notify(self):
        # notify() without holding the condition's lock is an error.
        cond = self.condtype()
        self.assertRaises(RuntimeError, cond.notify)

    def _check_notify(self, cond):
        # Note that this test is sensitive to timing.  If the worker threads
        # don't execute in a timely fashion, the main thread may think they
        # are further along than they are.  The main thread therefore issues
        # _wait() statements to try to make sure that it doesn't race ahead
        # of the workers.
        # Secondly, this test assumes that condition variables are not subject
        # to spurious wakeups.  The absence of spurious wakeups is an
        # implementation detail of Condition Variables in current CPython,
        # but in general, not a guaranteed property of condition variables
        # as a programming construct.  In particular, it is possible that
        # this can no longer be conveniently guaranteed should their
        # implementation ever change.
        N = 5
        results1 = []
        results2 = []
        # Rebound by the main thread below; each worker records the value it
        # observes after waking up from a wait().
        phase_num = 0
        def f():
            cond.acquire()
            result = cond.wait()
            cond.release()
            results1.append((result, phase_num))
            cond.acquire()
            result = cond.wait()
            cond.release()
            results2.append((result, phase_num))
        b = Bunch(f, N)
        b.wait_for_started()
        _wait()
        self.assertEqual(results1, [])
        # Notify 3 threads at first
        cond.acquire()
        cond.notify(3)
        _wait()
        phase_num = 1
        cond.release()
        while len(results1) < 3:
            _wait()
        self.assertEqual(results1, [(True, 1)] * 3)
        self.assertEqual(results2, [])
        # first wait, to ensure all workers settle into cond.wait() before
        # we continue. See issue #8799
        _wait()
        # Notify 5 threads: they might be in their first or second wait
        cond.acquire()
        cond.notify(5)
        _wait()
        phase_num = 2
        cond.release()
        while len(results1) + len(results2) < 8:
            _wait()
        self.assertEqual(results1, [(True, 1)] * 3 + [(True, 2)] * 2)
        self.assertEqual(results2, [(True, 2)] * 3)
        _wait() # make sure all workers settle into cond.wait()
        # Notify all threads: they are all in their second wait
        cond.acquire()
        cond.notify_all()
        _wait()
        phase_num = 3
        cond.release()
        while len(results2) < 5:
            _wait()
        self.assertEqual(results1, [(True, 1)] * 3 + [(True, 2)] * 2)
        self.assertEqual(results2, [(True, 2)] * 3 + [(True, 3)] * 2)
        b.wait_for_finished()

    def test_notify(self):
        cond = self.condtype()
        self._check_notify(cond)
        # A second time, to check internal state is still ok.
        self._check_notify(cond)

    def test_timeout(self):
        cond = self.condtype()
        results = []
        N = 5
        def f():
            cond.acquire()
            # Measure with the monotonic clock so that a system clock
            # adjustment during the wait cannot distort the interval.
            t1 = time.monotonic()
            result = cond.wait(0.5)
            t2 = time.monotonic()
            cond.release()
            results.append((t2 - t1, result))
        Bunch(f, N).wait_for_finished()
        self.assertEqual(len(results), N)
        for dt, result in results:
            self.assertTimeout(dt, 0.5)
            # Note that conceptually (that's the condition variable protocol)
            # a wait() may succeed even if no one notifies us and before any
            # timeout occurs.  Spurious wakeups can occur.
            # This makes it hard to verify the result value.
            # In practice, this implementation has no spurious wakeups.
            self.assertFalse(result)

    def test_waitfor(self):
        cond = self.condtype()
        state = 0
        # Filled in by the worker and verified from the main thread: an
        # assertion that fails inside the worker thread only kills that
        # thread and would not make the test fail.
        results = []
        def f():
            with cond:
                result = cond.wait_for(lambda: state == 4)
                results.append((result, state))
        b = Bunch(f, 1)
        b.wait_for_started()
        for i in range(4):
            time.sleep(0.01)
            with cond:
                state += 1
                cond.notify()
        b.wait_for_finished()
        self.assertEqual(results, [(True, 4)])

    def test_waitfor_timeout(self):
        cond = self.condtype()
        state = 0
        # Only appended to when every check in the worker passed, so the
        # main thread can detect a failure inside the worker.
        success = []
        def f():
            with cond:
                dt = time.monotonic()
                result = cond.wait_for(lambda: state == 4, timeout=0.1)
                dt = time.monotonic() - dt
                self.assertFalse(result)
                self.assertTimeout(dt, 0.1)
                success.append(None)
        b = Bunch(f, 1)
        b.wait_for_started()
        # Only increment 3 times, so state == 4 is never reached.
        for i in range(3):
            time.sleep(0.01)
            with cond:
                state += 1
                cond.notify()
        b.wait_for_finished()
        self.assertEqual(len(success), 1)
558
Antoine Pitrou557934f2009-11-06 22:41:14 +0000559
class BaseSemaphoreTests(BaseTestCase):
    """
    Common tests for {bounded, unbounded} semaphore objects.

    Concrete subclasses supply `semtype`, a callable returning a new
    semaphore (optionally taking the initial value).
    """

    def test_constructor(self):
        # Negative initial values are rejected.
        self.assertRaises(ValueError, self.semtype, value=-1)
        self.assertRaises(ValueError, self.semtype, value=-sys.maxsize)

    def test_acquire(self):
        sem = self.semtype(1)
        sem.acquire()
        sem.release()
        sem = self.semtype(2)
        sem.acquire()
        sem.acquire()
        sem.release()
        sem.release()

    def test_acquire_destroy(self):
        # Destroying a semaphore that is still held must be harmless.
        sem = self.semtype()
        sem.acquire()
        del sem

    def test_acquire_contended(self):
        sem = self.semtype(7)
        sem.acquire()
        # 6 units are left for N threads that each acquire twice (2 * N = 20
        # acquires in total); the thresholds below follow from that.
        N = 10
        results1 = []
        results2 = []
        # Rebound by the main thread; workers record the value they observe
        # right after each successful acquire.
        phase_num = 0
        def f():
            sem.acquire()
            results1.append(phase_num)
            sem.acquire()
            results2.append(phase_num)
        b = Bunch(f, N)
        b.wait_for_started()
        while len(results1) + len(results2) < 6:
            _wait()
        self.assertEqual(results1 + results2, [0] * 6)
        phase_num = 1
        for i in range(7):
            sem.release()
        while len(results1) + len(results2) < 13:
            _wait()
        self.assertEqual(sorted(results1 + results2), [0] * 6 + [1] * 7)
        phase_num = 2
        for i in range(6):
            sem.release()
        while len(results1) + len(results2) < 19:
            _wait()
        self.assertEqual(sorted(results1 + results2), [0] * 6 + [1] * 7 + [2] * 6)
        # The semaphore is still locked
        self.assertFalse(sem.acquire(False))
        # Final release, to let the last thread finish
        sem.release()
        b.wait_for_finished()

    def test_try_acquire(self):
        sem = self.semtype(2)
        self.assertTrue(sem.acquire(False))
        self.assertTrue(sem.acquire(False))
        self.assertFalse(sem.acquire(False))
        sem.release()
        self.assertTrue(sem.acquire(False))

    def test_try_acquire_contended(self):
        sem = self.semtype(4)
        sem.acquire()
        results = []
        def f():
            results.append(sem.acquire(False))
            results.append(sem.acquire(False))
        Bunch(f, 5).wait_for_finished()
        # There can be a thread switch between acquiring the semaphore and
        # appending the result, therefore results will not necessarily be
        # ordered.
        self.assertEqual(sorted(results), [False] * 7 + [True] * 3)

    def test_acquire_timeout(self):
        sem = self.semtype(2)
        # A timeout cannot be combined with a non-blocking acquire.
        self.assertRaises(ValueError, sem.acquire, False, timeout=1.0)
        self.assertTrue(sem.acquire(timeout=0.005))
        self.assertTrue(sem.acquire(timeout=0.005))
        self.assertFalse(sem.acquire(timeout=0.005))
        sem.release()
        self.assertTrue(sem.acquire(timeout=0.005))
        # Measure with the monotonic clock so that a system clock
        # adjustment during the wait cannot distort the interval.
        t = time.monotonic()
        self.assertFalse(sem.acquire(timeout=0.5))
        dt = time.monotonic() - t
        self.assertTimeout(dt, 0.5)

    def test_default_value(self):
        # The default initial value is 1.
        sem = self.semtype()
        sem.acquire()
        def f():
            sem.acquire()
            sem.release()
        b = Bunch(f, 1)
        b.wait_for_started()
        _wait()
        # The worker must still be blocked in its acquire().
        self.assertFalse(b.finished)
        sem.release()
        b.wait_for_finished()

    def test_with(self):
        # Each `with sem:` takes one unit and gives it back on exit, whether
        # the body returns normally or raises.
        sem = self.semtype(2)
        def _with(err=None):
            with sem:
                self.assertTrue(sem.acquire(False))
                sem.release()
                with sem:
                    self.assertFalse(sem.acquire(False))
                    if err:
                        raise err
        _with()
        self.assertTrue(sem.acquire(False))
        sem.release()
        self.assertRaises(TypeError, _with, TypeError)
        self.assertTrue(sem.acquire(False))
        sem.release()
683
class SemaphoreTests(BaseSemaphoreTests):
    """
    Tests specific to unbounded semaphores.
    """

    def test_release_unacquired(self):
        # An unbounded semaphore accepts a release() that has no matching
        # acquire(): the value simply grows, here from 1 to 2...
        semaphore = self.semtype(1)
        semaphore.release()
        # ...so two acquires in a row go through.
        for _ in range(2):
            semaphore.acquire()
        semaphore.release()
696
697
class BoundedSemaphoreTests(BaseSemaphoreTests):
    """
    Tests specific to bounded semaphores.
    """

    def test_release_unacquired(self):
        # A bounded semaphore refuses to grow beyond its initial value.
        semaphore = self.semtype()
        with self.assertRaises(ValueError):
            semaphore.release()
        # A balanced acquire/release pair is fine...
        semaphore.acquire()
        semaphore.release()
        # ...but one release too many is rejected again.
        with self.assertRaises(ValueError):
            semaphore.release()
Kristján Valur Jónsson3be00032010-10-28 09:43:10 +0000710
711
class BarrierTests(BaseTestCase):
    """
    Tests for Barrier objects.

    Concrete subclasses supply `barriertype`, a callable returning a new
    barrier for a given number of parties.
    """
    # Number of parties of the barrier created in setUp().
    N = 5
    # Default timeout (in seconds) of the barrier created in setUp().
    defaultTimeout = 2.0

    def setUp(self):
        # Chain to BaseTestCase so that its thread bookkeeping also covers
        # the barrier tests.
        super().setUp()
        self.barrier = self.barriertype(self.N, timeout=self.defaultTimeout)

    def tearDown(self):
        try:
            self.barrier.abort()
        finally:
            # Always run the base class cleanup, even if abort() fails.
            super().tearDown()

    def run_threads(self, f):
        # Run f in N-1 helper threads plus the calling thread, N parties in
        # total, and wait until every helper has returned from f.
        b = Bunch(f, self.N-1)
        f()
        b.wait_for_finished()

    def multipass(self, results, n):
        # Pass the barrier 2*n times, checking via the two shared lists in
        # `results` that no party runs ahead of the others.
        m = self.barrier.parties
        self.assertEqual(m, self.N)
        for i in range(n):
            results[0].append(True)
            self.assertEqual(len(results[1]), i * m)
            self.barrier.wait()
            results[1].append(True)
            self.assertEqual(len(results[0]), (i + 1) * m)
            self.barrier.wait()
        self.assertEqual(self.barrier.n_waiting, 0)
        self.assertFalse(self.barrier.broken)

    def test_barrier(self, passes=1):
        """
        Test that a barrier is passed in lockstep
        """
        results = [[],[]]
        def f():
            self.multipass(results, passes)
        self.run_threads(f)

    def test_barrier_10(self):
        """
        Test that a barrier works for 10 consecutive runs
        """
        return self.test_barrier(10)

    def test_wait_return(self):
        """
        test the return value from barrier.wait
        """
        results = []
        def f():
            r = self.barrier.wait()
            results.append(r)

        self.run_threads(f)
        self.assertEqual(sum(results), sum(range(self.N)))

    def test_action(self):
        """
        Test the 'action' callback
        """
        results = []
        def action():
            results.append(True)
        barrier = self.barriertype(self.N, action)
        def f():
            barrier.wait()
            self.assertEqual(len(results), 1)

        self.run_threads(f)

    def test_abort(self):
        """
        Test that an abort will put the barrier in a broken state
        """
        results1 = []
        results2 = []
        def f():
            try:
                i = self.barrier.wait()
                if i == self.N//2:
                    # Exactly one party bails out and aborts the barrier.
                    raise RuntimeError
                self.barrier.wait()
                results1.append(True)
            except threading.BrokenBarrierError:
                results2.append(True)
            except RuntimeError:
                self.barrier.abort()

        self.run_threads(f)
        self.assertEqual(len(results1), 0)
        self.assertEqual(len(results2), self.N-1)
        self.assertTrue(self.barrier.broken)

    def test_reset(self):
        """
        Test that a 'reset' on a barrier frees the waiting threads
        """
        results1 = []
        results2 = []
        results3 = []
        def f():
            i = self.barrier.wait()
            if i == self.N//2:
                # Wait until the other threads are all in the barrier.
                while self.barrier.n_waiting < self.N-1:
                    time.sleep(0.001)
                self.barrier.reset()
            else:
                try:
                    self.barrier.wait()
                    results1.append(True)
                except threading.BrokenBarrierError:
                    results2.append(True)
            # Now, pass the barrier again
            self.barrier.wait()
            results3.append(True)

        self.run_threads(f)
        self.assertEqual(len(results1), 0)
        self.assertEqual(len(results2), self.N-1)
        self.assertEqual(len(results3), self.N)


    def test_abort_and_reset(self):
        """
        Test that a barrier can be reset after being broken.
        """
        results1 = []
        results2 = []
        results3 = []
        # Second, independent barrier used only to synchronize the reset.
        barrier2 = self.barriertype(self.N)
        def f():
            try:
                i = self.barrier.wait()
                if i == self.N//2:
                    raise RuntimeError
                self.barrier.wait()
                results1.append(True)
            except threading.BrokenBarrierError:
                results2.append(True)
            except RuntimeError:
                self.barrier.abort()
            # Synchronize and reset the barrier.  Must synchronize first so
            # that everyone has left it when we reset, and after so that no
            # one enters it before the reset.
            if barrier2.wait() == self.N//2:
                self.barrier.reset()
            barrier2.wait()
            self.barrier.wait()
            results3.append(True)

        self.run_threads(f)
        self.assertEqual(len(results1), 0)
        self.assertEqual(len(results2), self.N-1)
        self.assertEqual(len(results3), self.N)

    def test_timeout(self):
        """
        Test wait(timeout)
        """
        def f():
            i = self.barrier.wait()
            if i == self.N // 2:
                # One thread is late!
                time.sleep(1.0)
            # Default timeout is 2.0, so this is shorter.
            self.assertRaises(threading.BrokenBarrierError,
                              self.barrier.wait, 0.5)
        self.run_threads(f)

    def test_default_timeout(self):
        """
        Test the barrier's default timeout
        """
        # create a barrier with a low default timeout
        barrier = self.barriertype(self.N, timeout=0.3)
        def f():
            i = barrier.wait()
            if i == self.N // 2:
                # One thread is later than the default timeout of 0.3s.
                time.sleep(1.0)
            self.assertRaises(threading.BrokenBarrierError, barrier.wait)
        self.run_threads(f)

    def test_single_thread(self):
        # A barrier for a single party never blocks.
        b = self.barriertype(1)
        b.wait()
        b.wait()