# Source blob: 055bf28565d1f3f5537853a82ceb65e0fe7f3bec
"""
Various tests for synchronization primitives.
"""

import sys
import time
from _thread import start_new_thread, TIMEOUT_MAX
import threading
import unittest
import weakref

from test import support


def _wait():
    """Sleep for 10 ms so that other threads get a chance to run.

    A crude wait/yield function: it is built on time.sleep() on purpose,
    so that it does not rely on the synchronization primitives this module
    is testing.
    """
    time.sleep(0.01)

class Bunch(object):
    """
    A bunch of threads.

    Attributes:
        f: the function each thread runs.
        n: the number of threads requested.
        started: thread idents, appended by each thread as it begins.
        finished: thread idents, appended by each thread once `f` returned
            or raised.
    """
    def __init__(self, f, n, wait_before_exit=False):
        """
        Construct a bunch of `n` threads running the same function `f`.
        If `wait_before_exit` is True, the threads won't terminate until
        do_finish() is called.
        """
        self.f = f
        self.n = n
        self.started = []
        self.finished = []
        self._can_exit = not wait_before_exit
        def task():
            tid = threading.get_ident()
            self.started.append(tid)
            try:
                f()
            finally:
                # Record completion even when f() raised, so that
                # wait_for_finished() cannot hang on a failed worker.
                self.finished.append(tid)
                # Keep the thread alive until do_finish() allows it to exit.
                while not self._can_exit:
                    _wait()
        try:
            for i in range(n):
                start_new_thread(task, ())
        except:
            # Some threads may already be running: let them exit instead of
            # spinning forever, then propagate the original error.
            self._can_exit = True
            raise

    def wait_for_started(self):
        """Poll until all `n` threads have registered in `started`."""
        while len(self.started) < self.n:
            _wait()

    def wait_for_finished(self):
        """Poll until all `n` threads have registered in `finished`."""
        while len(self.finished) < self.n:
            _wait()

    def do_finish(self):
        """Allow threads created with wait_before_exit=True to terminate."""
        self._can_exit = True


class BaseTestCase(unittest.TestCase):
    """
    Shared fixture and timing helper for the test classes in this module.
    """
    def setUp(self):
        # Keep whatever threading_setup() returns; tearDown passes it back
        # to threading_cleanup().
        self._threads = support.threading_setup()

    def tearDown(self):
        support.threading_cleanup(*self._threads)
        support.reap_children()

    def assertTimeout(self, actual, expected):
        """Check that the measured duration `actual` is plausible for a
        timeout of `expected` (both in the same unit).

        Passes when expected * 0.6 <= actual < expected * 10.0.
        """
        # The waiting and/or time.time() can be imprecise, which
        # is why comparing to the expected value would sometimes fail
        # (especially under Windows).
        self.assertGreaterEqual(actual, expected * 0.6)
        # Test nothing insane happened
        self.assertLess(actual, expected * 10.0)


