blob: 1cbcea2343968aca703a7f6f5e50de33fa4061b1 [file] [log] [blame]
Antoine Pitrou557934f2009-11-06 22:41:14 +00001"""
2Various tests for synchronization primitives.
3"""
4
5import sys
6import time
Victor Stinner2a129742011-05-30 23:02:52 +02007from _thread import start_new_thread, TIMEOUT_MAX
Antoine Pitrou557934f2009-11-06 22:41:14 +00008import threading
9import unittest
10
11from test import support
12
13
14def _wait():
15 # A crude wait/yield function not relying on synchronization primitives.
16 time.sleep(0.01)
17
class Bunch(object):
    """
    A group of threads that all run the same function.

    Every thread records its identifier in `started` right before calling
    the function and in `finished` once the function has returned or raised.
    Progress is tracked with plain lists and polling instead of
    synchronization primitives.
    """
    def __init__(self, f, n, wait_before_exit=False):
        """
        Start `n` threads, each of which calls `f` with no arguments.
        When `wait_before_exit` is true, every thread keeps polling after
        `f` has completed and only terminates once do_finish() is called.
        """
        self.f = f
        self.n = n
        self.started = []
        self.finished = []
        self._can_exit = not wait_before_exit

        def run():
            ident = threading.get_ident()
            self.started.append(ident)
            try:
                f()
            finally:
                # Recorded even when f() raises, so wait_for_finished()
                # still sees this thread.
                self.finished.append(ident)
                # Optionally keep the thread alive until do_finish().
                while not self._can_exit:
                    _wait()

        for _ in range(n):
            start_new_thread(run, ())

    def wait_for_started(self):
        """Poll until all `n` threads have registered in `started`."""
        while self.n > len(self.started):
            _wait()

    def wait_for_finished(self):
        """Poll until all `n` threads have registered in `finished`."""
        while self.n > len(self.finished):
            _wait()

    def do_finish(self):
        """Allow threads created with `wait_before_exit` to terminate."""
        self._can_exit = True
55
56
class BaseTestCase(unittest.TestCase):
    """
    Common base class for the test cases below: wraps each test in the
    support module's threading setup/cleanup and provides a tolerant
    duration assertion.
    """
    def setUp(self):
        # Keep whatever threading_setup() returns; tearDown() hands it back
        # to threading_cleanup().
        self._threads = support.threading_setup()

    def tearDown(self):
        support.threading_cleanup(*self._threads)
        support.reap_children()

    def assertTimeout(self, actual, expected):
        # Check that the measured duration `actual` is plausible for a wait
        # of `expected` seconds.
        # Waiting and/or reading the clock can be imprecise, so an exact
        # comparison with the expected value would sometimes fail
        # (especially under Windows); accept anything from 60% upwards...
        lower_bound = expected * 0.6
        # ... as long as nothing insane happened (ten times too long).
        upper_bound = expected * 10.0
        self.assertGreaterEqual(actual, lower_bound)
        self.assertLess(actual, upper_bound)
