blob: 1a24bdeed373fbd075e2c698d18616f55c941442 [file] [log] [blame]
Antoine Pitrou557934f2009-11-06 22:41:14 +00001"""
2Various tests for synchronization primitives.
3"""
4
5import sys
6import time
Victor Stinner2a129742011-05-30 23:02:52 +02007from _thread import start_new_thread, TIMEOUT_MAX
Antoine Pitrou557934f2009-11-06 22:41:14 +00008import threading
9import unittest
10
11from test import support
12
13
14def _wait():
15 # A crude wait/yield function not relying on synchronization primitives.
16 time.sleep(0.01)
17
18class Bunch(object):
19 """
20 A bunch of threads.
21 """
22 def __init__(self, f, n, wait_before_exit=False):
23 """
24 Construct a bunch of `n` threads running the same function `f`.
25 If `wait_before_exit` is True, the threads won't terminate until
26 do_finish() is called.
27 """
28 self.f = f
29 self.n = n
30 self.started = []
31 self.finished = []
32 self._can_exit = not wait_before_exit
33 def task():
Victor Stinner2a129742011-05-30 23:02:52 +020034 tid = threading.get_ident()
Antoine Pitrou557934f2009-11-06 22:41:14 +000035 self.started.append(tid)
36 try:
37 f()
38 finally:
39 self.finished.append(tid)
40 while not self._can_exit:
41 _wait()
42 for i in range(n):
43 start_new_thread(task, ())
44
45 def wait_for_started(self):
46 while len(self.started) < self.n:
47 _wait()
48
49 def wait_for_finished(self):
50 while len(self.finished) < self.n:
51 _wait()
52
53 def do_finish(self):
54 self._can_exit = True
55
56
57class BaseTestCase(unittest.TestCase):
58 def setUp(self):
59 self._threads = support.threading_setup()
60
61 def tearDown(self):
62 support.threading_cleanup(*self._threads)
63 support.reap_children()
64
Antoine Pitrou7c3e5772010-04-14 15:44:10 +000065 def assertTimeout(self, actual, expected):
66 # The waiting and/or time.time() can be imprecise, which
67 # is why comparing to the expected value would sometimes fail
68 # (especially under Windows).
69 self.assertGreaterEqual(actual, expected * 0.6)
70 # Test nothing insane happened
71 self.assertLess(actual, expected * 10.0)
72
Antoine Pitrou557934f2009-11-06 22:41:14 +000073
74class BaseLockTests(BaseTestCase):
75 """
76 Tests for both recursive and non-recursive locks.
77 """
78
79 def test_constructor(self):
80 lock = self.locktype()
81 del lock
82
Christian Heimesc5d95b12013-07-30 15:54:39 +020083 def test_repr(self):
84 lock = self.locktype()
85 repr(lock)
86 del lock
87
Antoine Pitrou557934f2009-11-06 22:41:14 +000088 def test_acquire_destroy(self):
89 lock = self.locktype()
90 lock.acquire()
91 del lock
92
93 def test_acquire_release(self):
94 lock = self.locktype()
95 lock.acquire()
96 lock.release()
97 del lock
98
99 def test_try_acquire(self):
100 lock = self.locktype()
101 self.assertTrue(lock.acquire(False))
102 lock.release()
103
104 def test_try_acquire_contended(self):
105 lock = self.locktype()
106 lock.acquire()
107 result = []
108 def f():
109 result.append(lock.acquire(False))
110 Bunch(f, 1).wait_for_finished()
111 self.assertFalse(result[0])
112 lock.release()
113
114 def test_acquire_contended(self):
115 lock = self.locktype()
116 lock.acquire()
117 N = 5
118 def f():
119 lock.acquire()
120 lock.release()
121
122 b = Bunch(f, N)
123 b.wait_for_started()
124 _wait()
125 self.assertEqual(len(b.finished), 0)
126 lock.release()
127 b.wait_for_finished()
128 self.assertEqual(len(b.finished), N)
129
130 def test_with(self):
131 lock = self.locktype()
132 def f():
133 lock.acquire()
134 lock.release()
135 def _with(err=None):
136 with lock:
137 if err is not None:
138 raise err
139 _with()
140 # Check the lock is unacquired
141 Bunch(f, 1).wait_for_finished()
142 self.assertRaises(TypeError, _with, TypeError)
143 # Check the lock is unacquired
144 Bunch(f, 1).wait_for_finished()
145
Antoine Pitroub0872682009-11-09 16:08:16 +0000146 def test_thread_leak(self):
147 # The lock shouldn't leak a Thread instance when used from a foreign
148 # (non-threading) thread.
149 lock = self.locktype()
150 def f():
151 lock.acquire()
152 lock.release()
153 n = len(threading.enumerate())
154 # We run many threads in the hope that existing threads ids won't
155 # be recycled.
156 Bunch(f, 15).wait_for_finished()
Antoine Pitrou45fdb452011-04-04 21:59:09 +0200157 if len(threading.enumerate()) != n:
158 # There is a small window during which a Thread instance's
159 # target function has finished running, but the Thread is still
160 # alive and registered. Avoid spurious failures by waiting a
161 # bit more (seen on a buildbot).
162 time.sleep(0.4)
163 self.assertEqual(n, len(threading.enumerate()))
Antoine Pitroub0872682009-11-09 16:08:16 +0000164
Antoine Pitrou7c3e5772010-04-14 15:44:10 +0000165 def test_timeout(self):
166 lock = self.locktype()
167 # Can't set timeout if not blocking
168 self.assertRaises(ValueError, lock.acquire, 0, 1)
169 # Invalid timeout values
170 self.assertRaises(ValueError, lock.acquire, timeout=-100)
171 self.assertRaises(OverflowError, lock.acquire, timeout=1e100)
172 self.assertRaises(OverflowError, lock.acquire, timeout=TIMEOUT_MAX + 1)
173 # TIMEOUT_MAX is ok
174 lock.acquire(timeout=TIMEOUT_MAX)
175 lock.release()
176 t1 = time.time()
177 self.assertTrue(lock.acquire(timeout=5))
178 t2 = time.time()
179 # Just a sanity test that it didn't actually wait for the timeout.
180 self.assertLess(t2 - t1, 5)
181 results = []
182 def f():
183 t1 = time.time()
184 results.append(lock.acquire(timeout=0.5))
185 t2 = time.time()
186 results.append(t2 - t1)
187 Bunch(f, 1).wait_for_finished()
188 self.assertFalse(results[0])
189 self.assertTimeout(results[1], 0.5)
190
Antoine Pitrou557934f2009-11-06 22:41:14 +0000191
192class LockTests(BaseLockTests):
193 """
194 Tests for non-recursive, weak locks
195 (which can be acquired and released from different threads).
196 """
197 def test_reacquire(self):
198 # Lock needs to be released before re-acquiring.
199 lock = self.locktype()
200 phase = []
201 def f():
202 lock.acquire()
203 phase.append(None)
204 lock.acquire()
205 phase.append(None)
206 start_new_thread(f, ())
207 while len(phase) == 0:
208 _wait()
209 _wait()
210 self.assertEqual(len(phase), 1)
211 lock.release()
212 while len(phase) == 1:
213 _wait()
214 self.assertEqual(len(phase), 2)
215
216 def test_different_thread(self):
217 # Lock can be released from a different thread.
218 lock = self.locktype()
219 lock.acquire()
220 def f():
221 lock.release()
222 b = Bunch(f, 1)
223 b.wait_for_finished()
224 lock.acquire()
225 lock.release()
226
Antoine Pitrou7899acf2011-03-31 01:00:32 +0200227 def test_state_after_timeout(self):
228 # Issue #11618: check that lock is in a proper state after a
229 # (non-zero) timeout.
230 lock = self.locktype()
231 lock.acquire()
232 self.assertFalse(lock.acquire(timeout=0.01))
233 lock.release()
234 self.assertFalse(lock.locked())
235 self.assertTrue(lock.acquire(blocking=False))
236
Antoine Pitrou557934f2009-11-06 22:41:14 +0000237
238class RLockTests(BaseLockTests):
239 """
240 Tests for recursive locks.
241 """
242 def test_reacquire(self):
243 lock = self.locktype()
244 lock.acquire()
245 lock.acquire()
246 lock.release()
247 lock.acquire()
248 lock.release()
249 lock.release()
250
251 def test_release_unacquired(self):
252 # Cannot release an unacquired lock
253 lock = self.locktype()
254 self.assertRaises(RuntimeError, lock.release)
255 lock.acquire()
256 lock.acquire()
257 lock.release()
258 lock.acquire()
259 lock.release()
260 lock.release()
261 self.assertRaises(RuntimeError, lock.release)
Antoine Pitrouea3eb882012-05-17 18:55:59 +0200262
263 def test_release_save_unacquired(self):
264 # Cannot _release_save an unacquired lock
265 lock = self.locktype()
266 self.assertRaises(RuntimeError, lock._release_save)
267 lock.acquire()
268 lock.acquire()
269 lock.release()
270 lock.acquire()
271 lock.release()
272 lock.release()
Victor Stinnerc2824d42011-04-24 23:41:33 +0200273 self.assertRaises(RuntimeError, lock._release_save)
Antoine Pitrou557934f2009-11-06 22:41:14 +0000274
275 def test_different_thread(self):
276 # Cannot release from a different thread
277 lock = self.locktype()
278 def f():
279 lock.acquire()
280 b = Bunch(f, 1, True)
281 try:
282 self.assertRaises(RuntimeError, lock.release)
283 finally:
284 b.do_finish()
285
286 def test__is_owned(self):
287 lock = self.locktype()
288 self.assertFalse(lock._is_owned())
289 lock.acquire()
290 self.assertTrue(lock._is_owned())
291 lock.acquire()
292 self.assertTrue(lock._is_owned())
293 result = []
294 def f():
295 result.append(lock._is_owned())
296 Bunch(f, 1).wait_for_finished()
297 self.assertFalse(result[0])
298 lock.release()
299 self.assertTrue(lock._is_owned())
300 lock.release()
301 self.assertFalse(lock._is_owned())
302
303
304class EventTests(BaseTestCase):
305 """
306 Tests for Event objects.
307 """
308
309 def test_is_set(self):
310 evt = self.eventtype()
311 self.assertFalse(evt.is_set())
312 evt.set()
313 self.assertTrue(evt.is_set())
314 evt.set()
315 self.assertTrue(evt.is_set())
316 evt.clear()
317 self.assertFalse(evt.is_set())
318 evt.clear()
319 self.assertFalse(evt.is_set())
320
321 def _check_notify(self, evt):
322 # All threads get notified
323 N = 5
324 results1 = []
325 results2 = []
326 def f():
327 results1.append(evt.wait())
328 results2.append(evt.wait())
329 b = Bunch(f, N)
330 b.wait_for_started()
331 _wait()
332 self.assertEqual(len(results1), 0)
333 evt.set()
334 b.wait_for_finished()
335 self.assertEqual(results1, [True] * N)
336 self.assertEqual(results2, [True] * N)
337
338 def test_notify(self):
339 evt = self.eventtype()
340 self._check_notify(evt)
341 # Another time, after an explicit clear()
342 evt.set()
343 evt.clear()
344 self._check_notify(evt)
345
346 def test_timeout(self):
347 evt = self.eventtype()
348 results1 = []
349 results2 = []
350 N = 5
351 def f():
352 results1.append(evt.wait(0.0))
353 t1 = time.time()
Antoine Pitrou7c3e5772010-04-14 15:44:10 +0000354 r = evt.wait(0.5)
Antoine Pitrou557934f2009-11-06 22:41:14 +0000355 t2 = time.time()
356 results2.append((r, t2 - t1))
357 Bunch(f, N).wait_for_finished()
358 self.assertEqual(results1, [False] * N)
359 for r, dt in results2:
360 self.assertFalse(r)
Antoine Pitrou7c3e5772010-04-14 15:44:10 +0000361 self.assertTimeout(dt, 0.5)
Antoine Pitrou557934f2009-11-06 22:41:14 +0000362 # The event is set
363 results1 = []
364 results2 = []
365 evt.set()
366 Bunch(f, N).wait_for_finished()
367 self.assertEqual(results1, [True] * N)
368 for r, dt in results2:
369 self.assertTrue(r)
370
Charles-François Natalided03482012-01-07 18:24:56 +0100371 def test_set_and_clear(self):
372 # Issue #13502: check that wait() returns true even when the event is
373 # cleared before the waiting thread is woken up.
374 evt = self.eventtype()
375 results = []
376 N = 5
377 def f():
378 results.append(evt.wait(1))
379 b = Bunch(f, N)
380 b.wait_for_started()
381 time.sleep(0.5)
382 evt.set()
383 evt.clear()
384 b.wait_for_finished()
385 self.assertEqual(results, [True] * N)
386
Antoine Pitrou557934f2009-11-06 22:41:14 +0000387
388class ConditionTests(BaseTestCase):
389 """
390 Tests for condition variables.
391 """
392
393 def test_acquire(self):
394 cond = self.condtype()
395 # Be default we have an RLock: the condition can be acquired multiple
396 # times.
397 cond.acquire()
398 cond.acquire()
399 cond.release()
400 cond.release()
401 lock = threading.Lock()
402 cond = self.condtype(lock)
403 cond.acquire()
404 self.assertFalse(lock.acquire(False))
405 cond.release()
406 self.assertTrue(lock.acquire(False))
407 self.assertFalse(cond.acquire(False))
408 lock.release()
409 with cond:
410 self.assertFalse(lock.acquire(False))
411
412 def test_unacquired_wait(self):
413 cond = self.condtype()
414 self.assertRaises(RuntimeError, cond.wait)
415
416 def test_unacquired_notify(self):
417 cond = self.condtype()
418 self.assertRaises(RuntimeError, cond.notify)
419
420 def _check_notify(self, cond):
421 N = 5
422 results1 = []
423 results2 = []
424 phase_num = 0
425 def f():
426 cond.acquire()
Georg Brandlb9a43912010-10-28 09:03:20 +0000427 result = cond.wait()
Antoine Pitrou557934f2009-11-06 22:41:14 +0000428 cond.release()
Georg Brandlb9a43912010-10-28 09:03:20 +0000429 results1.append((result, phase_num))
Antoine Pitrou557934f2009-11-06 22:41:14 +0000430 cond.acquire()
Georg Brandlb9a43912010-10-28 09:03:20 +0000431 result = cond.wait()
Antoine Pitrou557934f2009-11-06 22:41:14 +0000432 cond.release()
Georg Brandlb9a43912010-10-28 09:03:20 +0000433 results2.append((result, phase_num))
Antoine Pitrou557934f2009-11-06 22:41:14 +0000434 b = Bunch(f, N)
435 b.wait_for_started()
436 _wait()
437 self.assertEqual(results1, [])
438 # Notify 3 threads at first
439 cond.acquire()
440 cond.notify(3)
441 _wait()
442 phase_num = 1
443 cond.release()
444 while len(results1) < 3:
445 _wait()
Georg Brandlb9a43912010-10-28 09:03:20 +0000446 self.assertEqual(results1, [(True, 1)] * 3)
Antoine Pitrou557934f2009-11-06 22:41:14 +0000447 self.assertEqual(results2, [])
448 # Notify 5 threads: they might be in their first or second wait
449 cond.acquire()
450 cond.notify(5)
451 _wait()
452 phase_num = 2
453 cond.release()
454 while len(results1) + len(results2) < 8:
455 _wait()
Georg Brandlb9a43912010-10-28 09:03:20 +0000456 self.assertEqual(results1, [(True, 1)] * 3 + [(True, 2)] * 2)
457 self.assertEqual(results2, [(True, 2)] * 3)
Antoine Pitrou557934f2009-11-06 22:41:14 +0000458 # Notify all threads: they are all in their second wait
459 cond.acquire()
460 cond.notify_all()
461 _wait()
462 phase_num = 3
463 cond.release()
464 while len(results2) < 5:
465 _wait()
Georg Brandlb9a43912010-10-28 09:03:20 +0000466 self.assertEqual(results1, [(True, 1)] * 3 + [(True,2)] * 2)
467 self.assertEqual(results2, [(True, 2)] * 3 + [(True, 3)] * 2)
Antoine Pitrou557934f2009-11-06 22:41:14 +0000468 b.wait_for_finished()
469
470 def test_notify(self):
471 cond = self.condtype()
472 self._check_notify(cond)
473 # A second time, to check internal state is still ok.
474 self._check_notify(cond)
475
476 def test_timeout(self):
477 cond = self.condtype()
478 results = []
479 N = 5
480 def f():
481 cond.acquire()
482 t1 = time.time()
Georg Brandlb9a43912010-10-28 09:03:20 +0000483 result = cond.wait(0.5)
Antoine Pitrou557934f2009-11-06 22:41:14 +0000484 t2 = time.time()
485 cond.release()
Georg Brandlb9a43912010-10-28 09:03:20 +0000486 results.append((t2 - t1, result))
Antoine Pitrou557934f2009-11-06 22:41:14 +0000487 Bunch(f, N).wait_for_finished()
Georg Brandlb9a43912010-10-28 09:03:20 +0000488 self.assertEqual(len(results), N)
489 for dt, result in results:
Antoine Pitrou7c3e5772010-04-14 15:44:10 +0000490 self.assertTimeout(dt, 0.5)
Georg Brandlb9a43912010-10-28 09:03:20 +0000491 # Note that conceptually (that"s the condition variable protocol)
492 # a wait() may succeed even if no one notifies us and before any
493 # timeout occurs. Spurious wakeups can occur.
494 # This makes it hard to verify the result value.
495 # In practice, this implementation has no spurious wakeups.
496 self.assertFalse(result)
Antoine Pitrou557934f2009-11-06 22:41:14 +0000497
Kristján Valur Jónsson63315202010-11-18 12:46:39 +0000498 def test_waitfor(self):
499 cond = self.condtype()
500 state = 0
501 def f():
502 with cond:
503 result = cond.wait_for(lambda : state==4)
504 self.assertTrue(result)
505 self.assertEqual(state, 4)
506 b = Bunch(f, 1)
507 b.wait_for_started()
Victor Stinner3349bca2011-05-18 00:16:14 +0200508 for i in range(4):
Kristján Valur Jónsson63315202010-11-18 12:46:39 +0000509 time.sleep(0.01)
510 with cond:
511 state += 1
512 cond.notify()
513 b.wait_for_finished()
514
515 def test_waitfor_timeout(self):
516 cond = self.condtype()
517 state = 0
518 success = []
519 def f():
520 with cond:
521 dt = time.time()
522 result = cond.wait_for(lambda : state==4, timeout=0.1)
523 dt = time.time() - dt
524 self.assertFalse(result)
525 self.assertTimeout(dt, 0.1)
526 success.append(None)
527 b = Bunch(f, 1)
528 b.wait_for_started()
529 # Only increment 3 times, so state == 4 is never reached.
530 for i in range(3):
531 time.sleep(0.01)
532 with cond:
533 state += 1
534 cond.notify()
535 b.wait_for_finished()
536 self.assertEqual(len(success), 1)
537
Antoine Pitrou557934f2009-11-06 22:41:14 +0000538
539class BaseSemaphoreTests(BaseTestCase):
540 """
541 Common tests for {bounded, unbounded} semaphore objects.
542 """
543
544 def test_constructor(self):
545 self.assertRaises(ValueError, self.semtype, value = -1)
546 self.assertRaises(ValueError, self.semtype, value = -sys.maxsize)
547
548 def test_acquire(self):
549 sem = self.semtype(1)
550 sem.acquire()
551 sem.release()
552 sem = self.semtype(2)
553 sem.acquire()
554 sem.acquire()
555 sem.release()
556 sem.release()
557
558 def test_acquire_destroy(self):
559 sem = self.semtype()
560 sem.acquire()
561 del sem
562
563 def test_acquire_contended(self):
564 sem = self.semtype(7)
565 sem.acquire()
566 N = 10
567 results1 = []
568 results2 = []
569 phase_num = 0
570 def f():
571 sem.acquire()
572 results1.append(phase_num)
573 sem.acquire()
574 results2.append(phase_num)
575 b = Bunch(f, 10)
576 b.wait_for_started()
577 while len(results1) + len(results2) < 6:
578 _wait()
579 self.assertEqual(results1 + results2, [0] * 6)
580 phase_num = 1
581 for i in range(7):
582 sem.release()
583 while len(results1) + len(results2) < 13:
584 _wait()
585 self.assertEqual(sorted(results1 + results2), [0] * 6 + [1] * 7)
586 phase_num = 2
587 for i in range(6):
588 sem.release()
589 while len(results1) + len(results2) < 19:
590 _wait()
591 self.assertEqual(sorted(results1 + results2), [0] * 6 + [1] * 7 + [2] * 6)
592 # The semaphore is still locked
593 self.assertFalse(sem.acquire(False))
594 # Final release, to let the last thread finish
595 sem.release()
596 b.wait_for_finished()
597
598 def test_try_acquire(self):
599 sem = self.semtype(2)
600 self.assertTrue(sem.acquire(False))
601 self.assertTrue(sem.acquire(False))
602 self.assertFalse(sem.acquire(False))
603 sem.release()
604 self.assertTrue(sem.acquire(False))
605
606 def test_try_acquire_contended(self):
607 sem = self.semtype(4)
608 sem.acquire()
609 results = []
610 def f():
611 results.append(sem.acquire(False))
612 results.append(sem.acquire(False))
613 Bunch(f, 5).wait_for_finished()
614 # There can be a thread switch between acquiring the semaphore and
615 # appending the result, therefore results will not necessarily be
616 # ordered.
617 self.assertEqual(sorted(results), [False] * 7 + [True] * 3 )
618
Antoine Pitrou0454af92010-04-17 23:51:58 +0000619 def test_acquire_timeout(self):
620 sem = self.semtype(2)
621 self.assertRaises(ValueError, sem.acquire, False, timeout=1.0)
622 self.assertTrue(sem.acquire(timeout=0.005))
623 self.assertTrue(sem.acquire(timeout=0.005))
624 self.assertFalse(sem.acquire(timeout=0.005))
625 sem.release()
626 self.assertTrue(sem.acquire(timeout=0.005))
627 t = time.time()
628 self.assertFalse(sem.acquire(timeout=0.5))
629 dt = time.time() - t
630 self.assertTimeout(dt, 0.5)
631
Antoine Pitrou557934f2009-11-06 22:41:14 +0000632 def test_default_value(self):
633 # The default initial value is 1.
634 sem = self.semtype()
635 sem.acquire()
636 def f():
637 sem.acquire()
638 sem.release()
639 b = Bunch(f, 1)
640 b.wait_for_started()
641 _wait()
642 self.assertFalse(b.finished)
643 sem.release()
644 b.wait_for_finished()
645
646 def test_with(self):
647 sem = self.semtype(2)
648 def _with(err=None):
649 with sem:
650 self.assertTrue(sem.acquire(False))
651 sem.release()
652 with sem:
653 self.assertFalse(sem.acquire(False))
654 if err:
655 raise err
656 _with()
657 self.assertTrue(sem.acquire(False))
658 sem.release()
659 self.assertRaises(TypeError, _with, TypeError)
660 self.assertTrue(sem.acquire(False))
661 sem.release()
662
663class SemaphoreTests(BaseSemaphoreTests):
664 """
665 Tests for unbounded semaphores.
666 """
667
668 def test_release_unacquired(self):
669 # Unbounded releases are allowed and increment the semaphore's value
670 sem = self.semtype(1)
671 sem.release()
672 sem.acquire()
673 sem.acquire()
674 sem.release()
675
676
677class BoundedSemaphoreTests(BaseSemaphoreTests):
678 """
679 Tests for bounded semaphores.
680 """
681
682 def test_release_unacquired(self):
683 # Cannot go past the initial value
684 sem = self.semtype()
685 self.assertRaises(ValueError, sem.release)
686 sem.acquire()
687 sem.release()
688 self.assertRaises(ValueError, sem.release)
Kristján Valur Jónsson3be00032010-10-28 09:43:10 +0000689
690
691class BarrierTests(BaseTestCase):
692 """
693 Tests for Barrier objects.
694 """
695 N = 5
Antoine Pitrou12ae2902010-11-17 21:55:41 +0000696 defaultTimeout = 2.0
Kristján Valur Jónsson3be00032010-10-28 09:43:10 +0000697
698 def setUp(self):
Kristján Valur Jónssonf53a6262010-10-31 03:00:57 +0000699 self.barrier = self.barriertype(self.N, timeout=self.defaultTimeout)
Kristján Valur Jónsson3be00032010-10-28 09:43:10 +0000700 def tearDown(self):
701 self.barrier.abort()
702
703 def run_threads(self, f):
704 b = Bunch(f, self.N-1)
705 f()
706 b.wait_for_finished()
707
708 def multipass(self, results, n):
709 m = self.barrier.parties
710 self.assertEqual(m, self.N)
711 for i in range(n):
712 results[0].append(True)
713 self.assertEqual(len(results[1]), i * m)
714 self.barrier.wait()
715 results[1].append(True)
716 self.assertEqual(len(results[0]), (i + 1) * m)
717 self.barrier.wait()
718 self.assertEqual(self.barrier.n_waiting, 0)
719 self.assertFalse(self.barrier.broken)
720
721 def test_barrier(self, passes=1):
722 """
723 Test that a barrier is passed in lockstep
724 """
725 results = [[],[]]
726 def f():
727 self.multipass(results, passes)
728 self.run_threads(f)
729
730 def test_barrier_10(self):
731 """
732 Test that a barrier works for 10 consecutive runs
733 """
734 return self.test_barrier(10)
735
736 def test_wait_return(self):
737 """
738 test the return value from barrier.wait
739 """
740 results = []
741 def f():
742 r = self.barrier.wait()
743 results.append(r)
744
745 self.run_threads(f)
746 self.assertEqual(sum(results), sum(range(self.N)))
747
748 def test_action(self):
749 """
750 Test the 'action' callback
751 """
752 results = []
753 def action():
754 results.append(True)
755 barrier = self.barriertype(self.N, action)
756 def f():
757 barrier.wait()
758 self.assertEqual(len(results), 1)
759
760 self.run_threads(f)
761
762 def test_abort(self):
763 """
764 Test that an abort will put the barrier in a broken state
765 """
766 results1 = []
767 results2 = []
768 def f():
769 try:
770 i = self.barrier.wait()
771 if i == self.N//2:
772 raise RuntimeError
773 self.barrier.wait()
774 results1.append(True)
775 except threading.BrokenBarrierError:
776 results2.append(True)
777 except RuntimeError:
778 self.barrier.abort()
779 pass
780
781 self.run_threads(f)
782 self.assertEqual(len(results1), 0)
783 self.assertEqual(len(results2), self.N-1)
784 self.assertTrue(self.barrier.broken)
785
786 def test_reset(self):
787 """
788 Test that a 'reset' on a barrier frees the waiting threads
789 """
790 results1 = []
791 results2 = []
792 results3 = []
793 def f():
794 i = self.barrier.wait()
795 if i == self.N//2:
796 # Wait until the other threads are all in the barrier.
797 while self.barrier.n_waiting < self.N-1:
798 time.sleep(0.001)
799 self.barrier.reset()
800 else:
801 try:
802 self.barrier.wait()
803 results1.append(True)
804 except threading.BrokenBarrierError:
805 results2.append(True)
806 # Now, pass the barrier again
807 self.barrier.wait()
808 results3.append(True)
809
810 self.run_threads(f)
811 self.assertEqual(len(results1), 0)
812 self.assertEqual(len(results2), self.N-1)
813 self.assertEqual(len(results3), self.N)
814
815
816 def test_abort_and_reset(self):
817 """
818 Test that a barrier can be reset after being broken.
819 """
820 results1 = []
821 results2 = []
822 results3 = []
823 barrier2 = self.barriertype(self.N)
824 def f():
825 try:
826 i = self.barrier.wait()
827 if i == self.N//2:
828 raise RuntimeError
829 self.barrier.wait()
830 results1.append(True)
831 except threading.BrokenBarrierError:
832 results2.append(True)
833 except RuntimeError:
834 self.barrier.abort()
835 pass
836 # Synchronize and reset the barrier. Must synchronize first so
837 # that everyone has left it when we reset, and after so that no
838 # one enters it before the reset.
839 if barrier2.wait() == self.N//2:
840 self.barrier.reset()
841 barrier2.wait()
842 self.barrier.wait()
843 results3.append(True)
844
845 self.run_threads(f)
846 self.assertEqual(len(results1), 0)
847 self.assertEqual(len(results2), self.N-1)
848 self.assertEqual(len(results3), self.N)
849
850 def test_timeout(self):
851 """
852 Test wait(timeout)
853 """
854 def f():
855 i = self.barrier.wait()
856 if i == self.N // 2:
857 # One thread is late!
Antoine Pitrou12ae2902010-11-17 21:55:41 +0000858 time.sleep(1.0)
859 # Default timeout is 2.0, so this is shorter.
Kristján Valur Jónsson3be00032010-10-28 09:43:10 +0000860 self.assertRaises(threading.BrokenBarrierError,
Antoine Pitrou12ae2902010-11-17 21:55:41 +0000861 self.barrier.wait, 0.5)
Kristján Valur Jónsson3be00032010-10-28 09:43:10 +0000862 self.run_threads(f)
863
864 def test_default_timeout(self):
865 """
866 Test the barrier's default timeout
867 """
Charles-François Natalid4d1d062011-07-27 21:26:42 +0200868 # create a barrier with a low default timeout
869 barrier = self.barriertype(self.N, timeout=0.3)
Kristján Valur Jónsson3be00032010-10-28 09:43:10 +0000870 def f():
Kristján Valur Jónssonf53a6262010-10-31 03:00:57 +0000871 i = barrier.wait()
Kristján Valur Jónsson3be00032010-10-28 09:43:10 +0000872 if i == self.N // 2:
Charles-François Natalid4d1d062011-07-27 21:26:42 +0200873 # One thread is later than the default timeout of 0.3s.
Antoine Pitrou12ae2902010-11-17 21:55:41 +0000874 time.sleep(1.0)
Kristján Valur Jónssonf53a6262010-10-31 03:00:57 +0000875 self.assertRaises(threading.BrokenBarrierError, barrier.wait)
Kristján Valur Jónsson3be00032010-10-28 09:43:10 +0000876 self.run_threads(f)
877
878 def test_single_thread(self):
879 b = self.barriertype(1)
880 b.wait()
881 b.wait()