class BaseLockTests(BaseTestCase):
    """
    Tests for both recursive and non-recursive locks.

    Every test creates its lock through self.locktype(), which is not
    defined in this class and has to be supplied by the concrete test class.
    """

    def test_constructor(self):
        # Creating and dropping an untouched lock must work.
        lock = self.locktype()
        del lock

    def test_repr(self):
        lock = self.locktype()
        self.assertRegex(repr(lock), "<unlocked .* object (.*)?at .*>")
        del lock

    def test_locked_repr(self):
        lock = self.locktype()
        lock.acquire()
        self.assertRegex(repr(lock), "<locked .* object (.*)?at .*>")
        del lock

    def test_acquire_destroy(self):
        # Dropping a lock that is still held must work.
        lock = self.locktype()
        lock.acquire()
        del lock

    def test_acquire_release(self):
        lock = self.locktype()
        lock.acquire()
        lock.release()
        del lock

    def test_try_acquire(self):
        # A non-blocking acquire on a free lock succeeds.
        lock = self.locktype()
        self.assertTrue(lock.acquire(False))
        lock.release()

    def test_try_acquire_contended(self):
        # A non-blocking acquire from another thread fails while we hold
        # the lock.
        lock = self.locktype()
        lock.acquire()
        result = []
        def f():
            result.append(lock.acquire(False))
        Bunch(f, 1).wait_for_finished()
        self.assertFalse(result[0])
        lock.release()

    def test_acquire_contended(self):
        # Blocking acquires from other threads only complete once the
        # holder releases the lock.
        lock = self.locktype()
        lock.acquire()
        N = 5
        def f():
            lock.acquire()
            lock.release()

        b = Bunch(f, N)
        b.wait_for_started()
        _wait()
        self.assertEqual(len(b.finished), 0)
        lock.release()
        b.wait_for_finished()
        self.assertEqual(len(b.finished), N)

    def test_with(self):
        lock = self.locktype()
        def f():
            lock.acquire()
            lock.release()
        def _with(err=None):
            with lock:
                if err is not None:
                    raise err
        _with()
        # Check the lock is unacquired
        Bunch(f, 1).wait_for_finished()
        self.assertRaises(TypeError, _with, TypeError)
        # Check the lock is unacquired
        Bunch(f, 1).wait_for_finished()

    def test_thread_leak(self):
        # The lock shouldn't leak a Thread instance when used from a foreign
        # (non-threading) thread.
        lock = self.locktype()
        def f():
            lock.acquire()
            lock.release()
        n = len(threading.enumerate())
        # We run many threads in the hope that existing threads ids won't
        # be recycled.
        Bunch(f, 15).wait_for_finished()
        if len(threading.enumerate()) != n:
            # There is a small window during which a Thread instance's
            # target function has finished running, but the Thread is still
            # alive and registered.  Avoid spurious failures by waiting a
            # bit more (seen on a buildbot).
            time.sleep(0.4)
            self.assertEqual(n, len(threading.enumerate()))

    def test_timeout(self):
        lock = self.locktype()
        # Can't set timeout if not blocking
        self.assertRaises(ValueError, lock.acquire, 0, 1)
        # Invalid timeout values
        self.assertRaises(ValueError, lock.acquire, timeout=-100)
        self.assertRaises(OverflowError, lock.acquire, timeout=1e100)
        self.assertRaises(OverflowError, lock.acquire, timeout=TIMEOUT_MAX + 1)
        # TIMEOUT_MAX is ok
        lock.acquire(timeout=TIMEOUT_MAX)
        lock.release()
        # Durations are measured with the monotonic clock so that a system
        # clock adjustment during the test cannot distort them.
        t1 = time.monotonic()
        self.assertTrue(lock.acquire(timeout=5))
        t2 = time.monotonic()
        # Just a sanity test that it didn't actually wait for the timeout.
        self.assertLess(t2 - t1, 5)
        results = []
        def f():
            t1 = time.monotonic()
            results.append(lock.acquire(timeout=0.5))
            t2 = time.monotonic()
            results.append(t2 - t1)
        Bunch(f, 1).wait_for_finished()
        self.assertFalse(results[0])
        self.assertTimeout(results[1], 0.5)

    def test_weakref_exists(self):
        lock = self.locktype()
        ref = weakref.ref(lock)
        self.assertIsNotNone(ref())

    def test_weakref_deleted(self):
        lock = self.locktype()
        ref = weakref.ref(lock)
        del lock
        self.assertIsNone(ref())


class LockTests(BaseLockTests):
    """
    Tests for non-recursive, weak locks
    (which can be acquired and released from different threads).
    """
    def test_reacquire(self):
        # Lock needs to be released before re-acquiring.
        lock = self.locktype()
        phase = []
        def f():
            lock.acquire()
            phase.append(None)
            # Blocks until the main thread releases the lock.
            lock.acquire()
            phase.append(None)
        start_new_thread(f, ())
        while len(phase) == 0:
            _wait()
        _wait()
        self.assertEqual(len(phase), 1)
        lock.release()
        while len(phase) == 1:
            _wait()
        self.assertEqual(len(phase), 2)

    def test_different_thread(self):
        # Lock can be released from a different thread.
        lock = self.locktype()
        lock.acquire()
        def f():
            lock.release()
        b = Bunch(f, 1)
        b.wait_for_finished()
        lock.acquire()
        lock.release()

    def test_state_after_timeout(self):
        # Issue #11618: check that lock is in a proper state after a
        # (non-zero) timeout.
        lock = self.locktype()
        lock.acquire()
        self.assertFalse(lock.acquire(timeout=0.01))
        lock.release()
        self.assertFalse(lock.locked())
        self.assertTrue(lock.acquire(blocking=False))


