blob: 005c912fcf610ae0cd10eabba4e4edad43522837 [file] [log] [blame]
Antoine Pitrou557934f2009-11-06 22:41:14 +00001"""
2Various tests for synchronization primitives.
3"""
4
5import sys
6import time
Antoine Pitrou7c3e5772010-04-14 15:44:10 +00007from _thread import start_new_thread, get_ident, TIMEOUT_MAX
Antoine Pitrou557934f2009-11-06 22:41:14 +00008import threading
9import unittest
10
11from test import support
12
13
14def _wait():
15 # A crude wait/yield function not relying on synchronization primitives.
16 time.sleep(0.01)
17
18class Bunch(object):
19 """
20 A bunch of threads.
21 """
22 def __init__(self, f, n, wait_before_exit=False):
23 """
24 Construct a bunch of `n` threads running the same function `f`.
25 If `wait_before_exit` is True, the threads won't terminate until
26 do_finish() is called.
27 """
28 self.f = f
29 self.n = n
30 self.started = []
31 self.finished = []
32 self._can_exit = not wait_before_exit
33 def task():
34 tid = get_ident()
35 self.started.append(tid)
36 try:
37 f()
38 finally:
39 self.finished.append(tid)
40 while not self._can_exit:
41 _wait()
42 for i in range(n):
43 start_new_thread(task, ())
44
45 def wait_for_started(self):
46 while len(self.started) < self.n:
47 _wait()
48
49 def wait_for_finished(self):
50 while len(self.finished) < self.n:
51 _wait()
52
53 def do_finish(self):
54 self._can_exit = True
55
56
57class BaseTestCase(unittest.TestCase):
58 def setUp(self):
59 self._threads = support.threading_setup()
60
61 def tearDown(self):
62 support.threading_cleanup(*self._threads)
63 support.reap_children()
64
Antoine Pitrou7c3e5772010-04-14 15:44:10 +000065 def assertTimeout(self, actual, expected):
66 # The waiting and/or time.time() can be imprecise, which
67 # is why comparing to the expected value would sometimes fail
68 # (especially under Windows).
69 self.assertGreaterEqual(actual, expected * 0.6)
70 # Test nothing insane happened
71 self.assertLess(actual, expected * 10.0)
72
Antoine Pitrou557934f2009-11-06 22:41:14 +000073
74class BaseLockTests(BaseTestCase):
75 """
76 Tests for both recursive and non-recursive locks.
77 """
78
79 def test_constructor(self):
80 lock = self.locktype()
81 del lock
82
83 def test_acquire_destroy(self):
84 lock = self.locktype()
85 lock.acquire()
86 del lock
87
88 def test_acquire_release(self):
89 lock = self.locktype()
90 lock.acquire()
91 lock.release()
92 del lock
93
94 def test_try_acquire(self):
95 lock = self.locktype()
96 self.assertTrue(lock.acquire(False))
97 lock.release()
98
99 def test_try_acquire_contended(self):
100 lock = self.locktype()
101 lock.acquire()
102 result = []
103 def f():
104 result.append(lock.acquire(False))
105 Bunch(f, 1).wait_for_finished()
106 self.assertFalse(result[0])
107 lock.release()
108
109 def test_acquire_contended(self):
110 lock = self.locktype()
111 lock.acquire()
112 N = 5
113 def f():
114 lock.acquire()
115 lock.release()
116
117 b = Bunch(f, N)
118 b.wait_for_started()
119 _wait()
120 self.assertEqual(len(b.finished), 0)
121 lock.release()
122 b.wait_for_finished()
123 self.assertEqual(len(b.finished), N)
124
125 def test_with(self):
126 lock = self.locktype()
127 def f():
128 lock.acquire()
129 lock.release()
130 def _with(err=None):
131 with lock:
132 if err is not None:
133 raise err
134 _with()
135 # Check the lock is unacquired
136 Bunch(f, 1).wait_for_finished()
137 self.assertRaises(TypeError, _with, TypeError)
138 # Check the lock is unacquired
139 Bunch(f, 1).wait_for_finished()
140
Antoine Pitroub0872682009-11-09 16:08:16 +0000141 def test_thread_leak(self):
142 # The lock shouldn't leak a Thread instance when used from a foreign
143 # (non-threading) thread.
144 lock = self.locktype()
145 def f():
146 lock.acquire()
147 lock.release()
148 n = len(threading.enumerate())
149 # We run many threads in the hope that existing threads ids won't
150 # be recycled.
151 Bunch(f, 15).wait_for_finished()
152 self.assertEqual(n, len(threading.enumerate()))
153
Antoine Pitrou7c3e5772010-04-14 15:44:10 +0000154 def test_timeout(self):
155 lock = self.locktype()
156 # Can't set timeout if not blocking
157 self.assertRaises(ValueError, lock.acquire, 0, 1)
158 # Invalid timeout values
159 self.assertRaises(ValueError, lock.acquire, timeout=-100)
160 self.assertRaises(OverflowError, lock.acquire, timeout=1e100)
161 self.assertRaises(OverflowError, lock.acquire, timeout=TIMEOUT_MAX + 1)
162 # TIMEOUT_MAX is ok
163 lock.acquire(timeout=TIMEOUT_MAX)
164 lock.release()
165 t1 = time.time()
166 self.assertTrue(lock.acquire(timeout=5))
167 t2 = time.time()
168 # Just a sanity test that it didn't actually wait for the timeout.
169 self.assertLess(t2 - t1, 5)
170 results = []
171 def f():
172 t1 = time.time()
173 results.append(lock.acquire(timeout=0.5))
174 t2 = time.time()
175 results.append(t2 - t1)
176 Bunch(f, 1).wait_for_finished()
177 self.assertFalse(results[0])
178 self.assertTimeout(results[1], 0.5)
179
Antoine Pitrou557934f2009-11-06 22:41:14 +0000180
181class LockTests(BaseLockTests):
182 """
183 Tests for non-recursive, weak locks
184 (which can be acquired and released from different threads).
185 """
186 def test_reacquire(self):
187 # Lock needs to be released before re-acquiring.
188 lock = self.locktype()
189 phase = []
190 def f():
191 lock.acquire()
192 phase.append(None)
193 lock.acquire()
194 phase.append(None)
195 start_new_thread(f, ())
196 while len(phase) == 0:
197 _wait()
198 _wait()
199 self.assertEqual(len(phase), 1)
200 lock.release()
201 while len(phase) == 1:
202 _wait()
203 self.assertEqual(len(phase), 2)
204
205 def test_different_thread(self):
206 # Lock can be released from a different thread.
207 lock = self.locktype()
208 lock.acquire()
209 def f():
210 lock.release()
211 b = Bunch(f, 1)
212 b.wait_for_finished()
213 lock.acquire()
214 lock.release()
215
Antoine Pitrou7899acf2011-03-31 01:00:32 +0200216 def test_state_after_timeout(self):
217 # Issue #11618: check that lock is in a proper state after a
218 # (non-zero) timeout.
219 lock = self.locktype()
220 lock.acquire()
221 self.assertFalse(lock.acquire(timeout=0.01))
222 lock.release()
223 self.assertFalse(lock.locked())
224 self.assertTrue(lock.acquire(blocking=False))
225
Antoine Pitrou557934f2009-11-06 22:41:14 +0000226
227class RLockTests(BaseLockTests):
228 """
229 Tests for recursive locks.
230 """
231 def test_reacquire(self):
232 lock = self.locktype()
233 lock.acquire()
234 lock.acquire()
235 lock.release()
236 lock.acquire()
237 lock.release()
238 lock.release()
239
240 def test_release_unacquired(self):
241 # Cannot release an unacquired lock
242 lock = self.locktype()
243 self.assertRaises(RuntimeError, lock.release)
244 lock.acquire()
245 lock.acquire()
246 lock.release()
247 lock.acquire()
248 lock.release()
249 lock.release()
250 self.assertRaises(RuntimeError, lock.release)
251
252 def test_different_thread(self):
253 # Cannot release from a different thread
254 lock = self.locktype()
255 def f():
256 lock.acquire()
257 b = Bunch(f, 1, True)
258 try:
259 self.assertRaises(RuntimeError, lock.release)
260 finally:
261 b.do_finish()
262
263 def test__is_owned(self):
264 lock = self.locktype()
265 self.assertFalse(lock._is_owned())
266 lock.acquire()
267 self.assertTrue(lock._is_owned())
268 lock.acquire()
269 self.assertTrue(lock._is_owned())
270 result = []
271 def f():
272 result.append(lock._is_owned())
273 Bunch(f, 1).wait_for_finished()
274 self.assertFalse(result[0])
275 lock.release()
276 self.assertTrue(lock._is_owned())
277 lock.release()
278 self.assertFalse(lock._is_owned())
279
280
281class EventTests(BaseTestCase):
282 """
283 Tests for Event objects.
284 """
285
286 def test_is_set(self):
287 evt = self.eventtype()
288 self.assertFalse(evt.is_set())
289 evt.set()
290 self.assertTrue(evt.is_set())
291 evt.set()
292 self.assertTrue(evt.is_set())
293 evt.clear()
294 self.assertFalse(evt.is_set())
295 evt.clear()
296 self.assertFalse(evt.is_set())
297
298 def _check_notify(self, evt):
299 # All threads get notified
300 N = 5
301 results1 = []
302 results2 = []
303 def f():
304 results1.append(evt.wait())
305 results2.append(evt.wait())
306 b = Bunch(f, N)
307 b.wait_for_started()
308 _wait()
309 self.assertEqual(len(results1), 0)
310 evt.set()
311 b.wait_for_finished()
312 self.assertEqual(results1, [True] * N)
313 self.assertEqual(results2, [True] * N)
314
315 def test_notify(self):
316 evt = self.eventtype()
317 self._check_notify(evt)
318 # Another time, after an explicit clear()
319 evt.set()
320 evt.clear()
321 self._check_notify(evt)
322
323 def test_timeout(self):
324 evt = self.eventtype()
325 results1 = []
326 results2 = []
327 N = 5
328 def f():
329 results1.append(evt.wait(0.0))
330 t1 = time.time()
Antoine Pitrou7c3e5772010-04-14 15:44:10 +0000331 r = evt.wait(0.5)
Antoine Pitrou557934f2009-11-06 22:41:14 +0000332 t2 = time.time()
333 results2.append((r, t2 - t1))
334 Bunch(f, N).wait_for_finished()
335 self.assertEqual(results1, [False] * N)
336 for r, dt in results2:
337 self.assertFalse(r)
Antoine Pitrou7c3e5772010-04-14 15:44:10 +0000338 self.assertTimeout(dt, 0.5)
Antoine Pitrou557934f2009-11-06 22:41:14 +0000339 # The event is set
340 results1 = []
341 results2 = []
342 evt.set()
343 Bunch(f, N).wait_for_finished()
344 self.assertEqual(results1, [True] * N)
345 for r, dt in results2:
346 self.assertTrue(r)
347
348
349class ConditionTests(BaseTestCase):
350 """
351 Tests for condition variables.
352 """
353
354 def test_acquire(self):
355 cond = self.condtype()
356 # Be default we have an RLock: the condition can be acquired multiple
357 # times.
358 cond.acquire()
359 cond.acquire()
360 cond.release()
361 cond.release()
362 lock = threading.Lock()
363 cond = self.condtype(lock)
364 cond.acquire()
365 self.assertFalse(lock.acquire(False))
366 cond.release()
367 self.assertTrue(lock.acquire(False))
368 self.assertFalse(cond.acquire(False))
369 lock.release()
370 with cond:
371 self.assertFalse(lock.acquire(False))
372
373 def test_unacquired_wait(self):
374 cond = self.condtype()
375 self.assertRaises(RuntimeError, cond.wait)
376
377 def test_unacquired_notify(self):
378 cond = self.condtype()
379 self.assertRaises(RuntimeError, cond.notify)
380
381 def _check_notify(self, cond):
382 N = 5
383 results1 = []
384 results2 = []
385 phase_num = 0
386 def f():
387 cond.acquire()
Georg Brandlb9a43912010-10-28 09:03:20 +0000388 result = cond.wait()
Antoine Pitrou557934f2009-11-06 22:41:14 +0000389 cond.release()
Georg Brandlb9a43912010-10-28 09:03:20 +0000390 results1.append((result, phase_num))
Antoine Pitrou557934f2009-11-06 22:41:14 +0000391 cond.acquire()
Georg Brandlb9a43912010-10-28 09:03:20 +0000392 result = cond.wait()
Antoine Pitrou557934f2009-11-06 22:41:14 +0000393 cond.release()
Georg Brandlb9a43912010-10-28 09:03:20 +0000394 results2.append((result, phase_num))
Antoine Pitrou557934f2009-11-06 22:41:14 +0000395 b = Bunch(f, N)
396 b.wait_for_started()
397 _wait()
398 self.assertEqual(results1, [])
399 # Notify 3 threads at first
400 cond.acquire()
401 cond.notify(3)
402 _wait()
403 phase_num = 1
404 cond.release()
405 while len(results1) < 3:
406 _wait()
Georg Brandlb9a43912010-10-28 09:03:20 +0000407 self.assertEqual(results1, [(True, 1)] * 3)
Antoine Pitrou557934f2009-11-06 22:41:14 +0000408 self.assertEqual(results2, [])
409 # Notify 5 threads: they might be in their first or second wait
410 cond.acquire()
411 cond.notify(5)
412 _wait()
413 phase_num = 2
414 cond.release()
415 while len(results1) + len(results2) < 8:
416 _wait()
Georg Brandlb9a43912010-10-28 09:03:20 +0000417 self.assertEqual(results1, [(True, 1)] * 3 + [(True, 2)] * 2)
418 self.assertEqual(results2, [(True, 2)] * 3)
Antoine Pitrou557934f2009-11-06 22:41:14 +0000419 # Notify all threads: they are all in their second wait
420 cond.acquire()
421 cond.notify_all()
422 _wait()
423 phase_num = 3
424 cond.release()
425 while len(results2) < 5:
426 _wait()
Georg Brandlb9a43912010-10-28 09:03:20 +0000427 self.assertEqual(results1, [(True, 1)] * 3 + [(True,2)] * 2)
428 self.assertEqual(results2, [(True, 2)] * 3 + [(True, 3)] * 2)
Antoine Pitrou557934f2009-11-06 22:41:14 +0000429 b.wait_for_finished()
430
431 def test_notify(self):
432 cond = self.condtype()
433 self._check_notify(cond)
434 # A second time, to check internal state is still ok.
435 self._check_notify(cond)
436
437 def test_timeout(self):
438 cond = self.condtype()
439 results = []
440 N = 5
441 def f():
442 cond.acquire()
443 t1 = time.time()
Georg Brandlb9a43912010-10-28 09:03:20 +0000444 result = cond.wait(0.5)
Antoine Pitrou557934f2009-11-06 22:41:14 +0000445 t2 = time.time()
446 cond.release()
Georg Brandlb9a43912010-10-28 09:03:20 +0000447 results.append((t2 - t1, result))
Antoine Pitrou557934f2009-11-06 22:41:14 +0000448 Bunch(f, N).wait_for_finished()
Georg Brandlb9a43912010-10-28 09:03:20 +0000449 self.assertEqual(len(results), N)
450 for dt, result in results:
Antoine Pitrou7c3e5772010-04-14 15:44:10 +0000451 self.assertTimeout(dt, 0.5)
Georg Brandlb9a43912010-10-28 09:03:20 +0000452 # Note that conceptually (that"s the condition variable protocol)
453 # a wait() may succeed even if no one notifies us and before any
454 # timeout occurs. Spurious wakeups can occur.
455 # This makes it hard to verify the result value.
456 # In practice, this implementation has no spurious wakeups.
457 self.assertFalse(result)
Antoine Pitrou557934f2009-11-06 22:41:14 +0000458
Kristján Valur Jónsson63315202010-11-18 12:46:39 +0000459 def test_waitfor(self):
460 cond = self.condtype()
461 state = 0
462 def f():
463 with cond:
464 result = cond.wait_for(lambda : state==4)
465 self.assertTrue(result)
466 self.assertEqual(state, 4)
467 b = Bunch(f, 1)
468 b.wait_for_started()
469 for i in range(5):
470 time.sleep(0.01)
471 with cond:
472 state += 1
473 cond.notify()
474 b.wait_for_finished()
475
476 def test_waitfor_timeout(self):
477 cond = self.condtype()
478 state = 0
479 success = []
480 def f():
481 with cond:
482 dt = time.time()
483 result = cond.wait_for(lambda : state==4, timeout=0.1)
484 dt = time.time() - dt
485 self.assertFalse(result)
486 self.assertTimeout(dt, 0.1)
487 success.append(None)
488 b = Bunch(f, 1)
489 b.wait_for_started()
490 # Only increment 3 times, so state == 4 is never reached.
491 for i in range(3):
492 time.sleep(0.01)
493 with cond:
494 state += 1
495 cond.notify()
496 b.wait_for_finished()
497 self.assertEqual(len(success), 1)
498
Antoine Pitrou557934f2009-11-06 22:41:14 +0000499
500class BaseSemaphoreTests(BaseTestCase):
501 """
502 Common tests for {bounded, unbounded} semaphore objects.
503 """
504
505 def test_constructor(self):
506 self.assertRaises(ValueError, self.semtype, value = -1)
507 self.assertRaises(ValueError, self.semtype, value = -sys.maxsize)
508
509 def test_acquire(self):
510 sem = self.semtype(1)
511 sem.acquire()
512 sem.release()
513 sem = self.semtype(2)
514 sem.acquire()
515 sem.acquire()
516 sem.release()
517 sem.release()
518
519 def test_acquire_destroy(self):
520 sem = self.semtype()
521 sem.acquire()
522 del sem
523
524 def test_acquire_contended(self):
525 sem = self.semtype(7)
526 sem.acquire()
527 N = 10
528 results1 = []
529 results2 = []
530 phase_num = 0
531 def f():
532 sem.acquire()
533 results1.append(phase_num)
534 sem.acquire()
535 results2.append(phase_num)
536 b = Bunch(f, 10)
537 b.wait_for_started()
538 while len(results1) + len(results2) < 6:
539 _wait()
540 self.assertEqual(results1 + results2, [0] * 6)
541 phase_num = 1
542 for i in range(7):
543 sem.release()
544 while len(results1) + len(results2) < 13:
545 _wait()
546 self.assertEqual(sorted(results1 + results2), [0] * 6 + [1] * 7)
547 phase_num = 2
548 for i in range(6):
549 sem.release()
550 while len(results1) + len(results2) < 19:
551 _wait()
552 self.assertEqual(sorted(results1 + results2), [0] * 6 + [1] * 7 + [2] * 6)
553 # The semaphore is still locked
554 self.assertFalse(sem.acquire(False))
555 # Final release, to let the last thread finish
556 sem.release()
557 b.wait_for_finished()
558
559 def test_try_acquire(self):
560 sem = self.semtype(2)
561 self.assertTrue(sem.acquire(False))
562 self.assertTrue(sem.acquire(False))
563 self.assertFalse(sem.acquire(False))
564 sem.release()
565 self.assertTrue(sem.acquire(False))
566
567 def test_try_acquire_contended(self):
568 sem = self.semtype(4)
569 sem.acquire()
570 results = []
571 def f():
572 results.append(sem.acquire(False))
573 results.append(sem.acquire(False))
574 Bunch(f, 5).wait_for_finished()
575 # There can be a thread switch between acquiring the semaphore and
576 # appending the result, therefore results will not necessarily be
577 # ordered.
578 self.assertEqual(sorted(results), [False] * 7 + [True] * 3 )
579
Antoine Pitrou0454af92010-04-17 23:51:58 +0000580 def test_acquire_timeout(self):
581 sem = self.semtype(2)
582 self.assertRaises(ValueError, sem.acquire, False, timeout=1.0)
583 self.assertTrue(sem.acquire(timeout=0.005))
584 self.assertTrue(sem.acquire(timeout=0.005))
585 self.assertFalse(sem.acquire(timeout=0.005))
586 sem.release()
587 self.assertTrue(sem.acquire(timeout=0.005))
588 t = time.time()
589 self.assertFalse(sem.acquire(timeout=0.5))
590 dt = time.time() - t
591 self.assertTimeout(dt, 0.5)
592
Antoine Pitrou557934f2009-11-06 22:41:14 +0000593 def test_default_value(self):
594 # The default initial value is 1.
595 sem = self.semtype()
596 sem.acquire()
597 def f():
598 sem.acquire()
599 sem.release()
600 b = Bunch(f, 1)
601 b.wait_for_started()
602 _wait()
603 self.assertFalse(b.finished)
604 sem.release()
605 b.wait_for_finished()
606
607 def test_with(self):
608 sem = self.semtype(2)
609 def _with(err=None):
610 with sem:
611 self.assertTrue(sem.acquire(False))
612 sem.release()
613 with sem:
614 self.assertFalse(sem.acquire(False))
615 if err:
616 raise err
617 _with()
618 self.assertTrue(sem.acquire(False))
619 sem.release()
620 self.assertRaises(TypeError, _with, TypeError)
621 self.assertTrue(sem.acquire(False))
622 sem.release()
623
624class SemaphoreTests(BaseSemaphoreTests):
625 """
626 Tests for unbounded semaphores.
627 """
628
629 def test_release_unacquired(self):
630 # Unbounded releases are allowed and increment the semaphore's value
631 sem = self.semtype(1)
632 sem.release()
633 sem.acquire()
634 sem.acquire()
635 sem.release()
636
637
638class BoundedSemaphoreTests(BaseSemaphoreTests):
639 """
640 Tests for bounded semaphores.
641 """
642
643 def test_release_unacquired(self):
644 # Cannot go past the initial value
645 sem = self.semtype()
646 self.assertRaises(ValueError, sem.release)
647 sem.acquire()
648 sem.release()
649 self.assertRaises(ValueError, sem.release)
Kristján Valur Jónsson3be00032010-10-28 09:43:10 +0000650
651
652class BarrierTests(BaseTestCase):
653 """
654 Tests for Barrier objects.
655 """
656 N = 5
Antoine Pitrou12ae2902010-11-17 21:55:41 +0000657 defaultTimeout = 2.0
Kristján Valur Jónsson3be00032010-10-28 09:43:10 +0000658
659 def setUp(self):
Kristján Valur Jónssonf53a6262010-10-31 03:00:57 +0000660 self.barrier = self.barriertype(self.N, timeout=self.defaultTimeout)
Kristján Valur Jónsson3be00032010-10-28 09:43:10 +0000661 def tearDown(self):
662 self.barrier.abort()
663
664 def run_threads(self, f):
665 b = Bunch(f, self.N-1)
666 f()
667 b.wait_for_finished()
668
669 def multipass(self, results, n):
670 m = self.barrier.parties
671 self.assertEqual(m, self.N)
672 for i in range(n):
673 results[0].append(True)
674 self.assertEqual(len(results[1]), i * m)
675 self.barrier.wait()
676 results[1].append(True)
677 self.assertEqual(len(results[0]), (i + 1) * m)
678 self.barrier.wait()
679 self.assertEqual(self.barrier.n_waiting, 0)
680 self.assertFalse(self.barrier.broken)
681
682 def test_barrier(self, passes=1):
683 """
684 Test that a barrier is passed in lockstep
685 """
686 results = [[],[]]
687 def f():
688 self.multipass(results, passes)
689 self.run_threads(f)
690
691 def test_barrier_10(self):
692 """
693 Test that a barrier works for 10 consecutive runs
694 """
695 return self.test_barrier(10)
696
697 def test_wait_return(self):
698 """
699 test the return value from barrier.wait
700 """
701 results = []
702 def f():
703 r = self.barrier.wait()
704 results.append(r)
705
706 self.run_threads(f)
707 self.assertEqual(sum(results), sum(range(self.N)))
708
709 def test_action(self):
710 """
711 Test the 'action' callback
712 """
713 results = []
714 def action():
715 results.append(True)
716 barrier = self.barriertype(self.N, action)
717 def f():
718 barrier.wait()
719 self.assertEqual(len(results), 1)
720
721 self.run_threads(f)
722
723 def test_abort(self):
724 """
725 Test that an abort will put the barrier in a broken state
726 """
727 results1 = []
728 results2 = []
729 def f():
730 try:
731 i = self.barrier.wait()
732 if i == self.N//2:
733 raise RuntimeError
734 self.barrier.wait()
735 results1.append(True)
736 except threading.BrokenBarrierError:
737 results2.append(True)
738 except RuntimeError:
739 self.barrier.abort()
740 pass
741
742 self.run_threads(f)
743 self.assertEqual(len(results1), 0)
744 self.assertEqual(len(results2), self.N-1)
745 self.assertTrue(self.barrier.broken)
746
747 def test_reset(self):
748 """
749 Test that a 'reset' on a barrier frees the waiting threads
750 """
751 results1 = []
752 results2 = []
753 results3 = []
754 def f():
755 i = self.barrier.wait()
756 if i == self.N//2:
757 # Wait until the other threads are all in the barrier.
758 while self.barrier.n_waiting < self.N-1:
759 time.sleep(0.001)
760 self.barrier.reset()
761 else:
762 try:
763 self.barrier.wait()
764 results1.append(True)
765 except threading.BrokenBarrierError:
766 results2.append(True)
767 # Now, pass the barrier again
768 self.barrier.wait()
769 results3.append(True)
770
771 self.run_threads(f)
772 self.assertEqual(len(results1), 0)
773 self.assertEqual(len(results2), self.N-1)
774 self.assertEqual(len(results3), self.N)
775
776
777 def test_abort_and_reset(self):
778 """
779 Test that a barrier can be reset after being broken.
780 """
781 results1 = []
782 results2 = []
783 results3 = []
784 barrier2 = self.barriertype(self.N)
785 def f():
786 try:
787 i = self.barrier.wait()
788 if i == self.N//2:
789 raise RuntimeError
790 self.barrier.wait()
791 results1.append(True)
792 except threading.BrokenBarrierError:
793 results2.append(True)
794 except RuntimeError:
795 self.barrier.abort()
796 pass
797 # Synchronize and reset the barrier. Must synchronize first so
798 # that everyone has left it when we reset, and after so that no
799 # one enters it before the reset.
800 if barrier2.wait() == self.N//2:
801 self.barrier.reset()
802 barrier2.wait()
803 self.barrier.wait()
804 results3.append(True)
805
806 self.run_threads(f)
807 self.assertEqual(len(results1), 0)
808 self.assertEqual(len(results2), self.N-1)
809 self.assertEqual(len(results3), self.N)
810
811 def test_timeout(self):
812 """
813 Test wait(timeout)
814 """
815 def f():
816 i = self.barrier.wait()
817 if i == self.N // 2:
818 # One thread is late!
Antoine Pitrou12ae2902010-11-17 21:55:41 +0000819 time.sleep(1.0)
820 # Default timeout is 2.0, so this is shorter.
Kristján Valur Jónsson3be00032010-10-28 09:43:10 +0000821 self.assertRaises(threading.BrokenBarrierError,
Antoine Pitrou12ae2902010-11-17 21:55:41 +0000822 self.barrier.wait, 0.5)
Kristján Valur Jónsson3be00032010-10-28 09:43:10 +0000823 self.run_threads(f)
824
825 def test_default_timeout(self):
826 """
827 Test the barrier's default timeout
828 """
Kristján Valur Jónssonf53a6262010-10-31 03:00:57 +0000829 #create a barrier with a low default timeout
830 barrier = self.barriertype(self.N, timeout=0.1)
Kristján Valur Jónsson3be00032010-10-28 09:43:10 +0000831 def f():
Kristján Valur Jónssonf53a6262010-10-31 03:00:57 +0000832 i = barrier.wait()
Kristján Valur Jónsson3be00032010-10-28 09:43:10 +0000833 if i == self.N // 2:
834 # One thread is later than the default timeout of 0.1s.
Antoine Pitrou12ae2902010-11-17 21:55:41 +0000835 time.sleep(1.0)
Kristján Valur Jónssonf53a6262010-10-31 03:00:57 +0000836 self.assertRaises(threading.BrokenBarrierError, barrier.wait)
Kristján Valur Jónsson3be00032010-10-28 09:43:10 +0000837 self.run_threads(f)
838
839 def test_single_thread(self):
840 b = self.barriertype(1)
841 b.wait()
842 b.wait()