72
Antoine Pitrou557934f2009-11-06 22:41:14 +000073
class BaseLockTests(BaseTestCase):
    """
    Tests for both recursive and non-recursive locks.

    The tests create locks through `self.locktype`, which is not defined
    here and is expected to be supplied by concrete subclasses.
    """

    def test_constructor(self):
        # Creating and destroying a lock must not fail.
        lock = self.locktype()
        del lock

    def test_repr(self):
        # repr() of a lock must not fail.
        lock = self.locktype()
        repr(lock)
        del lock

    def test_acquire_destroy(self):
        # A lock may be destroyed while it is still acquired.
        lock = self.locktype()
        lock.acquire()
        del lock

    def test_acquire_release(self):
        lock = self.locktype()
        lock.acquire()
        lock.release()
        del lock

    def test_try_acquire(self):
        # A non-blocking acquire of a free lock succeeds.
        lock = self.locktype()
        self.assertTrue(lock.acquire(False))
        lock.release()

    def test_try_acquire_contended(self):
        # A non-blocking acquire from another thread fails while the lock
        # is held by this thread.
        lock = self.locktype()
        lock.acquire()
        result = []
        def f():
            result.append(lock.acquire(False))
        Bunch(f, 1).wait_for_finished()
        self.assertFalse(result[0])
        lock.release()

    def test_acquire_contended(self):
        # Blocking acquires from other threads only complete once the lock
        # has been released by its holder.
        lock = self.locktype()
        lock.acquire()
        N = 5
        def f():
            lock.acquire()
            lock.release()

        b = Bunch(f, N)
        b.wait_for_started()
        _wait()
        self.assertEqual(len(b.finished), 0)
        lock.release()
        b.wait_for_finished()
        self.assertEqual(len(b.finished), N)

    def test_with(self):
        # The lock works as a context manager and is released on exit,
        # whether or not the body raised.
        lock = self.locktype()
        def f():
            lock.acquire()
            lock.release()
        def _with(err=None):
            with lock:
                if err is not None:
                    raise err
        _with()
        # Check the lock is unacquired
        Bunch(f, 1).wait_for_finished()
        self.assertRaises(TypeError, _with, TypeError)
        # Check the lock is unacquired
        Bunch(f, 1).wait_for_finished()

    def test_thread_leak(self):
        # The lock shouldn't leak a Thread instance when used from a foreign
        # (non-threading) thread.
        lock = self.locktype()
        def f():
            lock.acquire()
            lock.release()
        n = len(threading.enumerate())
        # We run many threads in the hope that existing threads ids won't
        # be recycled.
        Bunch(f, 15).wait_for_finished()
        if len(threading.enumerate()) != n:
            # There is a small window during which a Thread instance's
            # target function has finished running, but the Thread is still
            # alive and registered.  Avoid spurious failures by waiting a
            # bit more (seen on a buildbot).
            time.sleep(0.4)
            self.assertEqual(n, len(threading.enumerate()))

    def test_timeout(self):
        lock = self.locktype()
        # Can't set timeout if not blocking
        self.assertRaises(ValueError, lock.acquire, 0, 1)
        # Invalid timeout values
        self.assertRaises(ValueError, lock.acquire, timeout=-100)
        self.assertRaises(OverflowError, lock.acquire, timeout=1e100)
        self.assertRaises(OverflowError, lock.acquire, timeout=TIMEOUT_MAX + 1)
        # TIMEOUT_MAX is ok
        lock.acquire(timeout=TIMEOUT_MAX)
        lock.release()
        # Durations are measured with the monotonic clock: unlike
        # time.time(), it cannot jump when the system clock is adjusted.
        t1 = time.monotonic()
        self.assertTrue(lock.acquire(timeout=5))
        t2 = time.monotonic()
        # Just a sanity test that it didn't actually wait for the timeout.
        self.assertLess(t2 - t1, 5)
        results = []
        def f():
            # The lock is held by the main thread, so this acquire is
            # expected to time out.
            t1 = time.monotonic()
            results.append(lock.acquire(timeout=0.5))
            t2 = time.monotonic()
            results.append(t2 - t1)
        Bunch(f, 1).wait_for_finished()
        self.assertFalse(results[0])
        self.assertTimeout(results[1], 0.5)