class RLockTests(BaseLockTests):
    """
    Tests for recursive locks.
    """
    def test_reacquire(self):
        # The owning thread may acquire repeatedly, with matching releases.
        lock = self.locktype()
        lock.acquire()
        lock.acquire()
        lock.release()
        lock.acquire()
        lock.release()
        lock.release()

    def test_release_unacquired(self):
        # Cannot release an unacquired lock
        lock = self.locktype()
        self.assertRaises(RuntimeError, lock.release)
        lock.acquire()
        lock.acquire()
        lock.release()
        lock.acquire()
        lock.release()
        lock.release()
        self.assertRaises(RuntimeError, lock.release)

    def test_release_save_unacquired(self):
        # Cannot _release_save an unacquired lock
        lock = self.locktype()
        self.assertRaises(RuntimeError, lock._release_save)
        lock.acquire()
        lock.acquire()
        lock.release()
        lock.acquire()
        lock.release()
        lock.release()
        self.assertRaises(RuntimeError, lock._release_save)

    def test_different_thread(self):
        # Cannot release from a different thread
        lock = self.locktype()
        def f():
            lock.acquire()
        # wait_before_exit=True keeps the worker thread alive until
        # do_finish() is called in the finally clause below.
        b = Bunch(f, 1, True)
        try:
            self.assertRaises(RuntimeError, lock.release)
        finally:
            b.do_finish()

    def test__is_owned(self):
        lock = self.locktype()
        self.assertFalse(lock._is_owned())
        lock.acquire()
        self.assertTrue(lock._is_owned())
        lock.acquire()
        self.assertTrue(lock._is_owned())
        result = []
        def f():
            # Evaluated in another thread, which does not own the lock.
            result.append(lock._is_owned())
        Bunch(f, 1).wait_for_finished()
        self.assertFalse(result[0])
        lock.release()
        self.assertTrue(lock._is_owned())
        lock.release()
        self.assertFalse(lock._is_owned())


class EventTests(BaseTestCase):
    """
    Tests for Event objects.

    Events are created through self.eventtype(), which is not defined in
    this class and has to be supplied by the concrete test class.
    """

    def test_is_set(self):
        # set() and clear() can each be called twice in a row.
        evt = self.eventtype()
        self.assertFalse(evt.is_set())
        evt.set()
        self.assertTrue(evt.is_set())
        evt.set()
        self.assertTrue(evt.is_set())
        evt.clear()
        self.assertFalse(evt.is_set())
        evt.clear()
        self.assertFalse(evt.is_set())

    def _check_notify(self, evt):
        # All threads get notified
        N = 5
        results1 = []
        results2 = []
        def f():
            results1.append(evt.wait())
            results2.append(evt.wait())
        b = Bunch(f, N)
        b.wait_for_started()
        _wait()
        self.assertEqual(len(results1), 0)
        evt.set()
        b.wait_for_finished()
        self.assertEqual(results1, [True] * N)
        self.assertEqual(results2, [True] * N)

    def test_notify(self):
        evt = self.eventtype()
        self._check_notify(evt)
        # Another time, after an explicit clear()
        evt.set()
        evt.clear()
        self._check_notify(evt)

    def test_timeout(self):
        evt = self.eventtype()
        results1 = []
        results2 = []
        N = 5
        def f():
            results1.append(evt.wait(0.0))
            # Measure with the monotonic clock so that a system clock
            # adjustment during the wait cannot distort the duration.
            t1 = time.monotonic()
            r = evt.wait(0.5)
            t2 = time.monotonic()
            results2.append((r, t2 - t1))
        Bunch(f, N).wait_for_finished()
        self.assertEqual(results1, [False] * N)
        for r, dt in results2:
            self.assertFalse(r)
            self.assertTimeout(dt, 0.5)
        # The event is set
        # (f looks the names up at call time, so it fills these new lists.)
        results1 = []
        results2 = []
        evt.set()
        Bunch(f, N).wait_for_finished()
        self.assertEqual(results1, [True] * N)
        for r, dt in results2:
            self.assertTrue(r)

    def test_set_and_clear(self):
        # Issue #13502: check that wait() returns true even when the event is
        # cleared before the waiting thread is woken up.
        evt = self.eventtype()
        results = []
        N = 5
        def f():
            results.append(evt.wait(1))
        b = Bunch(f, N)
        b.wait_for_started()
        time.sleep(0.5)
        evt.set()
        evt.clear()
        b.wait_for_finished()
        self.assertEqual(results, [True] * N)

    def test_reset_internal_locks(self):
        # _reset_internal_locks() must install a new lock of the same type.
        evt = self.eventtype()
        old_lock = evt._cond._lock
        evt._reset_internal_locks()
        new_lock = evt._cond._lock
        self.assertIsNot(new_lock, old_lock)
        self.assertIs(type(new_lock), type(old_lock))


