blob: 7bcc43607fe57d29e788fc510ab1e64085be923b [file] [log] [blame]
Antoine Pitrou557934f2009-11-06 22:41:14 +00001"""
2Various tests for synchronization primitives.
3"""
4
5import sys
6import time
Victor Stinner2a129742011-05-30 23:02:52 +02007from _thread import start_new_thread, TIMEOUT_MAX
Antoine Pitrou557934f2009-11-06 22:41:14 +00008import threading
9import unittest
10
11from test import support
12
13
14def _wait():
15 # A crude wait/yield function not relying on synchronization primitives.
16 time.sleep(0.01)
17
18class Bunch(object):
19 """
20 A bunch of threads.
21 """
22 def __init__(self, f, n, wait_before_exit=False):
23 """
24 Construct a bunch of `n` threads running the same function `f`.
25 If `wait_before_exit` is True, the threads won't terminate until
26 do_finish() is called.
27 """
28 self.f = f
29 self.n = n
30 self.started = []
31 self.finished = []
32 self._can_exit = not wait_before_exit
33 def task():
Victor Stinner2a129742011-05-30 23:02:52 +020034 tid = threading.get_ident()
Antoine Pitrou557934f2009-11-06 22:41:14 +000035 self.started.append(tid)
36 try:
37 f()
38 finally:
39 self.finished.append(tid)
40 while not self._can_exit:
41 _wait()
42 for i in range(n):
43 start_new_thread(task, ())
44
45 def wait_for_started(self):
46 while len(self.started) < self.n:
47 _wait()
48
49 def wait_for_finished(self):
50 while len(self.finished) < self.n:
51 _wait()
52
53 def do_finish(self):
54 self._can_exit = True
55
56
57class BaseTestCase(unittest.TestCase):
58 def setUp(self):
59 self._threads = support.threading_setup()
60
61 def tearDown(self):
62 support.threading_cleanup(*self._threads)
63 support.reap_children()
64
Antoine Pitrou7c3e5772010-04-14 15:44:10 +000065 def assertTimeout(self, actual, expected):
66 # The waiting and/or time.time() can be imprecise, which
67 # is why comparing to the expected value would sometimes fail
68 # (especially under Windows).
69 self.assertGreaterEqual(actual, expected * 0.6)
70 # Test nothing insane happened
71 self.assertLess(actual, expected * 10.0)
72
Antoine Pitrou557934f2009-11-06 22:41:14 +000073
74class BaseLockTests(BaseTestCase):
75 """
76 Tests for both recursive and non-recursive locks.
77 """
78
79 def test_constructor(self):
80 lock = self.locktype()
81 del lock
82
83 def test_acquire_destroy(self):
84 lock = self.locktype()
85 lock.acquire()
86 del lock
87
88 def test_acquire_release(self):
89 lock = self.locktype()
90 lock.acquire()
91 lock.release()
92 del lock
93
94 def test_try_acquire(self):
95 lock = self.locktype()
96 self.assertTrue(lock.acquire(False))
97 lock.release()
98
99 def test_try_acquire_contended(self):
100 lock = self.locktype()
101 lock.acquire()
102 result = []
103 def f():
104 result.append(lock.acquire(False))
105 Bunch(f, 1).wait_for_finished()
106 self.assertFalse(result[0])
107 lock.release()
108
109 def test_acquire_contended(self):
110 lock = self.locktype()
111 lock.acquire()
112 N = 5
113 def f():
114 lock.acquire()
115 lock.release()
116
117 b = Bunch(f, N)
118 b.wait_for_started()
119 _wait()
120 self.assertEqual(len(b.finished), 0)
121 lock.release()
122 b.wait_for_finished()
123 self.assertEqual(len(b.finished), N)
124
125 def test_with(self):
126 lock = self.locktype()
127 def f():
128 lock.acquire()
129 lock.release()
130 def _with(err=None):
131 with lock:
132 if err is not None:
133 raise err
134 _with()
135 # Check the lock is unacquired
136 Bunch(f, 1).wait_for_finished()
137 self.assertRaises(TypeError, _with, TypeError)
138 # Check the lock is unacquired
139 Bunch(f, 1).wait_for_finished()
140
Antoine Pitroub0872682009-11-09 16:08:16 +0000141 def test_thread_leak(self):
142 # The lock shouldn't leak a Thread instance when used from a foreign
143 # (non-threading) thread.
144 lock = self.locktype()
145 def f():
146 lock.acquire()
147 lock.release()
148 n = len(threading.enumerate())
149 # We run many threads in the hope that existing threads ids won't
150 # be recycled.
151 Bunch(f, 15).wait_for_finished()
Antoine Pitrou45fdb452011-04-04 21:59:09 +0200152 if len(threading.enumerate()) != n:
153 # There is a small window during which a Thread instance's
154 # target function has finished running, but the Thread is still
155 # alive and registered. Avoid spurious failures by waiting a
156 # bit more (seen on a buildbot).
157 time.sleep(0.4)
158 self.assertEqual(n, len(threading.enumerate()))
Antoine Pitroub0872682009-11-09 16:08:16 +0000159
Antoine Pitrou7c3e5772010-04-14 15:44:10 +0000160 def test_timeout(self):
161 lock = self.locktype()
162 # Can't set timeout if not blocking
163 self.assertRaises(ValueError, lock.acquire, 0, 1)
164 # Invalid timeout values
165 self.assertRaises(ValueError, lock.acquire, timeout=-100)
166 self.assertRaises(OverflowError, lock.acquire, timeout=1e100)
167 self.assertRaises(OverflowError, lock.acquire, timeout=TIMEOUT_MAX + 1)
168 # TIMEOUT_MAX is ok
169 lock.acquire(timeout=TIMEOUT_MAX)
170 lock.release()
171 t1 = time.time()
172 self.assertTrue(lock.acquire(timeout=5))
173 t2 = time.time()
174 # Just a sanity test that it didn't actually wait for the timeout.
175 self.assertLess(t2 - t1, 5)
176 results = []
177 def f():
178 t1 = time.time()
179 results.append(lock.acquire(timeout=0.5))
180 t2 = time.time()
181 results.append(t2 - t1)
182 Bunch(f, 1).wait_for_finished()
183 self.assertFalse(results[0])
184 self.assertTimeout(results[1], 0.5)
185
Antoine Pitrou557934f2009-11-06 22:41:14 +0000186
187class LockTests(BaseLockTests):
188 """
189 Tests for non-recursive, weak locks
190 (which can be acquired and released from different threads).
191 """
192 def test_reacquire(self):
193 # Lock needs to be released before re-acquiring.
194 lock = self.locktype()
195 phase = []
196 def f():
197 lock.acquire()
198 phase.append(None)
199 lock.acquire()
200 phase.append(None)
201 start_new_thread(f, ())
202 while len(phase) == 0:
203 _wait()
204 _wait()
205 self.assertEqual(len(phase), 1)
206 lock.release()
207 while len(phase) == 1:
208 _wait()
209 self.assertEqual(len(phase), 2)
210
211 def test_different_thread(self):
212 # Lock can be released from a different thread.
213 lock = self.locktype()
214 lock.acquire()
215 def f():
216 lock.release()
217 b = Bunch(f, 1)
218 b.wait_for_finished()
219 lock.acquire()
220 lock.release()
221
Antoine Pitrou7899acf2011-03-31 01:00:32 +0200222 def test_state_after_timeout(self):
223 # Issue #11618: check that lock is in a proper state after a
224 # (non-zero) timeout.
225 lock = self.locktype()
226 lock.acquire()
227 self.assertFalse(lock.acquire(timeout=0.01))
228 lock.release()
229 self.assertFalse(lock.locked())
230 self.assertTrue(lock.acquire(blocking=False))
231
Antoine Pitrou557934f2009-11-06 22:41:14 +0000232
233class RLockTests(BaseLockTests):
234 """
235 Tests for recursive locks.
236 """
237 def test_reacquire(self):
238 lock = self.locktype()
239 lock.acquire()
240 lock.acquire()
241 lock.release()
242 lock.acquire()
243 lock.release()
244 lock.release()
245
246 def test_release_unacquired(self):
247 # Cannot release an unacquired lock
248 lock = self.locktype()
249 self.assertRaises(RuntimeError, lock.release)
Victor Stinnerc2824d42011-04-24 23:41:33 +0200250 self.assertRaises(RuntimeError, lock._release_save)
Antoine Pitrou557934f2009-11-06 22:41:14 +0000251 lock.acquire()
252 lock.acquire()
253 lock.release()
254 lock.acquire()
255 lock.release()
256 lock.release()
257 self.assertRaises(RuntimeError, lock.release)
Victor Stinnerc2824d42011-04-24 23:41:33 +0200258 self.assertRaises(RuntimeError, lock._release_save)
Antoine Pitrou557934f2009-11-06 22:41:14 +0000259
260 def test_different_thread(self):
261 # Cannot release from a different thread
262 lock = self.locktype()
263 def f():
264 lock.acquire()
265 b = Bunch(f, 1, True)
266 try:
267 self.assertRaises(RuntimeError, lock.release)
268 finally:
269 b.do_finish()
270
271 def test__is_owned(self):
272 lock = self.locktype()
273 self.assertFalse(lock._is_owned())
274 lock.acquire()
275 self.assertTrue(lock._is_owned())
276 lock.acquire()
277 self.assertTrue(lock._is_owned())
278 result = []
279 def f():
280 result.append(lock._is_owned())
281 Bunch(f, 1).wait_for_finished()
282 self.assertFalse(result[0])
283 lock.release()
284 self.assertTrue(lock._is_owned())
285 lock.release()
286 self.assertFalse(lock._is_owned())
287
288
289class EventTests(BaseTestCase):
290 """
291 Tests for Event objects.
292 """
293
294 def test_is_set(self):
295 evt = self.eventtype()
296 self.assertFalse(evt.is_set())
297 evt.set()
298 self.assertTrue(evt.is_set())
299 evt.set()
300 self.assertTrue(evt.is_set())
301 evt.clear()
302 self.assertFalse(evt.is_set())
303 evt.clear()
304 self.assertFalse(evt.is_set())
305
306 def _check_notify(self, evt):
307 # All threads get notified
308 N = 5
309 results1 = []
310 results2 = []
311 def f():
312 results1.append(evt.wait())
313 results2.append(evt.wait())
314 b = Bunch(f, N)
315 b.wait_for_started()
316 _wait()
317 self.assertEqual(len(results1), 0)
318 evt.set()
319 b.wait_for_finished()
320 self.assertEqual(results1, [True] * N)
321 self.assertEqual(results2, [True] * N)
322
323 def test_notify(self):
324 evt = self.eventtype()
325 self._check_notify(evt)
326 # Another time, after an explicit clear()
327 evt.set()
328 evt.clear()
329 self._check_notify(evt)
330
331 def test_timeout(self):
332 evt = self.eventtype()
333 results1 = []
334 results2 = []
335 N = 5
336 def f():
337 results1.append(evt.wait(0.0))
338 t1 = time.time()
Antoine Pitrou7c3e5772010-04-14 15:44:10 +0000339 r = evt.wait(0.5)
Antoine Pitrou557934f2009-11-06 22:41:14 +0000340 t2 = time.time()
341 results2.append((r, t2 - t1))
342 Bunch(f, N).wait_for_finished()
343 self.assertEqual(results1, [False] * N)
344 for r, dt in results2:
345 self.assertFalse(r)
Antoine Pitrou7c3e5772010-04-14 15:44:10 +0000346 self.assertTimeout(dt, 0.5)
Antoine Pitrou557934f2009-11-06 22:41:14 +0000347 # The event is set
348 results1 = []
349 results2 = []
350 evt.set()
351 Bunch(f, N).wait_for_finished()
352 self.assertEqual(results1, [True] * N)
353 for r, dt in results2:
354 self.assertTrue(r)
355
356
357class ConditionTests(BaseTestCase):
358 """
359 Tests for condition variables.
360 """
361
362 def test_acquire(self):
363 cond = self.condtype()
364 # Be default we have an RLock: the condition can be acquired multiple
365 # times.
366 cond.acquire()
367 cond.acquire()
368 cond.release()
369 cond.release()
370 lock = threading.Lock()
371 cond = self.condtype(lock)
372 cond.acquire()
373 self.assertFalse(lock.acquire(False))
374 cond.release()
375 self.assertTrue(lock.acquire(False))
376 self.assertFalse(cond.acquire(False))
377 lock.release()
378 with cond:
379 self.assertFalse(lock.acquire(False))
380
381 def test_unacquired_wait(self):
382 cond = self.condtype()
383 self.assertRaises(RuntimeError, cond.wait)
384
385 def test_unacquired_notify(self):
386 cond = self.condtype()
387 self.assertRaises(RuntimeError, cond.notify)
388
389 def _check_notify(self, cond):
390 N = 5
391 results1 = []
392 results2 = []
393 phase_num = 0
394 def f():
395 cond.acquire()
Georg Brandlb9a43912010-10-28 09:03:20 +0000396 result = cond.wait()
Antoine Pitrou557934f2009-11-06 22:41:14 +0000397 cond.release()
Georg Brandlb9a43912010-10-28 09:03:20 +0000398 results1.append((result, phase_num))
Antoine Pitrou557934f2009-11-06 22:41:14 +0000399 cond.acquire()
Georg Brandlb9a43912010-10-28 09:03:20 +0000400 result = cond.wait()
Antoine Pitrou557934f2009-11-06 22:41:14 +0000401 cond.release()
Georg Brandlb9a43912010-10-28 09:03:20 +0000402 results2.append((result, phase_num))
Antoine Pitrou557934f2009-11-06 22:41:14 +0000403 b = Bunch(f, N)
404 b.wait_for_started()
405 _wait()
406 self.assertEqual(results1, [])
407 # Notify 3 threads at first
408 cond.acquire()
409 cond.notify(3)
410 _wait()
411 phase_num = 1
412 cond.release()
413 while len(results1) < 3:
414 _wait()
Georg Brandlb9a43912010-10-28 09:03:20 +0000415 self.assertEqual(results1, [(True, 1)] * 3)
Antoine Pitrou557934f2009-11-06 22:41:14 +0000416 self.assertEqual(results2, [])
417 # Notify 5 threads: they might be in their first or second wait
418 cond.acquire()
419 cond.notify(5)
420 _wait()
421 phase_num = 2
422 cond.release()
423 while len(results1) + len(results2) < 8:
424 _wait()
Georg Brandlb9a43912010-10-28 09:03:20 +0000425 self.assertEqual(results1, [(True, 1)] * 3 + [(True, 2)] * 2)
426 self.assertEqual(results2, [(True, 2)] * 3)
Antoine Pitrou557934f2009-11-06 22:41:14 +0000427 # Notify all threads: they are all in their second wait
428 cond.acquire()
429 cond.notify_all()
430 _wait()
431 phase_num = 3
432 cond.release()
433 while len(results2) < 5:
434 _wait()
Georg Brandlb9a43912010-10-28 09:03:20 +0000435 self.assertEqual(results1, [(True, 1)] * 3 + [(True,2)] * 2)
436 self.assertEqual(results2, [(True, 2)] * 3 + [(True, 3)] * 2)
Antoine Pitrou557934f2009-11-06 22:41:14 +0000437 b.wait_for_finished()
438
439 def test_notify(self):
440 cond = self.condtype()
441 self._check_notify(cond)
442 # A second time, to check internal state is still ok.
443 self._check_notify(cond)
444
445 def test_timeout(self):
446 cond = self.condtype()
447 results = []
448 N = 5
449 def f():
450 cond.acquire()
451 t1 = time.time()
Georg Brandlb9a43912010-10-28 09:03:20 +0000452 result = cond.wait(0.5)
Antoine Pitrou557934f2009-11-06 22:41:14 +0000453 t2 = time.time()
454 cond.release()
Georg Brandlb9a43912010-10-28 09:03:20 +0000455 results.append((t2 - t1, result))
Antoine Pitrou557934f2009-11-06 22:41:14 +0000456 Bunch(f, N).wait_for_finished()
Georg Brandlb9a43912010-10-28 09:03:20 +0000457 self.assertEqual(len(results), N)
458 for dt, result in results:
Antoine Pitrou7c3e5772010-04-14 15:44:10 +0000459 self.assertTimeout(dt, 0.5)
Georg Brandlb9a43912010-10-28 09:03:20 +0000460 # Note that conceptually (that"s the condition variable protocol)
461 # a wait() may succeed even if no one notifies us and before any
462 # timeout occurs. Spurious wakeups can occur.
463 # This makes it hard to verify the result value.
464 # In practice, this implementation has no spurious wakeups.
465 self.assertFalse(result)
Antoine Pitrou557934f2009-11-06 22:41:14 +0000466
Kristján Valur Jónsson63315202010-11-18 12:46:39 +0000467 def test_waitfor(self):
468 cond = self.condtype()
469 state = 0
470 def f():
471 with cond:
472 result = cond.wait_for(lambda : state==4)
473 self.assertTrue(result)
474 self.assertEqual(state, 4)
475 b = Bunch(f, 1)
476 b.wait_for_started()
Victor Stinner3349bca2011-05-18 00:16:14 +0200477 for i in range(4):
Kristján Valur Jónsson63315202010-11-18 12:46:39 +0000478 time.sleep(0.01)
479 with cond:
480 state += 1
481 cond.notify()
482 b.wait_for_finished()
483
484 def test_waitfor_timeout(self):
485 cond = self.condtype()
486 state = 0
487 success = []
488 def f():
489 with cond:
490 dt = time.time()
491 result = cond.wait_for(lambda : state==4, timeout=0.1)
492 dt = time.time() - dt
493 self.assertFalse(result)
494 self.assertTimeout(dt, 0.1)
495 success.append(None)
496 b = Bunch(f, 1)
497 b.wait_for_started()
498 # Only increment 3 times, so state == 4 is never reached.
499 for i in range(3):
500 time.sleep(0.01)
501 with cond:
502 state += 1
503 cond.notify()
504 b.wait_for_finished()
505 self.assertEqual(len(success), 1)
506
Antoine Pitrou557934f2009-11-06 22:41:14 +0000507
508class BaseSemaphoreTests(BaseTestCase):
509 """
510 Common tests for {bounded, unbounded} semaphore objects.
511 """
512
513 def test_constructor(self):
514 self.assertRaises(ValueError, self.semtype, value = -1)
515 self.assertRaises(ValueError, self.semtype, value = -sys.maxsize)
516
517 def test_acquire(self):
518 sem = self.semtype(1)
519 sem.acquire()
520 sem.release()
521 sem = self.semtype(2)
522 sem.acquire()
523 sem.acquire()
524 sem.release()
525 sem.release()
526
527 def test_acquire_destroy(self):
528 sem = self.semtype()
529 sem.acquire()
530 del sem
531
532 def test_acquire_contended(self):
533 sem = self.semtype(7)
534 sem.acquire()
535 N = 10
536 results1 = []
537 results2 = []
538 phase_num = 0
539 def f():
540 sem.acquire()
541 results1.append(phase_num)
542 sem.acquire()
543 results2.append(phase_num)
544 b = Bunch(f, 10)
545 b.wait_for_started()
546 while len(results1) + len(results2) < 6:
547 _wait()
548 self.assertEqual(results1 + results2, [0] * 6)
549 phase_num = 1
550 for i in range(7):
551 sem.release()
552 while len(results1) + len(results2) < 13:
553 _wait()
554 self.assertEqual(sorted(results1 + results2), [0] * 6 + [1] * 7)
555 phase_num = 2
556 for i in range(6):
557 sem.release()
558 while len(results1) + len(results2) < 19:
559 _wait()
560 self.assertEqual(sorted(results1 + results2), [0] * 6 + [1] * 7 + [2] * 6)
561 # The semaphore is still locked
562 self.assertFalse(sem.acquire(False))
563 # Final release, to let the last thread finish
564 sem.release()
565 b.wait_for_finished()
566
567 def test_try_acquire(self):
568 sem = self.semtype(2)
569 self.assertTrue(sem.acquire(False))
570 self.assertTrue(sem.acquire(False))
571 self.assertFalse(sem.acquire(False))
572 sem.release()
573 self.assertTrue(sem.acquire(False))
574
575 def test_try_acquire_contended(self):
576 sem = self.semtype(4)
577 sem.acquire()
578 results = []
579 def f():
580 results.append(sem.acquire(False))
581 results.append(sem.acquire(False))
582 Bunch(f, 5).wait_for_finished()
583 # There can be a thread switch between acquiring the semaphore and
584 # appending the result, therefore results will not necessarily be
585 # ordered.
586 self.assertEqual(sorted(results), [False] * 7 + [True] * 3 )
587
Antoine Pitrou0454af92010-04-17 23:51:58 +0000588 def test_acquire_timeout(self):
589 sem = self.semtype(2)
590 self.assertRaises(ValueError, sem.acquire, False, timeout=1.0)
591 self.assertTrue(sem.acquire(timeout=0.005))
592 self.assertTrue(sem.acquire(timeout=0.005))
593 self.assertFalse(sem.acquire(timeout=0.005))
594 sem.release()
595 self.assertTrue(sem.acquire(timeout=0.005))
596 t = time.time()
597 self.assertFalse(sem.acquire(timeout=0.5))
598 dt = time.time() - t
599 self.assertTimeout(dt, 0.5)
600
Antoine Pitrou557934f2009-11-06 22:41:14 +0000601 def test_default_value(self):
602 # The default initial value is 1.
603 sem = self.semtype()
604 sem.acquire()
605 def f():
606 sem.acquire()
607 sem.release()
608 b = Bunch(f, 1)
609 b.wait_for_started()
610 _wait()
611 self.assertFalse(b.finished)
612 sem.release()
613 b.wait_for_finished()
614
615 def test_with(self):
616 sem = self.semtype(2)
617 def _with(err=None):
618 with sem:
619 self.assertTrue(sem.acquire(False))
620 sem.release()
621 with sem:
622 self.assertFalse(sem.acquire(False))
623 if err:
624 raise err
625 _with()
626 self.assertTrue(sem.acquire(False))
627 sem.release()
628 self.assertRaises(TypeError, _with, TypeError)
629 self.assertTrue(sem.acquire(False))
630 sem.release()
631
632class SemaphoreTests(BaseSemaphoreTests):
633 """
634 Tests for unbounded semaphores.
635 """
636
637 def test_release_unacquired(self):
638 # Unbounded releases are allowed and increment the semaphore's value
639 sem = self.semtype(1)
640 sem.release()
641 sem.acquire()
642 sem.acquire()
643 sem.release()
644
645
646class BoundedSemaphoreTests(BaseSemaphoreTests):
647 """
648 Tests for bounded semaphores.
649 """
650
651 def test_release_unacquired(self):
652 # Cannot go past the initial value
653 sem = self.semtype()
654 self.assertRaises(ValueError, sem.release)
655 sem.acquire()
656 sem.release()
657 self.assertRaises(ValueError, sem.release)
Kristján Valur Jónsson3be00032010-10-28 09:43:10 +0000658
659
660class BarrierTests(BaseTestCase):
661 """
662 Tests for Barrier objects.
663 """
664 N = 5
Antoine Pitrou12ae2902010-11-17 21:55:41 +0000665 defaultTimeout = 2.0
Kristján Valur Jónsson3be00032010-10-28 09:43:10 +0000666
667 def setUp(self):
Kristján Valur Jónssonf53a6262010-10-31 03:00:57 +0000668 self.barrier = self.barriertype(self.N, timeout=self.defaultTimeout)
Kristján Valur Jónsson3be00032010-10-28 09:43:10 +0000669 def tearDown(self):
670 self.barrier.abort()
671
672 def run_threads(self, f):
673 b = Bunch(f, self.N-1)
674 f()
675 b.wait_for_finished()
676
677 def multipass(self, results, n):
678 m = self.barrier.parties
679 self.assertEqual(m, self.N)
680 for i in range(n):
681 results[0].append(True)
682 self.assertEqual(len(results[1]), i * m)
683 self.barrier.wait()
684 results[1].append(True)
685 self.assertEqual(len(results[0]), (i + 1) * m)
686 self.barrier.wait()
687 self.assertEqual(self.barrier.n_waiting, 0)
688 self.assertFalse(self.barrier.broken)
689
690 def test_barrier(self, passes=1):
691 """
692 Test that a barrier is passed in lockstep
693 """
694 results = [[],[]]
695 def f():
696 self.multipass(results, passes)
697 self.run_threads(f)
698
699 def test_barrier_10(self):
700 """
701 Test that a barrier works for 10 consecutive runs
702 """
703 return self.test_barrier(10)
704
705 def test_wait_return(self):
706 """
707 test the return value from barrier.wait
708 """
709 results = []
710 def f():
711 r = self.barrier.wait()
712 results.append(r)
713
714 self.run_threads(f)
715 self.assertEqual(sum(results), sum(range(self.N)))
716
717 def test_action(self):
718 """
719 Test the 'action' callback
720 """
721 results = []
722 def action():
723 results.append(True)
724 barrier = self.barriertype(self.N, action)
725 def f():
726 barrier.wait()
727 self.assertEqual(len(results), 1)
728
729 self.run_threads(f)
730
731 def test_abort(self):
732 """
733 Test that an abort will put the barrier in a broken state
734 """
735 results1 = []
736 results2 = []
737 def f():
738 try:
739 i = self.barrier.wait()
740 if i == self.N//2:
741 raise RuntimeError
742 self.barrier.wait()
743 results1.append(True)
744 except threading.BrokenBarrierError:
745 results2.append(True)
746 except RuntimeError:
747 self.barrier.abort()
748 pass
749
750 self.run_threads(f)
751 self.assertEqual(len(results1), 0)
752 self.assertEqual(len(results2), self.N-1)
753 self.assertTrue(self.barrier.broken)
754
755 def test_reset(self):
756 """
757 Test that a 'reset' on a barrier frees the waiting threads
758 """
759 results1 = []
760 results2 = []
761 results3 = []
762 def f():
763 i = self.barrier.wait()
764 if i == self.N//2:
765 # Wait until the other threads are all in the barrier.
766 while self.barrier.n_waiting < self.N-1:
767 time.sleep(0.001)
768 self.barrier.reset()
769 else:
770 try:
771 self.barrier.wait()
772 results1.append(True)
773 except threading.BrokenBarrierError:
774 results2.append(True)
775 # Now, pass the barrier again
776 self.barrier.wait()
777 results3.append(True)
778
779 self.run_threads(f)
780 self.assertEqual(len(results1), 0)
781 self.assertEqual(len(results2), self.N-1)
782 self.assertEqual(len(results3), self.N)
783
784
785 def test_abort_and_reset(self):
786 """
787 Test that a barrier can be reset after being broken.
788 """
789 results1 = []
790 results2 = []
791 results3 = []
792 barrier2 = self.barriertype(self.N)
793 def f():
794 try:
795 i = self.barrier.wait()
796 if i == self.N//2:
797 raise RuntimeError
798 self.barrier.wait()
799 results1.append(True)
800 except threading.BrokenBarrierError:
801 results2.append(True)
802 except RuntimeError:
803 self.barrier.abort()
804 pass
805 # Synchronize and reset the barrier. Must synchronize first so
806 # that everyone has left it when we reset, and after so that no
807 # one enters it before the reset.
808 if barrier2.wait() == self.N//2:
809 self.barrier.reset()
810 barrier2.wait()
811 self.barrier.wait()
812 results3.append(True)
813
814 self.run_threads(f)
815 self.assertEqual(len(results1), 0)
816 self.assertEqual(len(results2), self.N-1)
817 self.assertEqual(len(results3), self.N)
818
819 def test_timeout(self):
820 """
821 Test wait(timeout)
822 """
823 def f():
824 i = self.barrier.wait()
825 if i == self.N // 2:
826 # One thread is late!
Antoine Pitrou12ae2902010-11-17 21:55:41 +0000827 time.sleep(1.0)
828 # Default timeout is 2.0, so this is shorter.
Kristján Valur Jónsson3be00032010-10-28 09:43:10 +0000829 self.assertRaises(threading.BrokenBarrierError,
Antoine Pitrou12ae2902010-11-17 21:55:41 +0000830 self.barrier.wait, 0.5)
Kristján Valur Jónsson3be00032010-10-28 09:43:10 +0000831 self.run_threads(f)
832
833 def test_default_timeout(self):
834 """
835 Test the barrier's default timeout
836 """
Kristján Valur Jónssonf53a6262010-10-31 03:00:57 +0000837 #create a barrier with a low default timeout
838 barrier = self.barriertype(self.N, timeout=0.1)
Kristján Valur Jónsson3be00032010-10-28 09:43:10 +0000839 def f():
Kristján Valur Jónssonf53a6262010-10-31 03:00:57 +0000840 i = barrier.wait()
Kristján Valur Jónsson3be00032010-10-28 09:43:10 +0000841 if i == self.N // 2:
842 # One thread is later than the default timeout of 0.1s.
Antoine Pitrou12ae2902010-11-17 21:55:41 +0000843 time.sleep(1.0)
Kristján Valur Jónssonf53a6262010-10-31 03:00:57 +0000844 self.assertRaises(threading.BrokenBarrierError, barrier.wait)
Kristján Valur Jónsson3be00032010-10-28 09:43:10 +0000845 self.run_threads(f)
846
847 def test_single_thread(self):
848 b = self.barriertype(1)
849 b.wait()
850 b.wait()