190
Antoine Pitrou557934f2009-11-06 22:41:14 +0000191
class LockTests(BaseLockTests):
    """
    Tests for non-recursive, weak locks
    (which can be acquired and released from different threads).
    """
    def test_reacquire(self):
        # A second acquire() blocks until somebody releases the lock.
        lock = self.locktype()
        steps = []

        def acquire_twice():
            lock.acquire()
            steps.append(None)
            lock.acquire()
            steps.append(None)

        start_new_thread(acquire_twice, ())
        # Wait for the first acquire() to go through...
        while not steps:
            _wait()
        # ... then leave the thread some time: it must stay stuck in the
        # second acquire().
        _wait()
        self.assertEqual(len(steps), 1)
        # Releasing from this thread lets the second acquire() complete.
        lock.release()
        while len(steps) == 1:
            _wait()
        self.assertEqual(len(steps), 2)

    def test_different_thread(self):
        # A thread other than the one holding the lock may release it.
        lock = self.locktype()
        lock.acquire()

        def release():
            lock.release()

        Bunch(release, 1).wait_for_finished()
        # The lock is free again, so this acquire() does not block.
        lock.acquire()
        lock.release()

    def test_state_after_timeout(self):
        # Issue #11618: the lock must be left in a proper state after an
        # acquire() with a (non-zero) timeout has timed out.
        lock = self.locktype()
        lock.acquire()
        timed_out = lock.acquire(timeout=0.01)
        self.assertFalse(timed_out)
        lock.release()
        self.assertFalse(lock.locked())
        self.assertTrue(lock.acquire(blocking=False))
236
Antoine Pitrou557934f2009-11-06 22:41:14 +0000237
class RLockTests(BaseLockTests):
    """
    Tests for recursive locks.
    """
    def test_reacquire(self):
        # The owner may acquire the lock again, as long as every acquire()
        # is matched by a release().
        rlock = self.locktype()
        rlock.acquire()
        rlock.acquire()
        rlock.release()
        rlock.acquire()
        rlock.release()
        rlock.release()

    def test_release_unacquired(self):
        # release() fails on a lock that is not acquired, both on a fresh
        # lock and after balanced nested acquire/release calls.
        rlock = self.locktype()
        with self.assertRaises(RuntimeError):
            rlock.release()
        rlock.acquire()
        rlock.acquire()
        rlock.release()
        rlock.acquire()
        rlock.release()
        rlock.release()
        with self.assertRaises(RuntimeError):
            rlock.release()

    def test_release_save_unacquired(self):
        # Same as above, for the private _release_save() method.
        rlock = self.locktype()
        with self.assertRaises(RuntimeError):
            rlock._release_save()
        rlock.acquire()
        rlock.acquire()
        rlock.release()
        rlock.acquire()
        rlock.release()
        rlock.release()
        with self.assertRaises(RuntimeError):
            rlock._release_save()

    def test_different_thread(self):
        # release() must fail in a thread that does not own the lock.
        rlock = self.locktype()

        def grab():
            rlock.acquire()

        # The worker is kept alive (wait_before_exit) until do_finish().
        bunch = Bunch(grab, 1, True)
        try:
            with self.assertRaises(RuntimeError):
                rlock.release()
        finally:
            bunch.do_finish()

    def test__is_owned(self):
        # _is_owned() is true only in the thread holding the lock, and only
        # until the outermost release().
        rlock = self.locktype()
        self.assertFalse(rlock._is_owned())
        rlock.acquire()
        self.assertTrue(rlock._is_owned())
        rlock.acquire()
        self.assertTrue(rlock._is_owned())
        seen_by_other = []

        def probe():
            seen_by_other.append(rlock._is_owned())

        Bunch(probe, 1).wait_for_finished()
        self.assertFalse(seen_by_other[0])
        rlock.release()
        self.assertTrue(rlock._is_owned())
        rlock.release()
        self.assertFalse(rlock._is_owned())