class ConditionTests(BaseTestCase):
    """
    Tests for condition variables.

    Conditions are created through self.condtype(), which is not defined in
    this class and has to be supplied by the concrete test class.
    """

    def test_acquire(self):
        cond = self.condtype()
        # By default we have an RLock: the condition can be acquired multiple
        # times.
        cond.acquire()
        cond.acquire()
        cond.release()
        cond.release()
        # With an explicit plain Lock, the condition and the lock are one
        # and the same: holding either makes the other unavailable.
        lock = threading.Lock()
        cond = self.condtype(lock)
        cond.acquire()
        self.assertFalse(lock.acquire(False))
        cond.release()
        self.assertTrue(lock.acquire(False))
        self.assertFalse(cond.acquire(False))
        lock.release()
        with cond:
            self.assertFalse(lock.acquire(False))

    def test_unacquired_wait(self):
        cond = self.condtype()
        self.assertRaises(RuntimeError, cond.wait)

    def test_unacquired_notify(self):
        cond = self.condtype()
        self.assertRaises(RuntimeError, cond.notify)

    def _check_notify(self, cond):
        # Note that this test is sensitive to timing.  If the worker threads
        # don't execute in a timely fashion, the main thread may think they
        # are further along than they are.  The main thread therefore issues
        # _wait() statements to try to make sure that it doesn't race ahead
        # of the workers.
        # Secondly, this test assumes that condition variables are not subject
        # to spurious wakeups.  The absence of spurious wakeups is an
        # implementation detail of Condition Variables in current CPython,
        # but in general, not a guaranteed property of condition variables as
        # a programming construct.  In particular, it is possible that this
        # can no longer be conveniently guaranteed should their implementation
        # ever change.
        N = 5
        results1 = []
        results2 = []
        # Workers record the value of phase_num they observe after waking up,
        # which tells us during which notification round they were released.
        phase_num = 0
        def f():
            cond.acquire()
            result = cond.wait()
            cond.release()
            results1.append((result, phase_num))
            cond.acquire()
            result = cond.wait()
            cond.release()
            results2.append((result, phase_num))
        b = Bunch(f, N)
        b.wait_for_started()
        _wait()
        self.assertEqual(results1, [])
        # Notify 3 threads at first
        cond.acquire()
        cond.notify(3)
        _wait()
        phase_num = 1
        cond.release()
        while len(results1) < 3:
            _wait()
        self.assertEqual(results1, [(True, 1)] * 3)
        self.assertEqual(results2, [])
        # first wait, to ensure all workers settle into cond.wait() before
        # we continue. See issue #8799
        _wait()
        # Notify 5 threads: they might be in their first or second wait
        cond.acquire()
        cond.notify(5)
        _wait()
        phase_num = 2
        cond.release()
        while len(results1) + len(results2) < 8:
            _wait()
        self.assertEqual(results1, [(True, 1)] * 3 + [(True, 2)] * 2)
        self.assertEqual(results2, [(True, 2)] * 3)
        _wait() # make sure all workers settle into cond.wait()
        # Notify all threads: they are all in their second wait
        cond.acquire()
        cond.notify_all()
        _wait()
        phase_num = 3
        cond.release()
        while len(results2) < 5:
            _wait()
        self.assertEqual(results1, [(True, 1)] * 3 + [(True, 2)] * 2)
        self.assertEqual(results2, [(True, 2)] * 3 + [(True, 3)] * 2)
        b.wait_for_finished()

    def test_notify(self):
        cond = self.condtype()
        self._check_notify(cond)
        # A second time, to check internal state is still ok.
        self._check_notify(cond)

    def test_timeout(self):
        cond = self.condtype()
        results = []
        N = 5
        def f():
            cond.acquire()
            # Measure with the monotonic clock so that a system clock
            # adjustment during the wait cannot distort the duration.
            t1 = time.monotonic()
            result = cond.wait(0.5)
            t2 = time.monotonic()
            cond.release()
            results.append((t2 - t1, result))
        Bunch(f, N).wait_for_finished()
        self.assertEqual(len(results), N)
        for dt, result in results:
            self.assertTimeout(dt, 0.5)
            # Note that conceptually (that's the condition variable protocol)
            # a wait() may succeed even if no one notifies us and before any
            # timeout occurs.  Spurious wakeups can occur.
            # This makes it hard to verify the result value.
            # In practice, this implementation has no spurious wakeups.
            self.assertFalse(result)

    def test_waitfor(self):
        cond = self.condtype()
        state = 0
        # The worker is started with start_new_thread(), so an assertion
        # failing inside f() cannot fail this test by itself.  The marker
        # is only appended when every check in the worker passed, and it is
        # verified from the main thread below.
        success = []
        def f():
            with cond:
                result = cond.wait_for(lambda : state==4)
                self.assertTrue(result)
                self.assertEqual(state, 4)
                success.append(None)
        b = Bunch(f, 1)
        b.wait_for_started()
        for i in range(4):
            time.sleep(0.01)
            with cond:
                state += 1
                cond.notify()
        b.wait_for_finished()
        self.assertEqual(len(success), 1)

    def test_waitfor_timeout(self):
        cond = self.condtype()
        state = 0
        # Appended by the worker only when all of its checks passed.
        success = []
        def f():
            with cond:
                dt = time.monotonic()
                result = cond.wait_for(lambda : state==4, timeout=0.1)
                dt = time.monotonic() - dt
                self.assertFalse(result)
                self.assertTimeout(dt, 0.1)
                success.append(None)
        b = Bunch(f, 1)
        b.wait_for_started()
        # Only increment 3 times, so state == 4 is never reached.
        for i in range(3):
            time.sleep(0.01)
            with cond:
                state += 1
                cond.notify()
        b.wait_for_finished()
        self.assertEqual(len(success), 1)


