blob: bfbf44e0db70156131ebd3c1d65f3b487f9bb6b4 [file] [log] [blame]
Antoine Pitrou557934f2009-11-06 22:41:14 +00001"""
2Various tests for synchronization primitives.
3"""
4
5import sys
6import time
Victor Stinner2a129742011-05-30 23:02:52 +02007from _thread import start_new_thread, TIMEOUT_MAX
Antoine Pitrou557934f2009-11-06 22:41:14 +00008import threading
9import unittest
10
11from test import support
12
13
14def _wait():
15 # A crude wait/yield function not relying on synchronization primitives.
16 time.sleep(0.01)
17
18class Bunch(object):
19 """
20 A bunch of threads.
21 """
22 def __init__(self, f, n, wait_before_exit=False):
23 """
24 Construct a bunch of `n` threads running the same function `f`.
25 If `wait_before_exit` is True, the threads won't terminate until
26 do_finish() is called.
27 """
28 self.f = f
29 self.n = n
30 self.started = []
31 self.finished = []
32 self._can_exit = not wait_before_exit
33 def task():
Victor Stinner2a129742011-05-30 23:02:52 +020034 tid = threading.get_ident()
Antoine Pitrou557934f2009-11-06 22:41:14 +000035 self.started.append(tid)
36 try:
37 f()
38 finally:
39 self.finished.append(tid)
40 while not self._can_exit:
41 _wait()
42 for i in range(n):
43 start_new_thread(task, ())
44
45 def wait_for_started(self):
46 while len(self.started) < self.n:
47 _wait()
48
49 def wait_for_finished(self):
50 while len(self.finished) < self.n:
51 _wait()
52
53 def do_finish(self):
54 self._can_exit = True
55
56
57class BaseTestCase(unittest.TestCase):
58 def setUp(self):
59 self._threads = support.threading_setup()
60
61 def tearDown(self):
62 support.threading_cleanup(*self._threads)
63 support.reap_children()
64
Antoine Pitrou7c3e5772010-04-14 15:44:10 +000065 def assertTimeout(self, actual, expected):
66 # The waiting and/or time.time() can be imprecise, which
67 # is why comparing to the expected value would sometimes fail
68 # (especially under Windows).
69 self.assertGreaterEqual(actual, expected * 0.6)
70 # Test nothing insane happened
71 self.assertLess(actual, expected * 10.0)
72
Antoine Pitrou557934f2009-11-06 22:41:14 +000073
74class BaseLockTests(BaseTestCase):
75 """
76 Tests for both recursive and non-recursive locks.
77 """
78
79 def test_constructor(self):
80 lock = self.locktype()
81 del lock
82
83 def test_acquire_destroy(self):
84 lock = self.locktype()
85 lock.acquire()
86 del lock
87
88 def test_acquire_release(self):
89 lock = self.locktype()
90 lock.acquire()
91 lock.release()
92 del lock
93
94 def test_try_acquire(self):
95 lock = self.locktype()
96 self.assertTrue(lock.acquire(False))
97 lock.release()
98
99 def test_try_acquire_contended(self):
100 lock = self.locktype()
101 lock.acquire()
102 result = []
103 def f():
104 result.append(lock.acquire(False))
105 Bunch(f, 1).wait_for_finished()
106 self.assertFalse(result[0])
107 lock.release()
108
109 def test_acquire_contended(self):
110 lock = self.locktype()
111 lock.acquire()
112 N = 5
113 def f():
114 lock.acquire()
115 lock.release()
116
117 b = Bunch(f, N)
118 b.wait_for_started()
119 _wait()
120 self.assertEqual(len(b.finished), 0)
121 lock.release()
122 b.wait_for_finished()
123 self.assertEqual(len(b.finished), N)
124
125 def test_with(self):
126 lock = self.locktype()
127 def f():
128 lock.acquire()
129 lock.release()
130 def _with(err=None):
131 with lock:
132 if err is not None:
133 raise err
134 _with()
135 # Check the lock is unacquired
136 Bunch(f, 1).wait_for_finished()
137 self.assertRaises(TypeError, _with, TypeError)
138 # Check the lock is unacquired
139 Bunch(f, 1).wait_for_finished()
140
Antoine Pitroub0872682009-11-09 16:08:16 +0000141 def test_thread_leak(self):
142 # The lock shouldn't leak a Thread instance when used from a foreign
143 # (non-threading) thread.
144 lock = self.locktype()
145 def f():
146 lock.acquire()
147 lock.release()
148 n = len(threading.enumerate())
149 # We run many threads in the hope that existing threads ids won't
150 # be recycled.
151 Bunch(f, 15).wait_for_finished()
Antoine Pitrou45fdb452011-04-04 21:59:09 +0200152 if len(threading.enumerate()) != n:
153 # There is a small window during which a Thread instance's
154 # target function has finished running, but the Thread is still
155 # alive and registered. Avoid spurious failures by waiting a
156 # bit more (seen on a buildbot).
157 time.sleep(0.4)
158 self.assertEqual(n, len(threading.enumerate()))
Antoine Pitroub0872682009-11-09 16:08:16 +0000159
Antoine Pitrou7c3e5772010-04-14 15:44:10 +0000160 def test_timeout(self):
161 lock = self.locktype()
162 # Can't set timeout if not blocking
163 self.assertRaises(ValueError, lock.acquire, 0, 1)
164 # Invalid timeout values
165 self.assertRaises(ValueError, lock.acquire, timeout=-100)
166 self.assertRaises(OverflowError, lock.acquire, timeout=1e100)
167 self.assertRaises(OverflowError, lock.acquire, timeout=TIMEOUT_MAX + 1)
168 # TIMEOUT_MAX is ok
169 lock.acquire(timeout=TIMEOUT_MAX)
170 lock.release()
171 t1 = time.time()
172 self.assertTrue(lock.acquire(timeout=5))
173 t2 = time.time()
174 # Just a sanity test that it didn't actually wait for the timeout.
175 self.assertLess(t2 - t1, 5)
176 results = []
177 def f():
178 t1 = time.time()
179 results.append(lock.acquire(timeout=0.5))
180 t2 = time.time()
181 results.append(t2 - t1)
182 Bunch(f, 1).wait_for_finished()
183 self.assertFalse(results[0])
184 self.assertTimeout(results[1], 0.5)
185
Antoine Pitrou557934f2009-11-06 22:41:14 +0000186
187class LockTests(BaseLockTests):
188 """
189 Tests for non-recursive, weak locks
190 (which can be acquired and released from different threads).
191 """
192 def test_reacquire(self):
193 # Lock needs to be released before re-acquiring.
194 lock = self.locktype()
195 phase = []
196 def f():
197 lock.acquire()
198 phase.append(None)
199 lock.acquire()
200 phase.append(None)
201 start_new_thread(f, ())
202 while len(phase) == 0:
203 _wait()
204 _wait()
205 self.assertEqual(len(phase), 1)
206 lock.release()
207 while len(phase) == 1:
208 _wait()
209 self.assertEqual(len(phase), 2)
210
211 def test_different_thread(self):
212 # Lock can be released from a different thread.
213 lock = self.locktype()
214 lock.acquire()
215 def f():
216 lock.release()
217 b = Bunch(f, 1)
218 b.wait_for_finished()
219 lock.acquire()
220 lock.release()
221
Antoine Pitrou7899acf2011-03-31 01:00:32 +0200222 def test_state_after_timeout(self):
223 # Issue #11618: check that lock is in a proper state after a
224 # (non-zero) timeout.
225 lock = self.locktype()
226 lock.acquire()
227 self.assertFalse(lock.acquire(timeout=0.01))
228 lock.release()
229 self.assertFalse(lock.locked())
230 self.assertTrue(lock.acquire(blocking=False))
231
Antoine Pitrou557934f2009-11-06 22:41:14 +0000232
233class RLockTests(BaseLockTests):
234 """
235 Tests for recursive locks.
236 """
237 def test_reacquire(self):
238 lock = self.locktype()
239 lock.acquire()
240 lock.acquire()
241 lock.release()
242 lock.acquire()
243 lock.release()
244 lock.release()
245
246 def test_release_unacquired(self):
247 # Cannot release an unacquired lock
248 lock = self.locktype()
249 self.assertRaises(RuntimeError, lock.release)
250 lock.acquire()
251 lock.acquire()
252 lock.release()
253 lock.acquire()
254 lock.release()
255 lock.release()
256 self.assertRaises(RuntimeError, lock.release)
Antoine Pitrouea3eb882012-05-17 18:55:59 +0200257
258 def test_release_save_unacquired(self):
259 # Cannot _release_save an unacquired lock
260 lock = self.locktype()
261 self.assertRaises(RuntimeError, lock._release_save)
262 lock.acquire()
263 lock.acquire()
264 lock.release()
265 lock.acquire()
266 lock.release()
267 lock.release()
Victor Stinnerc2824d42011-04-24 23:41:33 +0200268 self.assertRaises(RuntimeError, lock._release_save)
Antoine Pitrou557934f2009-11-06 22:41:14 +0000269
270 def test_different_thread(self):
271 # Cannot release from a different thread
272 lock = self.locktype()
273 def f():
274 lock.acquire()
275 b = Bunch(f, 1, True)
276 try:
277 self.assertRaises(RuntimeError, lock.release)
278 finally:
279 b.do_finish()
280
281 def test__is_owned(self):
282 lock = self.locktype()
283 self.assertFalse(lock._is_owned())
284 lock.acquire()
285 self.assertTrue(lock._is_owned())
286 lock.acquire()
287 self.assertTrue(lock._is_owned())
288 result = []
289 def f():
290 result.append(lock._is_owned())
291 Bunch(f, 1).wait_for_finished()
292 self.assertFalse(result[0])
293 lock.release()
294 self.assertTrue(lock._is_owned())
295 lock.release()
296 self.assertFalse(lock._is_owned())
297
298
299class EventTests(BaseTestCase):
300 """
301 Tests for Event objects.
302 """
303
304 def test_is_set(self):
305 evt = self.eventtype()
306 self.assertFalse(evt.is_set())
307 evt.set()
308 self.assertTrue(evt.is_set())
309 evt.set()
310 self.assertTrue(evt.is_set())
311 evt.clear()
312 self.assertFalse(evt.is_set())
313 evt.clear()
314 self.assertFalse(evt.is_set())
315
316 def _check_notify(self, evt):
317 # All threads get notified
318 N = 5
319 results1 = []
320 results2 = []
321 def f():
322 results1.append(evt.wait())
323 results2.append(evt.wait())
324 b = Bunch(f, N)
325 b.wait_for_started()
326 _wait()
327 self.assertEqual(len(results1), 0)
328 evt.set()
329 b.wait_for_finished()
330 self.assertEqual(results1, [True] * N)
331 self.assertEqual(results2, [True] * N)
332
333 def test_notify(self):
334 evt = self.eventtype()
335 self._check_notify(evt)
336 # Another time, after an explicit clear()
337 evt.set()
338 evt.clear()
339 self._check_notify(evt)
340
341 def test_timeout(self):
342 evt = self.eventtype()
343 results1 = []
344 results2 = []
345 N = 5
346 def f():
347 results1.append(evt.wait(0.0))
348 t1 = time.time()
Antoine Pitrou7c3e5772010-04-14 15:44:10 +0000349 r = evt.wait(0.5)
Antoine Pitrou557934f2009-11-06 22:41:14 +0000350 t2 = time.time()
351 results2.append((r, t2 - t1))
352 Bunch(f, N).wait_for_finished()
353 self.assertEqual(results1, [False] * N)
354 for r, dt in results2:
355 self.assertFalse(r)
Antoine Pitrou7c3e5772010-04-14 15:44:10 +0000356 self.assertTimeout(dt, 0.5)
Antoine Pitrou557934f2009-11-06 22:41:14 +0000357 # The event is set
358 results1 = []
359 results2 = []
360 evt.set()
361 Bunch(f, N).wait_for_finished()
362 self.assertEqual(results1, [True] * N)
363 for r, dt in results2:
364 self.assertTrue(r)
365
Charles-François Natalided03482012-01-07 18:24:56 +0100366 def test_set_and_clear(self):
367 # Issue #13502: check that wait() returns true even when the event is
368 # cleared before the waiting thread is woken up.
369 evt = self.eventtype()
370 results = []
371 N = 5
372 def f():
373 results.append(evt.wait(1))
374 b = Bunch(f, N)
375 b.wait_for_started()
376 time.sleep(0.5)
377 evt.set()
378 evt.clear()
379 b.wait_for_finished()
380 self.assertEqual(results, [True] * N)
381
Antoine Pitrou557934f2009-11-06 22:41:14 +0000382
383class ConditionTests(BaseTestCase):
384 """
385 Tests for condition variables.
386 """
387
388 def test_acquire(self):
389 cond = self.condtype()
390 # Be default we have an RLock: the condition can be acquired multiple
391 # times.
392 cond.acquire()
393 cond.acquire()
394 cond.release()
395 cond.release()
396 lock = threading.Lock()
397 cond = self.condtype(lock)
398 cond.acquire()
399 self.assertFalse(lock.acquire(False))
400 cond.release()
401 self.assertTrue(lock.acquire(False))
402 self.assertFalse(cond.acquire(False))
403 lock.release()
404 with cond:
405 self.assertFalse(lock.acquire(False))
406
407 def test_unacquired_wait(self):
408 cond = self.condtype()
409 self.assertRaises(RuntimeError, cond.wait)
410
411 def test_unacquired_notify(self):
412 cond = self.condtype()
413 self.assertRaises(RuntimeError, cond.notify)
414
415 def _check_notify(self, cond):
416 N = 5
417 results1 = []
418 results2 = []
419 phase_num = 0
420 def f():
421 cond.acquire()
Georg Brandlb9a43912010-10-28 09:03:20 +0000422 result = cond.wait()
Antoine Pitrou557934f2009-11-06 22:41:14 +0000423 cond.release()
Georg Brandlb9a43912010-10-28 09:03:20 +0000424 results1.append((result, phase_num))
Antoine Pitrou557934f2009-11-06 22:41:14 +0000425 cond.acquire()
Georg Brandlb9a43912010-10-28 09:03:20 +0000426 result = cond.wait()
Antoine Pitrou557934f2009-11-06 22:41:14 +0000427 cond.release()
Georg Brandlb9a43912010-10-28 09:03:20 +0000428 results2.append((result, phase_num))
Antoine Pitrou557934f2009-11-06 22:41:14 +0000429 b = Bunch(f, N)
430 b.wait_for_started()
431 _wait()
432 self.assertEqual(results1, [])
433 # Notify 3 threads at first
434 cond.acquire()
435 cond.notify(3)
436 _wait()
437 phase_num = 1
438 cond.release()
439 while len(results1) < 3:
440 _wait()
Georg Brandlb9a43912010-10-28 09:03:20 +0000441 self.assertEqual(results1, [(True, 1)] * 3)
Antoine Pitrou557934f2009-11-06 22:41:14 +0000442 self.assertEqual(results2, [])
443 # Notify 5 threads: they might be in their first or second wait
444 cond.acquire()
445 cond.notify(5)
446 _wait()
447 phase_num = 2
448 cond.release()
449 while len(results1) + len(results2) < 8:
450 _wait()
Georg Brandlb9a43912010-10-28 09:03:20 +0000451 self.assertEqual(results1, [(True, 1)] * 3 + [(True, 2)] * 2)
452 self.assertEqual(results2, [(True, 2)] * 3)
Antoine Pitrou557934f2009-11-06 22:41:14 +0000453 # Notify all threads: they are all in their second wait
454 cond.acquire()
455 cond.notify_all()
456 _wait()
457 phase_num = 3
458 cond.release()
459 while len(results2) < 5:
460 _wait()
Georg Brandlb9a43912010-10-28 09:03:20 +0000461 self.assertEqual(results1, [(True, 1)] * 3 + [(True,2)] * 2)
462 self.assertEqual(results2, [(True, 2)] * 3 + [(True, 3)] * 2)
Antoine Pitrou557934f2009-11-06 22:41:14 +0000463 b.wait_for_finished()
464
465 def test_notify(self):
466 cond = self.condtype()
467 self._check_notify(cond)
468 # A second time, to check internal state is still ok.
469 self._check_notify(cond)
470
471 def test_timeout(self):
472 cond = self.condtype()
473 results = []
474 N = 5
475 def f():
476 cond.acquire()
477 t1 = time.time()
Georg Brandlb9a43912010-10-28 09:03:20 +0000478 result = cond.wait(0.5)
Antoine Pitrou557934f2009-11-06 22:41:14 +0000479 t2 = time.time()
480 cond.release()
Georg Brandlb9a43912010-10-28 09:03:20 +0000481 results.append((t2 - t1, result))
Antoine Pitrou557934f2009-11-06 22:41:14 +0000482 Bunch(f, N).wait_for_finished()
Georg Brandlb9a43912010-10-28 09:03:20 +0000483 self.assertEqual(len(results), N)
484 for dt, result in results:
Antoine Pitrou7c3e5772010-04-14 15:44:10 +0000485 self.assertTimeout(dt, 0.5)
Georg Brandlb9a43912010-10-28 09:03:20 +0000486 # Note that conceptually (that"s the condition variable protocol)
487 # a wait() may succeed even if no one notifies us and before any
488 # timeout occurs. Spurious wakeups can occur.
489 # This makes it hard to verify the result value.
490 # In practice, this implementation has no spurious wakeups.
491 self.assertFalse(result)
Antoine Pitrou557934f2009-11-06 22:41:14 +0000492
Kristján Valur Jónsson63315202010-11-18 12:46:39 +0000493 def test_waitfor(self):
494 cond = self.condtype()
495 state = 0
496 def f():
497 with cond:
498 result = cond.wait_for(lambda : state==4)
499 self.assertTrue(result)
500 self.assertEqual(state, 4)
501 b = Bunch(f, 1)
502 b.wait_for_started()
Victor Stinner3349bca2011-05-18 00:16:14 +0200503 for i in range(4):
Kristján Valur Jónsson63315202010-11-18 12:46:39 +0000504 time.sleep(0.01)
505 with cond:
506 state += 1
507 cond.notify()
508 b.wait_for_finished()
509
510 def test_waitfor_timeout(self):
511 cond = self.condtype()
512 state = 0
513 success = []
514 def f():
515 with cond:
516 dt = time.time()
517 result = cond.wait_for(lambda : state==4, timeout=0.1)
518 dt = time.time() - dt
519 self.assertFalse(result)
520 self.assertTimeout(dt, 0.1)
521 success.append(None)
522 b = Bunch(f, 1)
523 b.wait_for_started()
524 # Only increment 3 times, so state == 4 is never reached.
525 for i in range(3):
526 time.sleep(0.01)
527 with cond:
528 state += 1
529 cond.notify()
530 b.wait_for_finished()
531 self.assertEqual(len(success), 1)
532
Antoine Pitrou557934f2009-11-06 22:41:14 +0000533
534class BaseSemaphoreTests(BaseTestCase):
535 """
536 Common tests for {bounded, unbounded} semaphore objects.
537 """
538
539 def test_constructor(self):
540 self.assertRaises(ValueError, self.semtype, value = -1)
541 self.assertRaises(ValueError, self.semtype, value = -sys.maxsize)
542
543 def test_acquire(self):
544 sem = self.semtype(1)
545 sem.acquire()
546 sem.release()
547 sem = self.semtype(2)
548 sem.acquire()
549 sem.acquire()
550 sem.release()
551 sem.release()
552
553 def test_acquire_destroy(self):
554 sem = self.semtype()
555 sem.acquire()
556 del sem
557
558 def test_acquire_contended(self):
559 sem = self.semtype(7)
560 sem.acquire()
561 N = 10
562 results1 = []
563 results2 = []
564 phase_num = 0
565 def f():
566 sem.acquire()
567 results1.append(phase_num)
568 sem.acquire()
569 results2.append(phase_num)
570 b = Bunch(f, 10)
571 b.wait_for_started()
572 while len(results1) + len(results2) < 6:
573 _wait()
574 self.assertEqual(results1 + results2, [0] * 6)
575 phase_num = 1
576 for i in range(7):
577 sem.release()
578 while len(results1) + len(results2) < 13:
579 _wait()
580 self.assertEqual(sorted(results1 + results2), [0] * 6 + [1] * 7)
581 phase_num = 2
582 for i in range(6):
583 sem.release()
584 while len(results1) + len(results2) < 19:
585 _wait()
586 self.assertEqual(sorted(results1 + results2), [0] * 6 + [1] * 7 + [2] * 6)
587 # The semaphore is still locked
588 self.assertFalse(sem.acquire(False))
589 # Final release, to let the last thread finish
590 sem.release()
591 b.wait_for_finished()
592
593 def test_try_acquire(self):
594 sem = self.semtype(2)
595 self.assertTrue(sem.acquire(False))
596 self.assertTrue(sem.acquire(False))
597 self.assertFalse(sem.acquire(False))
598 sem.release()
599 self.assertTrue(sem.acquire(False))
600
601 def test_try_acquire_contended(self):
602 sem = self.semtype(4)
603 sem.acquire()
604 results = []
605 def f():
606 results.append(sem.acquire(False))
607 results.append(sem.acquire(False))
608 Bunch(f, 5).wait_for_finished()
609 # There can be a thread switch between acquiring the semaphore and
610 # appending the result, therefore results will not necessarily be
611 # ordered.
612 self.assertEqual(sorted(results), [False] * 7 + [True] * 3 )
613
Antoine Pitrou0454af92010-04-17 23:51:58 +0000614 def test_acquire_timeout(self):
615 sem = self.semtype(2)
616 self.assertRaises(ValueError, sem.acquire, False, timeout=1.0)
617 self.assertTrue(sem.acquire(timeout=0.005))
618 self.assertTrue(sem.acquire(timeout=0.005))
619 self.assertFalse(sem.acquire(timeout=0.005))
620 sem.release()
621 self.assertTrue(sem.acquire(timeout=0.005))
622 t = time.time()
623 self.assertFalse(sem.acquire(timeout=0.5))
624 dt = time.time() - t
625 self.assertTimeout(dt, 0.5)
626
Antoine Pitrou557934f2009-11-06 22:41:14 +0000627 def test_default_value(self):
628 # The default initial value is 1.
629 sem = self.semtype()
630 sem.acquire()
631 def f():
632 sem.acquire()
633 sem.release()
634 b = Bunch(f, 1)
635 b.wait_for_started()
636 _wait()
637 self.assertFalse(b.finished)
638 sem.release()
639 b.wait_for_finished()
640
641 def test_with(self):
642 sem = self.semtype(2)
643 def _with(err=None):
644 with sem:
645 self.assertTrue(sem.acquire(False))
646 sem.release()
647 with sem:
648 self.assertFalse(sem.acquire(False))
649 if err:
650 raise err
651 _with()
652 self.assertTrue(sem.acquire(False))
653 sem.release()
654 self.assertRaises(TypeError, _with, TypeError)
655 self.assertTrue(sem.acquire(False))
656 sem.release()
657
658class SemaphoreTests(BaseSemaphoreTests):
659 """
660 Tests for unbounded semaphores.
661 """
662
663 def test_release_unacquired(self):
664 # Unbounded releases are allowed and increment the semaphore's value
665 sem = self.semtype(1)
666 sem.release()
667 sem.acquire()
668 sem.acquire()
669 sem.release()
670
671
672class BoundedSemaphoreTests(BaseSemaphoreTests):
673 """
674 Tests for bounded semaphores.
675 """
676
677 def test_release_unacquired(self):
678 # Cannot go past the initial value
679 sem = self.semtype()
680 self.assertRaises(ValueError, sem.release)
681 sem.acquire()
682 sem.release()
683 self.assertRaises(ValueError, sem.release)
Kristján Valur Jónsson3be00032010-10-28 09:43:10 +0000684
685
686class BarrierTests(BaseTestCase):
687 """
688 Tests for Barrier objects.
689 """
690 N = 5
Antoine Pitrou12ae2902010-11-17 21:55:41 +0000691 defaultTimeout = 2.0
Kristján Valur Jónsson3be00032010-10-28 09:43:10 +0000692
693 def setUp(self):
Kristján Valur Jónssonf53a6262010-10-31 03:00:57 +0000694 self.barrier = self.barriertype(self.N, timeout=self.defaultTimeout)
Kristján Valur Jónsson3be00032010-10-28 09:43:10 +0000695 def tearDown(self):
696 self.barrier.abort()
697
698 def run_threads(self, f):
699 b = Bunch(f, self.N-1)
700 f()
701 b.wait_for_finished()
702
703 def multipass(self, results, n):
704 m = self.barrier.parties
705 self.assertEqual(m, self.N)
706 for i in range(n):
707 results[0].append(True)
708 self.assertEqual(len(results[1]), i * m)
709 self.barrier.wait()
710 results[1].append(True)
711 self.assertEqual(len(results[0]), (i + 1) * m)
712 self.barrier.wait()
713 self.assertEqual(self.barrier.n_waiting, 0)
714 self.assertFalse(self.barrier.broken)
715
716 def test_barrier(self, passes=1):
717 """
718 Test that a barrier is passed in lockstep
719 """
720 results = [[],[]]
721 def f():
722 self.multipass(results, passes)
723 self.run_threads(f)
724
725 def test_barrier_10(self):
726 """
727 Test that a barrier works for 10 consecutive runs
728 """
729 return self.test_barrier(10)
730
731 def test_wait_return(self):
732 """
733 test the return value from barrier.wait
734 """
735 results = []
736 def f():
737 r = self.barrier.wait()
738 results.append(r)
739
740 self.run_threads(f)
741 self.assertEqual(sum(results), sum(range(self.N)))
742
743 def test_action(self):
744 """
745 Test the 'action' callback
746 """
747 results = []
748 def action():
749 results.append(True)
750 barrier = self.barriertype(self.N, action)
751 def f():
752 barrier.wait()
753 self.assertEqual(len(results), 1)
754
755 self.run_threads(f)
756
757 def test_abort(self):
758 """
759 Test that an abort will put the barrier in a broken state
760 """
761 results1 = []
762 results2 = []
763 def f():
764 try:
765 i = self.barrier.wait()
766 if i == self.N//2:
767 raise RuntimeError
768 self.barrier.wait()
769 results1.append(True)
770 except threading.BrokenBarrierError:
771 results2.append(True)
772 except RuntimeError:
773 self.barrier.abort()
774 pass
775
776 self.run_threads(f)
777 self.assertEqual(len(results1), 0)
778 self.assertEqual(len(results2), self.N-1)
779 self.assertTrue(self.barrier.broken)
780
781 def test_reset(self):
782 """
783 Test that a 'reset' on a barrier frees the waiting threads
784 """
785 results1 = []
786 results2 = []
787 results3 = []
788 def f():
789 i = self.barrier.wait()
790 if i == self.N//2:
791 # Wait until the other threads are all in the barrier.
792 while self.barrier.n_waiting < self.N-1:
793 time.sleep(0.001)
794 self.barrier.reset()
795 else:
796 try:
797 self.barrier.wait()
798 results1.append(True)
799 except threading.BrokenBarrierError:
800 results2.append(True)
801 # Now, pass the barrier again
802 self.barrier.wait()
803 results3.append(True)
804
805 self.run_threads(f)
806 self.assertEqual(len(results1), 0)
807 self.assertEqual(len(results2), self.N-1)
808 self.assertEqual(len(results3), self.N)
809
810
811 def test_abort_and_reset(self):
812 """
813 Test that a barrier can be reset after being broken.
814 """
815 results1 = []
816 results2 = []
817 results3 = []
818 barrier2 = self.barriertype(self.N)
819 def f():
820 try:
821 i = self.barrier.wait()
822 if i == self.N//2:
823 raise RuntimeError
824 self.barrier.wait()
825 results1.append(True)
826 except threading.BrokenBarrierError:
827 results2.append(True)
828 except RuntimeError:
829 self.barrier.abort()
830 pass
831 # Synchronize and reset the barrier. Must synchronize first so
832 # that everyone has left it when we reset, and after so that no
833 # one enters it before the reset.
834 if barrier2.wait() == self.N//2:
835 self.barrier.reset()
836 barrier2.wait()
837 self.barrier.wait()
838 results3.append(True)
839
840 self.run_threads(f)
841 self.assertEqual(len(results1), 0)
842 self.assertEqual(len(results2), self.N-1)
843 self.assertEqual(len(results3), self.N)
844
845 def test_timeout(self):
846 """
847 Test wait(timeout)
848 """
849 def f():
850 i = self.barrier.wait()
851 if i == self.N // 2:
852 # One thread is late!
Antoine Pitrou12ae2902010-11-17 21:55:41 +0000853 time.sleep(1.0)
854 # Default timeout is 2.0, so this is shorter.
Kristján Valur Jónsson3be00032010-10-28 09:43:10 +0000855 self.assertRaises(threading.BrokenBarrierError,
Antoine Pitrou12ae2902010-11-17 21:55:41 +0000856 self.barrier.wait, 0.5)
Kristján Valur Jónsson3be00032010-10-28 09:43:10 +0000857 self.run_threads(f)
858
859 def test_default_timeout(self):
860 """
861 Test the barrier's default timeout
862 """
Charles-François Natalid4d1d062011-07-27 21:26:42 +0200863 # create a barrier with a low default timeout
864 barrier = self.barriertype(self.N, timeout=0.3)
Kristján Valur Jónsson3be00032010-10-28 09:43:10 +0000865 def f():
Kristján Valur Jónssonf53a6262010-10-31 03:00:57 +0000866 i = barrier.wait()
Kristján Valur Jónsson3be00032010-10-28 09:43:10 +0000867 if i == self.N // 2:
Charles-François Natalid4d1d062011-07-27 21:26:42 +0200868 # One thread is later than the default timeout of 0.3s.
Antoine Pitrou12ae2902010-11-17 21:55:41 +0000869 time.sleep(1.0)
Kristján Valur Jónssonf53a6262010-10-31 03:00:57 +0000870 self.assertRaises(threading.BrokenBarrierError, barrier.wait)
Kristján Valur Jónsson3be00032010-10-28 09:43:10 +0000871 self.run_threads(f)
872
873 def test_single_thread(self):
874 b = self.barriertype(1)
875 b.wait()
876 b.wait()