302
303
class EventTests(BaseTestCase):
    """
    Tests for Event objects.

    The tests create events through `self.eventtype`, which is not defined
    here and is expected to be supplied by concrete subclasses.
    """

    def test_is_set(self):
        # set() and clear() are idempotent and reflected by is_set().
        evt = self.eventtype()
        self.assertFalse(evt.is_set())
        evt.set()
        self.assertTrue(evt.is_set())
        evt.set()
        self.assertTrue(evt.is_set())
        evt.clear()
        self.assertFalse(evt.is_set())
        evt.clear()
        self.assertFalse(evt.is_set())

    def _check_notify(self, evt):
        # All threads get notified
        N = 5
        results1 = []
        results2 = []
        def f():
            results1.append(evt.wait())
            results2.append(evt.wait())
        b = Bunch(f, N)
        b.wait_for_started()
        _wait()
        # Nobody may get past the first wait() before the event is set.
        self.assertEqual(len(results1), 0)
        evt.set()
        b.wait_for_finished()
        self.assertEqual(results1, [True] * N)
        self.assertEqual(results2, [True] * N)

    def test_notify(self):
        evt = self.eventtype()
        self._check_notify(evt)
        # Another time, after an explicit clear()
        evt.set()
        evt.clear()
        self._check_notify(evt)

    def test_timeout(self):
        evt = self.eventtype()
        results1 = []
        results2 = []
        N = 5
        def f():
            results1.append(evt.wait(0.0))
            # Durations are measured with the monotonic clock: unlike
            # time.time(), it cannot jump when the system clock is adjusted.
            t1 = time.monotonic()
            r = evt.wait(0.5)
            t2 = time.monotonic()
            results2.append((r, t2 - t1))
        # The event is not set: every wait must time out.
        Bunch(f, N).wait_for_finished()
        self.assertEqual(results1, [False] * N)
        for r, dt in results2:
            self.assertFalse(r)
            self.assertTimeout(dt, 0.5)
        # The event is set
        # (f() looks these names up when it runs, so it appends to the
        # fresh lists below).
        results1 = []
        results2 = []
        evt.set()
        Bunch(f, N).wait_for_finished()
        self.assertEqual(results1, [True] * N)
        for r, dt in results2:
            self.assertTrue(r)

    def test_set_and_clear(self):
        # Issue #13502: check that wait() returns true even when the event is
        # cleared before the waiting thread is woken up.
        evt = self.eventtype()
        results = []
        N = 5
        def f():
            results.append(evt.wait(1))
        b = Bunch(f, N)
        b.wait_for_started()
        # Give the workers time to block in wait(); 0.5s stays below their
        # 1s timeout.
        time.sleep(0.5)
        evt.set()
        evt.clear()
        b.wait_for_finished()
        self.assertEqual(results, [True] * N)