class BaseSemaphoreTests(BaseTestCase):
    """
    Common tests for {bounded, unbounded} semaphore objects.

    Semaphores are created through self.semtype(), which is not defined in
    this class and has to be supplied by the concrete test class.
    """

    def test_constructor(self):
        # Negative initial values are rejected.
        self.assertRaises(ValueError, self.semtype, value = -1)
        self.assertRaises(ValueError, self.semtype, value = -sys.maxsize)

    def test_acquire(self):
        sem = self.semtype(1)
        sem.acquire()
        sem.release()
        sem = self.semtype(2)
        sem.acquire()
        sem.acquire()
        sem.release()
        sem.release()

    def test_acquire_destroy(self):
        # Dropping a semaphore that is still held must work.
        sem = self.semtype()
        sem.acquire()
        del sem

    def test_acquire_contended(self):
        sem = self.semtype(7)
        sem.acquire()
        N = 10
        results1 = []
        results2 = []
        # Workers record the value of phase_num at the time each of their
        # two acquires succeeds.
        phase_num = 0
        def f():
            sem.acquire()
            results1.append(phase_num)
            sem.acquire()
            results2.append(phase_num)
        b = Bunch(f, N)
        b.wait_for_started()
        # 6 units are left after our own acquire: exactly 6 of the 2 * N
        # worker acquires can succeed during phase 0.
        while len(results1) + len(results2) < 6:
            _wait()
        self.assertEqual(results1 + results2, [0] * 6)
        phase_num = 1
        for i in range(7):
            sem.release()
        while len(results1) + len(results2) < 13:
            _wait()
        self.assertEqual(sorted(results1 + results2), [0] * 6 + [1] * 7)
        phase_num = 2
        for i in range(6):
            sem.release()
        while len(results1) + len(results2) < 19:
            _wait()
        self.assertEqual(sorted(results1 + results2), [0] * 6 + [1] * 7 + [2] * 6)
        # The semaphore is still locked
        self.assertFalse(sem.acquire(False))
        # Final release, to let the last thread finish
        sem.release()
        b.wait_for_finished()

    def test_try_acquire(self):
        sem = self.semtype(2)
        self.assertTrue(sem.acquire(False))
        self.assertTrue(sem.acquire(False))
        self.assertFalse(sem.acquire(False))
        sem.release()
        self.assertTrue(sem.acquire(False))

    def test_try_acquire_contended(self):
        sem = self.semtype(4)
        sem.acquire()
        results = []
        def f():
            results.append(sem.acquire(False))
            results.append(sem.acquire(False))
        Bunch(f, 5).wait_for_finished()
        # There can be a thread switch between acquiring the semaphore and
        # appending the result, therefore results will not necessarily be
        # ordered.
        self.assertEqual(sorted(results), [False] * 7 + [True] *  3 )

    def test_acquire_timeout(self):
        sem = self.semtype(2)
        self.assertRaises(ValueError, sem.acquire, False, timeout=1.0)
        self.assertTrue(sem.acquire(timeout=0.005))
        self.assertTrue(sem.acquire(timeout=0.005))
        self.assertFalse(sem.acquire(timeout=0.005))
        sem.release()
        self.assertTrue(sem.acquire(timeout=0.005))
        # Measure with the monotonic clock so that a system clock
        # adjustment during the wait cannot distort the duration.
        t = time.monotonic()
        self.assertFalse(sem.acquire(timeout=0.5))
        dt = time.monotonic() - t
        self.assertTimeout(dt, 0.5)

    def test_default_value(self):
        # The default initial value is 1.
        sem = self.semtype()
        sem.acquire()
        def f():
            sem.acquire()
            sem.release()
        b = Bunch(f, 1)
        b.wait_for_started()
        _wait()
        self.assertFalse(b.finished)
        sem.release()
        b.wait_for_finished()

    def test_with(self):
        sem = self.semtype(2)
        def _with(err=None):
            with sem:
                self.assertTrue(sem.acquire(False))
                sem.release()
                with sem:
                    self.assertFalse(sem.acquire(False))
                    if err:
                        raise err
        _with()
        # Both units must be available again after leaving the with blocks,
        # whether they exited normally or through an exception.
        self.assertTrue(sem.acquire(False))
        sem.release()
        self.assertRaises(TypeError, _with, TypeError)
        self.assertTrue(sem.acquire(False))
        sem.release()

