blob: a64aa18cd3aaae45b83b479a060fe8e1435c6edc [file] [log] [blame]
Antoine Pitrou557934f2009-11-06 22:41:14 +00001"""
2Various tests for synchronization primitives.
3"""
4
5import sys
6import time
Victor Stinner2a129742011-05-30 23:02:52 +02007from _thread import start_new_thread, TIMEOUT_MAX
Antoine Pitrou557934f2009-11-06 22:41:14 +00008import threading
9import unittest
10
11from test import support
12
13
14def _wait():
15 # A crude wait/yield function not relying on synchronization primitives.
16 time.sleep(0.01)
17
18class Bunch(object):
19 """
20 A bunch of threads.
21 """
22 def __init__(self, f, n, wait_before_exit=False):
23 """
24 Construct a bunch of `n` threads running the same function `f`.
25 If `wait_before_exit` is True, the threads won't terminate until
26 do_finish() is called.
27 """
28 self.f = f
29 self.n = n
30 self.started = []
31 self.finished = []
32 self._can_exit = not wait_before_exit
33 def task():
Victor Stinner2a129742011-05-30 23:02:52 +020034 tid = threading.get_ident()
Antoine Pitrou557934f2009-11-06 22:41:14 +000035 self.started.append(tid)
36 try:
37 f()
38 finally:
39 self.finished.append(tid)
40 while not self._can_exit:
41 _wait()
Serhiy Storchaka9db55002015-03-28 20:38:37 +020042 try:
43 for i in range(n):
44 start_new_thread(task, ())
45 except:
46 self._can_exit = True
47 raise
Antoine Pitrou557934f2009-11-06 22:41:14 +000048
49 def wait_for_started(self):
50 while len(self.started) < self.n:
51 _wait()
52
53 def wait_for_finished(self):
54 while len(self.finished) < self.n:
55 _wait()
56
57 def do_finish(self):
58 self._can_exit = True
59
60
61class BaseTestCase(unittest.TestCase):
62 def setUp(self):
63 self._threads = support.threading_setup()
64
65 def tearDown(self):
66 support.threading_cleanup(*self._threads)
67 support.reap_children()
68
Antoine Pitrou7c3e5772010-04-14 15:44:10 +000069 def assertTimeout(self, actual, expected):
70 # The waiting and/or time.time() can be imprecise, which
71 # is why comparing to the expected value would sometimes fail
72 # (especially under Windows).
73 self.assertGreaterEqual(actual, expected * 0.6)
74 # Test nothing insane happened
75 self.assertLess(actual, expected * 10.0)
76
Antoine Pitrou557934f2009-11-06 22:41:14 +000077
78class BaseLockTests(BaseTestCase):
79 """
80 Tests for both recursive and non-recursive locks.
81 """
82
83 def test_constructor(self):
84 lock = self.locktype()
85 del lock
86
Christian Heimesc5d95b12013-07-30 15:54:39 +020087 def test_repr(self):
88 lock = self.locktype()
Raymond Hettinger62f4dad2014-05-25 18:22:35 -070089 self.assertRegex(repr(lock), "<unlocked .* object (.*)?at .*>")
90 del lock
91
92 def test_locked_repr(self):
93 lock = self.locktype()
94 lock.acquire()
95 self.assertRegex(repr(lock), "<locked .* object (.*)?at .*>")
Christian Heimesc5d95b12013-07-30 15:54:39 +020096 del lock
97
Antoine Pitrou557934f2009-11-06 22:41:14 +000098 def test_acquire_destroy(self):
99 lock = self.locktype()
100 lock.acquire()
101 del lock
102
103 def test_acquire_release(self):
104 lock = self.locktype()
105 lock.acquire()
106 lock.release()
107 del lock
108
109 def test_try_acquire(self):
110 lock = self.locktype()
111 self.assertTrue(lock.acquire(False))
112 lock.release()
113
114 def test_try_acquire_contended(self):
115 lock = self.locktype()
116 lock.acquire()
117 result = []
118 def f():
119 result.append(lock.acquire(False))
120 Bunch(f, 1).wait_for_finished()
121 self.assertFalse(result[0])
122 lock.release()
123
124 def test_acquire_contended(self):
125 lock = self.locktype()
126 lock.acquire()
127 N = 5
128 def f():
129 lock.acquire()
130 lock.release()
131
132 b = Bunch(f, N)
133 b.wait_for_started()
134 _wait()
135 self.assertEqual(len(b.finished), 0)
136 lock.release()
137 b.wait_for_finished()
138 self.assertEqual(len(b.finished), N)
139
140 def test_with(self):
141 lock = self.locktype()
142 def f():
143 lock.acquire()
144 lock.release()
145 def _with(err=None):
146 with lock:
147 if err is not None:
148 raise err
149 _with()
150 # Check the lock is unacquired
151 Bunch(f, 1).wait_for_finished()
152 self.assertRaises(TypeError, _with, TypeError)
153 # Check the lock is unacquired
154 Bunch(f, 1).wait_for_finished()
155
Antoine Pitroub0872682009-11-09 16:08:16 +0000156 def test_thread_leak(self):
157 # The lock shouldn't leak a Thread instance when used from a foreign
158 # (non-threading) thread.
159 lock = self.locktype()
160 def f():
161 lock.acquire()
162 lock.release()
163 n = len(threading.enumerate())
164 # We run many threads in the hope that existing threads ids won't
165 # be recycled.
166 Bunch(f, 15).wait_for_finished()
Antoine Pitrou45fdb452011-04-04 21:59:09 +0200167 if len(threading.enumerate()) != n:
168 # There is a small window during which a Thread instance's
169 # target function has finished running, but the Thread is still
170 # alive and registered. Avoid spurious failures by waiting a
171 # bit more (seen on a buildbot).
172 time.sleep(0.4)
173 self.assertEqual(n, len(threading.enumerate()))
Antoine Pitroub0872682009-11-09 16:08:16 +0000174
Antoine Pitrou7c3e5772010-04-14 15:44:10 +0000175 def test_timeout(self):
176 lock = self.locktype()
177 # Can't set timeout if not blocking
178 self.assertRaises(ValueError, lock.acquire, 0, 1)
179 # Invalid timeout values
180 self.assertRaises(ValueError, lock.acquire, timeout=-100)
181 self.assertRaises(OverflowError, lock.acquire, timeout=1e100)
182 self.assertRaises(OverflowError, lock.acquire, timeout=TIMEOUT_MAX + 1)
183 # TIMEOUT_MAX is ok
184 lock.acquire(timeout=TIMEOUT_MAX)
185 lock.release()
186 t1 = time.time()
187 self.assertTrue(lock.acquire(timeout=5))
188 t2 = time.time()
189 # Just a sanity test that it didn't actually wait for the timeout.
190 self.assertLess(t2 - t1, 5)
191 results = []
192 def f():
193 t1 = time.time()
194 results.append(lock.acquire(timeout=0.5))
195 t2 = time.time()
196 results.append(t2 - t1)
197 Bunch(f, 1).wait_for_finished()
198 self.assertFalse(results[0])
199 self.assertTimeout(results[1], 0.5)
200
Antoine Pitrou557934f2009-11-06 22:41:14 +0000201
202class LockTests(BaseLockTests):
203 """
204 Tests for non-recursive, weak locks
205 (which can be acquired and released from different threads).
206 """
207 def test_reacquire(self):
208 # Lock needs to be released before re-acquiring.
209 lock = self.locktype()
210 phase = []
211 def f():
212 lock.acquire()
213 phase.append(None)
214 lock.acquire()
215 phase.append(None)
216 start_new_thread(f, ())
217 while len(phase) == 0:
218 _wait()
219 _wait()
220 self.assertEqual(len(phase), 1)
221 lock.release()
222 while len(phase) == 1:
223 _wait()
224 self.assertEqual(len(phase), 2)
225
226 def test_different_thread(self):
227 # Lock can be released from a different thread.
228 lock = self.locktype()
229 lock.acquire()
230 def f():
231 lock.release()
232 b = Bunch(f, 1)
233 b.wait_for_finished()
234 lock.acquire()
235 lock.release()
236
Antoine Pitrou7899acf2011-03-31 01:00:32 +0200237 def test_state_after_timeout(self):
238 # Issue #11618: check that lock is in a proper state after a
239 # (non-zero) timeout.
240 lock = self.locktype()
241 lock.acquire()
242 self.assertFalse(lock.acquire(timeout=0.01))
243 lock.release()
244 self.assertFalse(lock.locked())
245 self.assertTrue(lock.acquire(blocking=False))
246
Antoine Pitrou557934f2009-11-06 22:41:14 +0000247
248class RLockTests(BaseLockTests):
249 """
250 Tests for recursive locks.
251 """
252 def test_reacquire(self):
253 lock = self.locktype()
254 lock.acquire()
255 lock.acquire()
256 lock.release()
257 lock.acquire()
258 lock.release()
259 lock.release()
260
261 def test_release_unacquired(self):
262 # Cannot release an unacquired lock
263 lock = self.locktype()
264 self.assertRaises(RuntimeError, lock.release)
265 lock.acquire()
266 lock.acquire()
267 lock.release()
268 lock.acquire()
269 lock.release()
270 lock.release()
271 self.assertRaises(RuntimeError, lock.release)
Antoine Pitrouea3eb882012-05-17 18:55:59 +0200272
273 def test_release_save_unacquired(self):
274 # Cannot _release_save an unacquired lock
275 lock = self.locktype()
276 self.assertRaises(RuntimeError, lock._release_save)
277 lock.acquire()
278 lock.acquire()
279 lock.release()
280 lock.acquire()
281 lock.release()
282 lock.release()
Victor Stinnerc2824d42011-04-24 23:41:33 +0200283 self.assertRaises(RuntimeError, lock._release_save)
Antoine Pitrou557934f2009-11-06 22:41:14 +0000284
285 def test_different_thread(self):
286 # Cannot release from a different thread
287 lock = self.locktype()
288 def f():
289 lock.acquire()
290 b = Bunch(f, 1, True)
291 try:
292 self.assertRaises(RuntimeError, lock.release)
293 finally:
294 b.do_finish()
295
296 def test__is_owned(self):
297 lock = self.locktype()
298 self.assertFalse(lock._is_owned())
299 lock.acquire()
300 self.assertTrue(lock._is_owned())
301 lock.acquire()
302 self.assertTrue(lock._is_owned())
303 result = []
304 def f():
305 result.append(lock._is_owned())
306 Bunch(f, 1).wait_for_finished()
307 self.assertFalse(result[0])
308 lock.release()
309 self.assertTrue(lock._is_owned())
310 lock.release()
311 self.assertFalse(lock._is_owned())
312
313
314class EventTests(BaseTestCase):
315 """
316 Tests for Event objects.
317 """
318
319 def test_is_set(self):
320 evt = self.eventtype()
321 self.assertFalse(evt.is_set())
322 evt.set()
323 self.assertTrue(evt.is_set())
324 evt.set()
325 self.assertTrue(evt.is_set())
326 evt.clear()
327 self.assertFalse(evt.is_set())
328 evt.clear()
329 self.assertFalse(evt.is_set())
330
331 def _check_notify(self, evt):
332 # All threads get notified
333 N = 5
334 results1 = []
335 results2 = []
336 def f():
337 results1.append(evt.wait())
338 results2.append(evt.wait())
339 b = Bunch(f, N)
340 b.wait_for_started()
341 _wait()
342 self.assertEqual(len(results1), 0)
343 evt.set()
344 b.wait_for_finished()
345 self.assertEqual(results1, [True] * N)
346 self.assertEqual(results2, [True] * N)
347
348 def test_notify(self):
349 evt = self.eventtype()
350 self._check_notify(evt)
351 # Another time, after an explicit clear()
352 evt.set()
353 evt.clear()
354 self._check_notify(evt)
355
356 def test_timeout(self):
357 evt = self.eventtype()
358 results1 = []
359 results2 = []
360 N = 5
361 def f():
362 results1.append(evt.wait(0.0))
363 t1 = time.time()
Antoine Pitrou7c3e5772010-04-14 15:44:10 +0000364 r = evt.wait(0.5)
Antoine Pitrou557934f2009-11-06 22:41:14 +0000365 t2 = time.time()
366 results2.append((r, t2 - t1))
367 Bunch(f, N).wait_for_finished()
368 self.assertEqual(results1, [False] * N)
369 for r, dt in results2:
370 self.assertFalse(r)
Antoine Pitrou7c3e5772010-04-14 15:44:10 +0000371 self.assertTimeout(dt, 0.5)
Antoine Pitrou557934f2009-11-06 22:41:14 +0000372 # The event is set
373 results1 = []
374 results2 = []
375 evt.set()
376 Bunch(f, N).wait_for_finished()
377 self.assertEqual(results1, [True] * N)
378 for r, dt in results2:
379 self.assertTrue(r)
380
Charles-François Natalided03482012-01-07 18:24:56 +0100381 def test_set_and_clear(self):
382 # Issue #13502: check that wait() returns true even when the event is
383 # cleared before the waiting thread is woken up.
384 evt = self.eventtype()
385 results = []
386 N = 5
387 def f():
388 results.append(evt.wait(1))
389 b = Bunch(f, N)
390 b.wait_for_started()
391 time.sleep(0.5)
392 evt.set()
393 evt.clear()
394 b.wait_for_finished()
395 self.assertEqual(results, [True] * N)
396
Benjamin Peterson15982aa2015-10-05 21:56:22 -0700397 def test_reset_internal_locks(self):
Berker Peksag6d34bbb2016-04-29 17:25:29 +0300398 # ensure that condition is still using a Lock after reset
Benjamin Peterson15982aa2015-10-05 21:56:22 -0700399 evt = self.eventtype()
Berker Peksag6d34bbb2016-04-29 17:25:29 +0300400 with evt._cond:
401 self.assertFalse(evt._cond.acquire(False))
Benjamin Peterson15982aa2015-10-05 21:56:22 -0700402 evt._reset_internal_locks()
Berker Peksag6d34bbb2016-04-29 17:25:29 +0300403 with evt._cond:
404 self.assertFalse(evt._cond.acquire(False))
Benjamin Peterson15982aa2015-10-05 21:56:22 -0700405
Antoine Pitrou557934f2009-11-06 22:41:14 +0000406
407class ConditionTests(BaseTestCase):
408 """
409 Tests for condition variables.
410 """
411
412 def test_acquire(self):
413 cond = self.condtype()
414 # Be default we have an RLock: the condition can be acquired multiple
415 # times.
416 cond.acquire()
417 cond.acquire()
418 cond.release()
419 cond.release()
420 lock = threading.Lock()
421 cond = self.condtype(lock)
422 cond.acquire()
423 self.assertFalse(lock.acquire(False))
424 cond.release()
425 self.assertTrue(lock.acquire(False))
426 self.assertFalse(cond.acquire(False))
427 lock.release()
428 with cond:
429 self.assertFalse(lock.acquire(False))
430
431 def test_unacquired_wait(self):
432 cond = self.condtype()
433 self.assertRaises(RuntimeError, cond.wait)
434
435 def test_unacquired_notify(self):
436 cond = self.condtype()
437 self.assertRaises(RuntimeError, cond.notify)
438
439 def _check_notify(self, cond):
Kristjan Valur Jonsson020af2a2013-11-11 11:29:04 +0000440 # Note that this test is sensitive to timing. If the worker threads
441 # don't execute in a timely fashion, the main thread may think they
442 # are further along then they are. The main thread therefore issues
443 # _wait() statements to try to make sure that it doesn't race ahead
444 # of the workers.
445 # Secondly, this test assumes that condition variables are not subject
446 # to spurious wakeups. The absence of spurious wakeups is an implementation
447 # detail of Condition Cariables in current CPython, but in general, not
448 # a guaranteed property of condition variables as a programming
449 # construct. In particular, it is possible that this can no longer
450 # be conveniently guaranteed should their implementation ever change.
Antoine Pitrou557934f2009-11-06 22:41:14 +0000451 N = 5
452 results1 = []
453 results2 = []
454 phase_num = 0
455 def f():
456 cond.acquire()
Georg Brandlb9a43912010-10-28 09:03:20 +0000457 result = cond.wait()
Antoine Pitrou557934f2009-11-06 22:41:14 +0000458 cond.release()
Georg Brandlb9a43912010-10-28 09:03:20 +0000459 results1.append((result, phase_num))
Antoine Pitrou557934f2009-11-06 22:41:14 +0000460 cond.acquire()
Georg Brandlb9a43912010-10-28 09:03:20 +0000461 result = cond.wait()
Antoine Pitrou557934f2009-11-06 22:41:14 +0000462 cond.release()
Georg Brandlb9a43912010-10-28 09:03:20 +0000463 results2.append((result, phase_num))
Antoine Pitrou557934f2009-11-06 22:41:14 +0000464 b = Bunch(f, N)
465 b.wait_for_started()
466 _wait()
467 self.assertEqual(results1, [])
468 # Notify 3 threads at first
469 cond.acquire()
470 cond.notify(3)
471 _wait()
472 phase_num = 1
473 cond.release()
474 while len(results1) < 3:
475 _wait()
Georg Brandlb9a43912010-10-28 09:03:20 +0000476 self.assertEqual(results1, [(True, 1)] * 3)
Antoine Pitrou557934f2009-11-06 22:41:14 +0000477 self.assertEqual(results2, [])
Kristjan Valur Jonsson020af2a2013-11-11 11:29:04 +0000478 # first wait, to ensure all workers settle into cond.wait() before
479 # we continue. See issue #8799
480 _wait()
Antoine Pitrou557934f2009-11-06 22:41:14 +0000481 # Notify 5 threads: they might be in their first or second wait
482 cond.acquire()
483 cond.notify(5)
484 _wait()
485 phase_num = 2
486 cond.release()
487 while len(results1) + len(results2) < 8:
488 _wait()
Georg Brandlb9a43912010-10-28 09:03:20 +0000489 self.assertEqual(results1, [(True, 1)] * 3 + [(True, 2)] * 2)
490 self.assertEqual(results2, [(True, 2)] * 3)
Kristjan Valur Jonsson020af2a2013-11-11 11:29:04 +0000491 _wait() # make sure all workers settle into cond.wait()
Antoine Pitrou557934f2009-11-06 22:41:14 +0000492 # Notify all threads: they are all in their second wait
493 cond.acquire()
494 cond.notify_all()
495 _wait()
496 phase_num = 3
497 cond.release()
498 while len(results2) < 5:
499 _wait()
Georg Brandlb9a43912010-10-28 09:03:20 +0000500 self.assertEqual(results1, [(True, 1)] * 3 + [(True,2)] * 2)
501 self.assertEqual(results2, [(True, 2)] * 3 + [(True, 3)] * 2)
Antoine Pitrou557934f2009-11-06 22:41:14 +0000502 b.wait_for_finished()
503
504 def test_notify(self):
505 cond = self.condtype()
506 self._check_notify(cond)
507 # A second time, to check internal state is still ok.
508 self._check_notify(cond)
509
510 def test_timeout(self):
511 cond = self.condtype()
512 results = []
513 N = 5
514 def f():
515 cond.acquire()
516 t1 = time.time()
Georg Brandlb9a43912010-10-28 09:03:20 +0000517 result = cond.wait(0.5)
Antoine Pitrou557934f2009-11-06 22:41:14 +0000518 t2 = time.time()
519 cond.release()
Georg Brandlb9a43912010-10-28 09:03:20 +0000520 results.append((t2 - t1, result))
Antoine Pitrou557934f2009-11-06 22:41:14 +0000521 Bunch(f, N).wait_for_finished()
Georg Brandlb9a43912010-10-28 09:03:20 +0000522 self.assertEqual(len(results), N)
523 for dt, result in results:
Antoine Pitrou7c3e5772010-04-14 15:44:10 +0000524 self.assertTimeout(dt, 0.5)
Georg Brandlb9a43912010-10-28 09:03:20 +0000525 # Note that conceptually (that"s the condition variable protocol)
526 # a wait() may succeed even if no one notifies us and before any
527 # timeout occurs. Spurious wakeups can occur.
528 # This makes it hard to verify the result value.
529 # In practice, this implementation has no spurious wakeups.
530 self.assertFalse(result)
Antoine Pitrou557934f2009-11-06 22:41:14 +0000531
Kristján Valur Jónsson63315202010-11-18 12:46:39 +0000532 def test_waitfor(self):
533 cond = self.condtype()
534 state = 0
535 def f():
536 with cond:
537 result = cond.wait_for(lambda : state==4)
538 self.assertTrue(result)
539 self.assertEqual(state, 4)
540 b = Bunch(f, 1)
541 b.wait_for_started()
Victor Stinner3349bca2011-05-18 00:16:14 +0200542 for i in range(4):
Kristján Valur Jónsson63315202010-11-18 12:46:39 +0000543 time.sleep(0.01)
544 with cond:
545 state += 1
546 cond.notify()
547 b.wait_for_finished()
548
549 def test_waitfor_timeout(self):
550 cond = self.condtype()
551 state = 0
552 success = []
553 def f():
554 with cond:
555 dt = time.time()
556 result = cond.wait_for(lambda : state==4, timeout=0.1)
557 dt = time.time() - dt
558 self.assertFalse(result)
559 self.assertTimeout(dt, 0.1)
560 success.append(None)
561 b = Bunch(f, 1)
562 b.wait_for_started()
563 # Only increment 3 times, so state == 4 is never reached.
564 for i in range(3):
565 time.sleep(0.01)
566 with cond:
567 state += 1
568 cond.notify()
569 b.wait_for_finished()
570 self.assertEqual(len(success), 1)
571
Antoine Pitrou557934f2009-11-06 22:41:14 +0000572
573class BaseSemaphoreTests(BaseTestCase):
574 """
575 Common tests for {bounded, unbounded} semaphore objects.
576 """
577
578 def test_constructor(self):
579 self.assertRaises(ValueError, self.semtype, value = -1)
580 self.assertRaises(ValueError, self.semtype, value = -sys.maxsize)
581
582 def test_acquire(self):
583 sem = self.semtype(1)
584 sem.acquire()
585 sem.release()
586 sem = self.semtype(2)
587 sem.acquire()
588 sem.acquire()
589 sem.release()
590 sem.release()
591
592 def test_acquire_destroy(self):
593 sem = self.semtype()
594 sem.acquire()
595 del sem
596
597 def test_acquire_contended(self):
598 sem = self.semtype(7)
599 sem.acquire()
600 N = 10
601 results1 = []
602 results2 = []
603 phase_num = 0
604 def f():
605 sem.acquire()
606 results1.append(phase_num)
607 sem.acquire()
608 results2.append(phase_num)
609 b = Bunch(f, 10)
610 b.wait_for_started()
611 while len(results1) + len(results2) < 6:
612 _wait()
613 self.assertEqual(results1 + results2, [0] * 6)
614 phase_num = 1
615 for i in range(7):
616 sem.release()
617 while len(results1) + len(results2) < 13:
618 _wait()
619 self.assertEqual(sorted(results1 + results2), [0] * 6 + [1] * 7)
620 phase_num = 2
621 for i in range(6):
622 sem.release()
623 while len(results1) + len(results2) < 19:
624 _wait()
625 self.assertEqual(sorted(results1 + results2), [0] * 6 + [1] * 7 + [2] * 6)
626 # The semaphore is still locked
627 self.assertFalse(sem.acquire(False))
628 # Final release, to let the last thread finish
629 sem.release()
630 b.wait_for_finished()
631
632 def test_try_acquire(self):
633 sem = self.semtype(2)
634 self.assertTrue(sem.acquire(False))
635 self.assertTrue(sem.acquire(False))
636 self.assertFalse(sem.acquire(False))
637 sem.release()
638 self.assertTrue(sem.acquire(False))
639
640 def test_try_acquire_contended(self):
641 sem = self.semtype(4)
642 sem.acquire()
643 results = []
644 def f():
645 results.append(sem.acquire(False))
646 results.append(sem.acquire(False))
647 Bunch(f, 5).wait_for_finished()
648 # There can be a thread switch between acquiring the semaphore and
649 # appending the result, therefore results will not necessarily be
650 # ordered.
651 self.assertEqual(sorted(results), [False] * 7 + [True] * 3 )
652
Antoine Pitrou0454af92010-04-17 23:51:58 +0000653 def test_acquire_timeout(self):
654 sem = self.semtype(2)
655 self.assertRaises(ValueError, sem.acquire, False, timeout=1.0)
656 self.assertTrue(sem.acquire(timeout=0.005))
657 self.assertTrue(sem.acquire(timeout=0.005))
658 self.assertFalse(sem.acquire(timeout=0.005))
659 sem.release()
660 self.assertTrue(sem.acquire(timeout=0.005))
661 t = time.time()
662 self.assertFalse(sem.acquire(timeout=0.5))
663 dt = time.time() - t
664 self.assertTimeout(dt, 0.5)
665
Antoine Pitrou557934f2009-11-06 22:41:14 +0000666 def test_default_value(self):
667 # The default initial value is 1.
668 sem = self.semtype()
669 sem.acquire()
670 def f():
671 sem.acquire()
672 sem.release()
673 b = Bunch(f, 1)
674 b.wait_for_started()
675 _wait()
676 self.assertFalse(b.finished)
677 sem.release()
678 b.wait_for_finished()
679
680 def test_with(self):
681 sem = self.semtype(2)
682 def _with(err=None):
683 with sem:
684 self.assertTrue(sem.acquire(False))
685 sem.release()
686 with sem:
687 self.assertFalse(sem.acquire(False))
688 if err:
689 raise err
690 _with()
691 self.assertTrue(sem.acquire(False))
692 sem.release()
693 self.assertRaises(TypeError, _with, TypeError)
694 self.assertTrue(sem.acquire(False))
695 sem.release()
696
697class SemaphoreTests(BaseSemaphoreTests):
698 """
699 Tests for unbounded semaphores.
700 """
701
702 def test_release_unacquired(self):
703 # Unbounded releases are allowed and increment the semaphore's value
704 sem = self.semtype(1)
705 sem.release()
706 sem.acquire()
707 sem.acquire()
708 sem.release()
709
710
711class BoundedSemaphoreTests(BaseSemaphoreTests):
712 """
713 Tests for bounded semaphores.
714 """
715
716 def test_release_unacquired(self):
717 # Cannot go past the initial value
718 sem = self.semtype()
719 self.assertRaises(ValueError, sem.release)
720 sem.acquire()
721 sem.release()
722 self.assertRaises(ValueError, sem.release)
Kristján Valur Jónsson3be00032010-10-28 09:43:10 +0000723
724
725class BarrierTests(BaseTestCase):
726 """
727 Tests for Barrier objects.
728 """
729 N = 5
Antoine Pitrou12ae2902010-11-17 21:55:41 +0000730 defaultTimeout = 2.0
Kristján Valur Jónsson3be00032010-10-28 09:43:10 +0000731
732 def setUp(self):
Kristján Valur Jónssonf53a6262010-10-31 03:00:57 +0000733 self.barrier = self.barriertype(self.N, timeout=self.defaultTimeout)
Kristján Valur Jónsson3be00032010-10-28 09:43:10 +0000734 def tearDown(self):
735 self.barrier.abort()
736
737 def run_threads(self, f):
738 b = Bunch(f, self.N-1)
739 f()
740 b.wait_for_finished()
741
742 def multipass(self, results, n):
743 m = self.barrier.parties
744 self.assertEqual(m, self.N)
745 for i in range(n):
746 results[0].append(True)
747 self.assertEqual(len(results[1]), i * m)
748 self.barrier.wait()
749 results[1].append(True)
750 self.assertEqual(len(results[0]), (i + 1) * m)
751 self.barrier.wait()
752 self.assertEqual(self.barrier.n_waiting, 0)
753 self.assertFalse(self.barrier.broken)
754
755 def test_barrier(self, passes=1):
756 """
757 Test that a barrier is passed in lockstep
758 """
759 results = [[],[]]
760 def f():
761 self.multipass(results, passes)
762 self.run_threads(f)
763
764 def test_barrier_10(self):
765 """
766 Test that a barrier works for 10 consecutive runs
767 """
768 return self.test_barrier(10)
769
770 def test_wait_return(self):
771 """
772 test the return value from barrier.wait
773 """
774 results = []
775 def f():
776 r = self.barrier.wait()
777 results.append(r)
778
779 self.run_threads(f)
780 self.assertEqual(sum(results), sum(range(self.N)))
781
782 def test_action(self):
783 """
784 Test the 'action' callback
785 """
786 results = []
787 def action():
788 results.append(True)
789 barrier = self.barriertype(self.N, action)
790 def f():
791 barrier.wait()
792 self.assertEqual(len(results), 1)
793
794 self.run_threads(f)
795
796 def test_abort(self):
797 """
798 Test that an abort will put the barrier in a broken state
799 """
800 results1 = []
801 results2 = []
802 def f():
803 try:
804 i = self.barrier.wait()
805 if i == self.N//2:
806 raise RuntimeError
807 self.barrier.wait()
808 results1.append(True)
809 except threading.BrokenBarrierError:
810 results2.append(True)
811 except RuntimeError:
812 self.barrier.abort()
813 pass
814
815 self.run_threads(f)
816 self.assertEqual(len(results1), 0)
817 self.assertEqual(len(results2), self.N-1)
818 self.assertTrue(self.barrier.broken)
819
820 def test_reset(self):
821 """
822 Test that a 'reset' on a barrier frees the waiting threads
823 """
824 results1 = []
825 results2 = []
826 results3 = []
827 def f():
828 i = self.barrier.wait()
829 if i == self.N//2:
830 # Wait until the other threads are all in the barrier.
831 while self.barrier.n_waiting < self.N-1:
832 time.sleep(0.001)
833 self.barrier.reset()
834 else:
835 try:
836 self.barrier.wait()
837 results1.append(True)
838 except threading.BrokenBarrierError:
839 results2.append(True)
840 # Now, pass the barrier again
841 self.barrier.wait()
842 results3.append(True)
843
844 self.run_threads(f)
845 self.assertEqual(len(results1), 0)
846 self.assertEqual(len(results2), self.N-1)
847 self.assertEqual(len(results3), self.N)
848
849
850 def test_abort_and_reset(self):
851 """
852 Test that a barrier can be reset after being broken.
853 """
854 results1 = []
855 results2 = []
856 results3 = []
857 barrier2 = self.barriertype(self.N)
858 def f():
859 try:
860 i = self.barrier.wait()
861 if i == self.N//2:
862 raise RuntimeError
863 self.barrier.wait()
864 results1.append(True)
865 except threading.BrokenBarrierError:
866 results2.append(True)
867 except RuntimeError:
868 self.barrier.abort()
869 pass
870 # Synchronize and reset the barrier. Must synchronize first so
871 # that everyone has left it when we reset, and after so that no
872 # one enters it before the reset.
873 if barrier2.wait() == self.N//2:
874 self.barrier.reset()
875 barrier2.wait()
876 self.barrier.wait()
877 results3.append(True)
878
879 self.run_threads(f)
880 self.assertEqual(len(results1), 0)
881 self.assertEqual(len(results2), self.N-1)
882 self.assertEqual(len(results3), self.N)
883
884 def test_timeout(self):
885 """
886 Test wait(timeout)
887 """
888 def f():
889 i = self.barrier.wait()
890 if i == self.N // 2:
891 # One thread is late!
Antoine Pitrou12ae2902010-11-17 21:55:41 +0000892 time.sleep(1.0)
893 # Default timeout is 2.0, so this is shorter.
Kristján Valur Jónsson3be00032010-10-28 09:43:10 +0000894 self.assertRaises(threading.BrokenBarrierError,
Antoine Pitrou12ae2902010-11-17 21:55:41 +0000895 self.barrier.wait, 0.5)
Kristján Valur Jónsson3be00032010-10-28 09:43:10 +0000896 self.run_threads(f)
897
898 def test_default_timeout(self):
899 """
900 Test the barrier's default timeout
901 """
Charles-François Natalid4d1d062011-07-27 21:26:42 +0200902 # create a barrier with a low default timeout
903 barrier = self.barriertype(self.N, timeout=0.3)
Kristján Valur Jónsson3be00032010-10-28 09:43:10 +0000904 def f():
Kristján Valur Jónssonf53a6262010-10-31 03:00:57 +0000905 i = barrier.wait()
Kristján Valur Jónsson3be00032010-10-28 09:43:10 +0000906 if i == self.N // 2:
Charles-François Natalid4d1d062011-07-27 21:26:42 +0200907 # One thread is later than the default timeout of 0.3s.
Antoine Pitrou12ae2902010-11-17 21:55:41 +0000908 time.sleep(1.0)
Kristján Valur Jónssonf53a6262010-10-31 03:00:57 +0000909 self.assertRaises(threading.BrokenBarrierError, barrier.wait)
Kristján Valur Jónsson3be00032010-10-28 09:43:10 +0000910 self.run_threads(f)
911
912 def test_single_thread(self):
913 b = self.barriertype(1)
914 b.wait()
915 b.wait()