386
Antoine Pitrou557934f2009-11-06 22:41:14 +0000387
class ConditionTests(BaseTestCase):
    """
    Tests for condition variables.

    The tests create conditions through `self.condtype`, which is not
    defined here and is expected to be supplied by concrete subclasses.
    """

    def test_acquire(self):
        cond = self.condtype()
        # By default we have an RLock: the condition can be acquired multiple
        # times.
        cond.acquire()
        cond.acquire()
        cond.release()
        cond.release()
        # With an explicit plain Lock, the condition and the lock share the
        # same state.
        lock = threading.Lock()
        cond = self.condtype(lock)
        cond.acquire()
        self.assertFalse(lock.acquire(False))
        cond.release()
        self.assertTrue(lock.acquire(False))
        self.assertFalse(cond.acquire(False))
        lock.release()
        with cond:
            self.assertFalse(lock.acquire(False))

    def test_unacquired_wait(self):
        # wait() requires the condition to be acquired.
        cond = self.condtype()
        self.assertRaises(RuntimeError, cond.wait)

    def test_unacquired_notify(self):
        # notify() requires the condition to be acquired.
        cond = self.condtype()
        self.assertRaises(RuntimeError, cond.notify)

    def _check_notify(self, cond):
        # Note that this test is sensitive to timing.  If the worker threads
        # don't execute in a timely fashion, the main thread may think they
        # are further along than they are.  The main thread therefore issues
        # _wait() statements to try to make sure that it doesn't race ahead
        # of the workers.
        # Secondly, this test assumes that condition variables are not subject
        # to spurious wakeups.  The absence of spurious wakeups is an
        # implementation detail of Condition Variables in current CPython,
        # but in general, not a guaranteed property of condition variables as
        # a programming construct.  In particular, it is possible that this
        # can no longer be conveniently guaranteed should their
        # implementation ever change.
        N = 5
        results1 = []
        results2 = []
        # Rebound by the main thread below; each worker records the value it
        # sees right after waking up.
        phase_num = 0
        def f():
            cond.acquire()
            result = cond.wait()
            cond.release()
            results1.append((result, phase_num))
            cond.acquire()
            result = cond.wait()
            cond.release()
            results2.append((result, phase_num))
        b = Bunch(f, N)
        b.wait_for_started()
        _wait()
        self.assertEqual(results1, [])
        # Notify 3 threads at first
        cond.acquire()
        cond.notify(3)
        _wait()
        phase_num = 1
        cond.release()
        while len(results1) < 3:
            _wait()
        self.assertEqual(results1, [(True, 1)] * 3)
        self.assertEqual(results2, [])
        # first wait, to ensure all workers settle into cond.wait() before
        # we continue. See issue #8799
        _wait()
        # Notify 5 threads: they might be in their first or second wait
        cond.acquire()
        cond.notify(5)
        _wait()
        phase_num = 2
        cond.release()
        while len(results1) + len(results2) < 8:
            _wait()
        self.assertEqual(results1, [(True, 1)] * 3 + [(True, 2)] * 2)
        self.assertEqual(results2, [(True, 2)] * 3)
        _wait() # make sure all workers settle into cond.wait()
        # Notify all threads: they are all in their second wait
        cond.acquire()
        cond.notify_all()
        _wait()
        phase_num = 3
        cond.release()
        while len(results2) < 5:
            _wait()
        self.assertEqual(results1, [(True, 1)] * 3 + [(True, 2)] * 2)
        self.assertEqual(results2, [(True, 2)] * 3 + [(True, 3)] * 2)
        b.wait_for_finished()

    def test_notify(self):
        cond = self.condtype()
        self._check_notify(cond)
        # A second time, to check internal state is still ok.
        self._check_notify(cond)

    def test_timeout(self):
        cond = self.condtype()
        results = []
        N = 5
        def f():
            cond.acquire()
            # Durations are measured with the monotonic clock: unlike
            # time.time(), it cannot jump when the system clock is adjusted.
            t1 = time.monotonic()
            result = cond.wait(0.5)
            t2 = time.monotonic()
            cond.release()
            results.append((t2 - t1, result))
        Bunch(f, N).wait_for_finished()
        self.assertEqual(len(results), N)
        for dt, result in results:
            self.assertTimeout(dt, 0.5)
            # Note that conceptually (that's the condition variable protocol)
            # a wait() may succeed even if no one notifies us and before any
            # timeout occurs.  Spurious wakeups can occur.
            # This makes it hard to verify the result value.
            # In practice, this implementation has no spurious wakeups.
            self.assertFalse(result)

    def test_waitfor(self):
        cond = self.condtype()
        state = 0
        # Filled in by the worker and checked in the main thread: an
        # assertion failing inside the worker thread would not be reported
        # as a failure of this test.
        results = []
        def f():
            with cond:
                result = cond.wait_for(lambda: state == 4)
                results.append((result, state))
        b = Bunch(f, 1)
        b.wait_for_started()
        for i in range(4):
            time.sleep(0.01)
            with cond:
                state += 1
                cond.notify()
        b.wait_for_finished()
        # An empty list means that the worker raised before recording.
        self.assertEqual(len(results), 1)
        result, seen_state = results[0]
        self.assertTrue(result)
        self.assertEqual(seen_state, 4)

    def test_waitfor_timeout(self):
        cond = self.condtype()
        state = 0
        # Only appended to when all the checks in the worker passed.
        success = []
        def f():
            with cond:
                dt = time.monotonic()
                result = cond.wait_for(lambda: state == 4, timeout=0.1)
                dt = time.monotonic() - dt
                self.assertFalse(result)
                self.assertTimeout(dt, 0.1)
                success.append(None)
        b = Bunch(f, 1)
        b.wait_for_started()
        # Only increment 3 times, so state == 4 is never reached.
        for i in range(3):
            time.sleep(0.01)
            with cond:
                state += 1
                cond.notify()
        b.wait_for_finished()
        self.assertEqual(len(success), 1)