class SemaphoreTests(BaseSemaphoreTests):
    """
    Tests for unbounded semaphores.
    """

    def test_release_unacquired(self):
        # Unbounded releases are allowed and increment the semaphore's value
        sem = self.semtype(1)
        sem.release()
        # The extra release raised the value to 2, so neither acquire blocks.
        sem.acquire()
        sem.acquire()
        sem.release()


class BoundedSemaphoreTests(BaseSemaphoreTests):
    """
    Tests for bounded semaphores.
    """

    def test_release_unacquired(self):
        # Cannot go past the initial value
        sem = self.semtype()
        self.assertRaises(ValueError, sem.release)
        sem.acquire()
        sem.release()
        self.assertRaises(ValueError, sem.release)


class BarrierTests(BaseTestCase):
    """
    Tests for Barrier objects.

    Barriers are created through self.barriertype(), which is not defined
    in this class and has to be supplied by the concrete test class.
    """
    # Number of parties of the barrier under test.
    N = 5
    # Default timeout, in seconds, of the barrier created in setUp().
    defaultTimeout = 2.0

    # NOTE(review): setUp/tearDown override the BaseTestCase versions
    # without calling them, so threading_setup()/threading_cleanup() do not
    # run for these tests -- confirm this is intentional.
    def setUp(self):
        self.barrier = self.barriertype(self.N, timeout=self.defaultTimeout)
    def tearDown(self):
        self.barrier.abort()

    def run_threads(self, f):
        # Run f in N-1 worker threads plus the calling thread, which gives
        # the N parties the barrier expects.
        b = Bunch(f, self.N-1)
        f()
        b.wait_for_finished()

    def multipass(self, results, n):
        # Pass the barrier 2 * n times.  results is a pair of lists; the
        # length checks verify that no thread runs ahead of the others.
        m = self.barrier.parties
        self.assertEqual(m, self.N)
        for i in range(n):
            results[0].append(True)
            self.assertEqual(len(results[1]), i * m)
            self.barrier.wait()
            results[1].append(True)
            self.assertEqual(len(results[0]), (i + 1) * m)
            self.barrier.wait()
        self.assertEqual(self.barrier.n_waiting, 0)
        self.assertFalse(self.barrier.broken)

    def test_barrier(self, passes=1):
        """
        Test that a barrier is passed in lockstep
        """
        results = [[],[]]
        def f():
            self.multipass(results, passes)
        self.run_threads(f)

    def test_barrier_10(self):
        """
        Test that a barrier works for 10 consecutive runs
        """
        return self.test_barrier(10)

    def test_wait_return(self):
        """
        test the return value from barrier.wait
        """
        results = []
        def f():
            r = self.barrier.wait()
            results.append(r)

        self.run_threads(f)
        # The return values must add up to 0 + 1 + ... + (N - 1).
        self.assertEqual(sum(results), sum(range(self.N)))

    def test_action(self):
        """
        Test the 'action' callback
        """
        results = []
        def action():
            results.append(True)
        barrier = self.barriertype(self.N, action)
        def f():
            barrier.wait()
            self.assertEqual(len(results), 1)

        self.run_threads(f)

    def test_abort(self):
        """
        Test that an abort will put the barrier in a broken state
        """
        results1 = []
        results2 = []
        def f():
            try:
                i = self.barrier.wait()
                if i == self.N//2:
                    # Exactly one thread takes the abort path below.
                    raise RuntimeError
                self.barrier.wait()
                results1.append(True)
            except threading.BrokenBarrierError:
                results2.append(True)
            except RuntimeError:
                self.barrier.abort()

        self.run_threads(f)
        self.assertEqual(len(results1), 0)
        self.assertEqual(len(results2), self.N-1)
        self.assertTrue(self.barrier.broken)

    def test_reset(self):
        """
        Test that a 'reset' on a barrier frees the waiting threads
        """
        results1 = []
        results2 = []
        results3 = []
        def f():
            i = self.barrier.wait()
            if i == self.N//2:
                # Wait until the other threads are all in the barrier.
                while self.barrier.n_waiting < self.N-1:
                    time.sleep(0.001)
                self.barrier.reset()
            else:
                try:
                    self.barrier.wait()
                    results1.append(True)
                except threading.BrokenBarrierError:
                    results2.append(True)
            # Now, pass the barrier again
            self.barrier.wait()
            results3.append(True)

        self.run_threads(f)
        self.assertEqual(len(results1), 0)
        self.assertEqual(len(results2), self.N-1)
        self.assertEqual(len(results3), self.N)


    def test_abort_and_reset(self):
        """
        Test that a barrier can be reset after being broken.
        """
        results1 = []
        results2 = []
        results3 = []
        barrier2 = self.barriertype(self.N)
        def f():
            try:
                i = self.barrier.wait()
                if i == self.N//2:
                    # Exactly one thread takes the abort path below.
                    raise RuntimeError
                self.barrier.wait()
                results1.append(True)
            except threading.BrokenBarrierError:
                results2.append(True)
            except RuntimeError:
                self.barrier.abort()
            # Synchronize and reset the barrier.  Must synchronize first so
            # that everyone has left it when we reset, and after so that no
            # one enters it before the reset.
            if barrier2.wait() == self.N//2:
                self.barrier.reset()
            barrier2.wait()
            self.barrier.wait()
            results3.append(True)

        self.run_threads(f)
        self.assertEqual(len(results1), 0)
        self.assertEqual(len(results2), self.N-1)
        self.assertEqual(len(results3), self.N)

    def test_timeout(self):
        """
        Test wait(timeout)
        """
        def f():
            i = self.barrier.wait()
            if i == self.N // 2:
                # One thread is late!
                time.sleep(1.0)
            # Default timeout is 2.0, so this is shorter.
            self.assertRaises(threading.BrokenBarrierError,
                              self.barrier.wait, 0.5)
        self.run_threads(f)

    def test_default_timeout(self):
        """
        Test the barrier's default timeout
        """
        # create a barrier with a low default timeout
        barrier = self.barriertype(self.N, timeout=0.3)
        def f():
            i = barrier.wait()
            if i == self.N // 2:
                # One thread is later than the default timeout of 0.3s.
                time.sleep(1.0)
            self.assertRaises(threading.BrokenBarrierError, barrier.wait)
        self.run_threads(f)

    def test_single_thread(self):
        # With a single party, wait() never has to wait for anyone.
        b = self.barriertype(1)
        b.wait()
        b.wait()