552
Antoine Pitrou557934f2009-11-06 22:41:14 +0000553
class BaseSemaphoreTests(BaseTestCase):
    """
    Common tests for {bounded, unbounded} semaphore objects.

    The tests create semaphores through `self.semtype`, which is not
    defined here and is expected to be supplied by concrete subclasses.
    """

    def test_constructor(self):
        # Negative initial values are rejected.
        self.assertRaises(ValueError, self.semtype, value = -1)
        self.assertRaises(ValueError, self.semtype, value = -sys.maxsize)

    def test_acquire(self):
        sem = self.semtype(1)
        sem.acquire()
        sem.release()
        sem = self.semtype(2)
        sem.acquire()
        sem.acquire()
        sem.release()
        sem.release()

    def test_acquire_destroy(self):
        # A semaphore may be destroyed while it is still acquired.
        sem = self.semtype()
        sem.acquire()
        del sem

    def test_acquire_contended(self):
        # Starts at 7 and the main thread takes one unit: 6 are left for
        # the workers.
        sem = self.semtype(7)
        sem.acquire()
        N = 10
        results1 = []
        results2 = []
        # Rebound by the main thread below; workers record the value they
        # see after each successful acquire().
        phase_num = 0
        def f():
            sem.acquire()
            results1.append(phase_num)
            sem.acquire()
            results2.append(phase_num)
        # N workers acquiring twice each: 2 * N = 20 acquires in total.
        b = Bunch(f, N)
        b.wait_for_started()
        while len(results1) + len(results2) < 6:
            _wait()
        self.assertEqual(results1 + results2, [0] * 6)
        phase_num = 1
        for i in range(7):
            sem.release()
        while len(results1) + len(results2) < 13:
            _wait()
        self.assertEqual(sorted(results1 + results2), [0] * 6 + [1] * 7)
        phase_num = 2
        for i in range(6):
            sem.release()
        while len(results1) + len(results2) < 19:
            _wait()
        self.assertEqual(sorted(results1 + results2), [0] * 6 + [1] * 7 + [2] * 6)
        # The semaphore is still locked
        self.assertFalse(sem.acquire(False))
        # Final release, to let the last thread finish
        sem.release()
        b.wait_for_finished()

    def test_try_acquire(self):
        sem = self.semtype(2)
        self.assertTrue(sem.acquire(False))
        self.assertTrue(sem.acquire(False))
        self.assertFalse(sem.acquire(False))
        sem.release()
        self.assertTrue(sem.acquire(False))

    def test_try_acquire_contended(self):
        # 3 units are left after the main thread's acquire(); 5 workers
        # make 10 non-blocking attempts.
        sem = self.semtype(4)
        sem.acquire()
        results = []
        def f():
            results.append(sem.acquire(False))
            results.append(sem.acquire(False))
        Bunch(f, 5).wait_for_finished()
        # There can be a thread switch between acquiring the semaphore and
        # appending the result, therefore results will not necessarily be
        # ordered.
        self.assertEqual(sorted(results), [False] * 7 + [True] * 3 )

    def test_acquire_timeout(self):
        sem = self.semtype(2)
        # Can't set timeout if not blocking
        self.assertRaises(ValueError, sem.acquire, False, timeout=1.0)
        self.assertTrue(sem.acquire(timeout=0.005))
        self.assertTrue(sem.acquire(timeout=0.005))
        self.assertFalse(sem.acquire(timeout=0.005))
        sem.release()
        self.assertTrue(sem.acquire(timeout=0.005))
        # The duration is measured with the monotonic clock: unlike
        # time.time(), it cannot jump when the system clock is adjusted.
        t = time.monotonic()
        self.assertFalse(sem.acquire(timeout=0.5))
        dt = time.monotonic() - t
        self.assertTimeout(dt, 0.5)

    def test_default_value(self):
        # The default initial value is 1.
        sem = self.semtype()
        sem.acquire()
        def f():
            sem.acquire()
            sem.release()
        b = Bunch(f, 1)
        b.wait_for_started()
        _wait()
        # The worker must still be blocked in acquire().
        self.assertFalse(b.finished)
        sem.release()
        b.wait_for_finished()

    def test_with(self):
        # The semaphore works as a (nestable) context manager and is
        # released on exit, whether or not the body raised.
        sem = self.semtype(2)
        def _with(err=None):
            with sem:
                self.assertTrue(sem.acquire(False))
                sem.release()
                with sem:
                    self.assertFalse(sem.acquire(False))
                    if err:
                        raise err
        _with()
        self.assertTrue(sem.acquire(False))
        sem.release()
        self.assertRaises(TypeError, _with, TypeError)
        self.assertTrue(sem.acquire(False))
        sem.release()
677
class SemaphoreTests(BaseSemaphoreTests):
    """
    Tests for unbounded semaphores.
    """

    def test_release_unacquired(self):
        # Releasing more often than acquiring is allowed: each extra
        # release() increments the semaphore's value.
        semaphore = self.semtype(1)
        semaphore.release()
        # The extra release above is what lets both of these acquires
        # complete without blocking.
        semaphore.acquire()
        semaphore.acquire()
        semaphore.release()
689 sem.release()
690
691
class BoundedSemaphoreTests(BaseSemaphoreTests):
    """
    Tests for bounded semaphores.
    """

    def test_release_unacquired(self):
        # The value cannot go past the initial one: release() fails on a
        # fresh semaphore...
        semaphore = self.semtype()
        with self.assertRaises(ValueError):
            semaphore.release()
        # ... and again after a balanced acquire/release pair.
        semaphore.acquire()
        semaphore.release()
        with self.assertRaises(ValueError):
            semaphore.release()
703 self.assertRaises(ValueError, sem.release)
Kristján Valur Jónsson3be00032010-10-28 09:43:10 +0000704
705
class BarrierTests(BaseTestCase):
    """
    Tests for Barrier objects.

    Barriers are created through `self.barriertype`, which is not defined
    here and is expected to be supplied by concrete subclasses.
    """
    # Number of parties of the barriers under test.
    N = 5
    # Default timeout (in seconds) of the per-test barrier.
    defaultTimeout = 2.0

    # NOTE(review): setUp()/tearDown() below replace the BaseTestCase
    # versions without calling them, so the support threading setup/cleanup
    # is not run for these tests -- confirm this is intended.
    def setUp(self):
        self.barrier = self.barriertype(self.N, timeout=self.defaultTimeout)

    def tearDown(self):
        self.barrier.abort()

    def run_threads(self, f):
        # Run f() in N-1 worker threads plus the calling thread, i.e. in N
        # parties, and wait for the workers to be done.
        workers = Bunch(f, self.N - 1)
        f()
        workers.wait_for_finished()

    def multipass(self, results, n):
        # Cross the barrier 2*n times, checking through the two shared
        # lists in `results` that all parties advance in lockstep.
        parties = self.barrier.parties
        self.assertEqual(parties, self.N)
        before, after = results[0], results[1]
        for round_no in range(n):
            before.append(True)
            self.assertEqual(len(after), round_no * parties)
            self.barrier.wait()
            after.append(True)
            self.assertEqual(len(before), (round_no + 1) * parties)
            self.barrier.wait()
        self.assertEqual(self.barrier.n_waiting, 0)
        self.assertFalse(self.barrier.broken)

    def test_barrier(self, passes=1):
        """
        Test that a barrier is passed in lockstep
        """
        results = [[], []]

        def f():
            self.multipass(results, passes)

        self.run_threads(f)

    def test_barrier_10(self):
        """
        Test that a barrier works for 10 consecutive runs
        """
        return self.test_barrier(10)

    def test_wait_return(self):
        """
        test the return value from barrier.wait
        """
        indices = []

        def f():
            index = self.barrier.wait()
            indices.append(index)

        self.run_threads(f)
        # The values returned to the N parties must add up to 0+1+...+(N-1).
        self.assertEqual(sum(indices), sum(range(self.N)))

    def test_action(self):
        """
        Test the 'action' callback
        """
        calls = []

        def action():
            calls.append(True)

        barrier = self.barriertype(self.N, action)

        def f():
            barrier.wait()
            # The action has run exactly once when a party gets through.
            self.assertEqual(len(calls), 1)

        self.run_threads(f)

    def test_abort(self):
        """
        Test that an abort will put the barrier in a broken state
        """
        passed = []
        broken = []

        def f():
            try:
                index = self.barrier.wait()
                if index == self.N // 2:
                    # Exactly one party bails out instead of waiting again.
                    raise RuntimeError
                self.barrier.wait()
                passed.append(True)
            except threading.BrokenBarrierError:
                broken.append(True)
            except RuntimeError:
                self.barrier.abort()

        self.run_threads(f)
        self.assertEqual(len(passed), 0)
        self.assertEqual(len(broken), self.N - 1)
        self.assertTrue(self.barrier.broken)

    def test_reset(self):
        """
        Test that a 'reset' on a barrier frees the waiting threads
        """
        passed = []
        broken = []
        passed_again = []

        def f():
            index = self.barrier.wait()
            if index == self.N // 2:
                # Wait until the other threads are all in the barrier.
                while self.barrier.n_waiting < self.N - 1:
                    time.sleep(0.001)
                self.barrier.reset()
            else:
                try:
                    self.barrier.wait()
                    passed.append(True)
                except threading.BrokenBarrierError:
                    broken.append(True)
            # Now, pass the barrier again
            self.barrier.wait()
            passed_again.append(True)

        self.run_threads(f)
        self.assertEqual(len(passed), 0)
        self.assertEqual(len(broken), self.N - 1)
        self.assertEqual(len(passed_again), self.N)

    def test_abort_and_reset(self):
        """
        Test that a barrier can be reset after being broken.
        """
        passed = []
        broken = []
        passed_again = []
        # Second barrier, only used to synchronize around the reset.
        barrier2 = self.barriertype(self.N)

        def f():
            try:
                index = self.barrier.wait()
                if index == self.N // 2:
                    raise RuntimeError
                self.barrier.wait()
                passed.append(True)
            except threading.BrokenBarrierError:
                broken.append(True)
            except RuntimeError:
                self.barrier.abort()
            # Synchronize and reset the barrier.  Must synchronize first so
            # that everyone has left it when we reset, and after so that no
            # one enters it before the reset.
            if barrier2.wait() == self.N // 2:
                self.barrier.reset()
            barrier2.wait()
            self.barrier.wait()
            passed_again.append(True)

        self.run_threads(f)
        self.assertEqual(len(passed), 0)
        self.assertEqual(len(broken), self.N - 1)
        self.assertEqual(len(passed_again), self.N)

    def test_timeout(self):
        """
        Test wait(timeout)
        """
        def f():
            index = self.barrier.wait()
            if index == self.N // 2:
                # One thread is late!
                time.sleep(1.0)
            # Default timeout is 2.0, so this is shorter.
            with self.assertRaises(threading.BrokenBarrierError):
                self.barrier.wait(0.5)

        self.run_threads(f)

    def test_default_timeout(self):
        """
        Test the barrier's default timeout
        """
        # create a barrier with a low default timeout
        barrier = self.barriertype(self.N, timeout=0.3)

        def f():
            index = barrier.wait()
            if index == self.N // 2:
                # One thread is later than the default timeout of 0.3s.
                time.sleep(1.0)
            with self.assertRaises(threading.BrokenBarrierError):
                barrier.wait()

        self.run_threads(f)

    def test_single_thread(self):
        # A barrier for a single party never blocks.
        barrier = self.barriertype(1)
        barrier.wait()
        barrier.wait()
896 b.wait()