blob: a6cb3b169b6d7323a8686bfd171107feac2a3f86 [file] [log] [blame]
Antoine Pitrou557934f2009-11-06 22:41:14 +00001"""
2Various tests for synchronization primitives.
3"""
4
5import sys
6import time
Victor Stinner2a129742011-05-30 23:02:52 +02007from _thread import start_new_thread, TIMEOUT_MAX
Antoine Pitrou557934f2009-11-06 22:41:14 +00008import threading
9import unittest
Raymond Hettinger7836a272015-10-09 00:03:51 -040010import weakref
Antoine Pitrou557934f2009-11-06 22:41:14 +000011
12from test import support
13
14
15def _wait():
16 # A crude wait/yield function not relying on synchronization primitives.
17 time.sleep(0.01)
18
19class Bunch(object):
20 """
21 A bunch of threads.
22 """
23 def __init__(self, f, n, wait_before_exit=False):
24 """
25 Construct a bunch of `n` threads running the same function `f`.
26 If `wait_before_exit` is True, the threads won't terminate until
27 do_finish() is called.
28 """
29 self.f = f
30 self.n = n
31 self.started = []
32 self.finished = []
33 self._can_exit = not wait_before_exit
34 def task():
Victor Stinner2a129742011-05-30 23:02:52 +020035 tid = threading.get_ident()
Antoine Pitrou557934f2009-11-06 22:41:14 +000036 self.started.append(tid)
37 try:
38 f()
39 finally:
40 self.finished.append(tid)
41 while not self._can_exit:
42 _wait()
Serhiy Storchaka9db55002015-03-28 20:38:37 +020043 try:
44 for i in range(n):
45 start_new_thread(task, ())
46 except:
47 self._can_exit = True
48 raise
Antoine Pitrou557934f2009-11-06 22:41:14 +000049
50 def wait_for_started(self):
51 while len(self.started) < self.n:
52 _wait()
53
54 def wait_for_finished(self):
55 while len(self.finished) < self.n:
56 _wait()
57
58 def do_finish(self):
59 self._can_exit = True
60
61
62class BaseTestCase(unittest.TestCase):
63 def setUp(self):
64 self._threads = support.threading_setup()
65
66 def tearDown(self):
67 support.threading_cleanup(*self._threads)
68 support.reap_children()
69
Antoine Pitrou7c3e5772010-04-14 15:44:10 +000070 def assertTimeout(self, actual, expected):
71 # The waiting and/or time.time() can be imprecise, which
72 # is why comparing to the expected value would sometimes fail
73 # (especially under Windows).
74 self.assertGreaterEqual(actual, expected * 0.6)
75 # Test nothing insane happened
76 self.assertLess(actual, expected * 10.0)
77
Antoine Pitrou557934f2009-11-06 22:41:14 +000078
79class BaseLockTests(BaseTestCase):
80 """
81 Tests for both recursive and non-recursive locks.
82 """
83
84 def test_constructor(self):
85 lock = self.locktype()
86 del lock
87
Christian Heimesc5d95b12013-07-30 15:54:39 +020088 def test_repr(self):
89 lock = self.locktype()
Raymond Hettinger62f4dad2014-05-25 18:22:35 -070090 self.assertRegex(repr(lock), "<unlocked .* object (.*)?at .*>")
91 del lock
92
93 def test_locked_repr(self):
94 lock = self.locktype()
95 lock.acquire()
96 self.assertRegex(repr(lock), "<locked .* object (.*)?at .*>")
Christian Heimesc5d95b12013-07-30 15:54:39 +020097 del lock
98
Antoine Pitrou557934f2009-11-06 22:41:14 +000099 def test_acquire_destroy(self):
100 lock = self.locktype()
101 lock.acquire()
102 del lock
103
104 def test_acquire_release(self):
105 lock = self.locktype()
106 lock.acquire()
107 lock.release()
108 del lock
109
110 def test_try_acquire(self):
111 lock = self.locktype()
112 self.assertTrue(lock.acquire(False))
113 lock.release()
114
115 def test_try_acquire_contended(self):
116 lock = self.locktype()
117 lock.acquire()
118 result = []
119 def f():
120 result.append(lock.acquire(False))
121 Bunch(f, 1).wait_for_finished()
122 self.assertFalse(result[0])
123 lock.release()
124
125 def test_acquire_contended(self):
126 lock = self.locktype()
127 lock.acquire()
128 N = 5
129 def f():
130 lock.acquire()
131 lock.release()
132
133 b = Bunch(f, N)
134 b.wait_for_started()
135 _wait()
136 self.assertEqual(len(b.finished), 0)
137 lock.release()
138 b.wait_for_finished()
139 self.assertEqual(len(b.finished), N)
140
141 def test_with(self):
142 lock = self.locktype()
143 def f():
144 lock.acquire()
145 lock.release()
146 def _with(err=None):
147 with lock:
148 if err is not None:
149 raise err
150 _with()
151 # Check the lock is unacquired
152 Bunch(f, 1).wait_for_finished()
153 self.assertRaises(TypeError, _with, TypeError)
154 # Check the lock is unacquired
155 Bunch(f, 1).wait_for_finished()
156
Antoine Pitroub0872682009-11-09 16:08:16 +0000157 def test_thread_leak(self):
158 # The lock shouldn't leak a Thread instance when used from a foreign
159 # (non-threading) thread.
160 lock = self.locktype()
161 def f():
162 lock.acquire()
163 lock.release()
164 n = len(threading.enumerate())
165 # We run many threads in the hope that existing threads ids won't
166 # be recycled.
167 Bunch(f, 15).wait_for_finished()
Antoine Pitrou45fdb452011-04-04 21:59:09 +0200168 if len(threading.enumerate()) != n:
169 # There is a small window during which a Thread instance's
170 # target function has finished running, but the Thread is still
171 # alive and registered. Avoid spurious failures by waiting a
172 # bit more (seen on a buildbot).
173 time.sleep(0.4)
174 self.assertEqual(n, len(threading.enumerate()))
Antoine Pitroub0872682009-11-09 16:08:16 +0000175
Antoine Pitrou7c3e5772010-04-14 15:44:10 +0000176 def test_timeout(self):
177 lock = self.locktype()
178 # Can't set timeout if not blocking
179 self.assertRaises(ValueError, lock.acquire, 0, 1)
180 # Invalid timeout values
181 self.assertRaises(ValueError, lock.acquire, timeout=-100)
182 self.assertRaises(OverflowError, lock.acquire, timeout=1e100)
183 self.assertRaises(OverflowError, lock.acquire, timeout=TIMEOUT_MAX + 1)
184 # TIMEOUT_MAX is ok
185 lock.acquire(timeout=TIMEOUT_MAX)
186 lock.release()
187 t1 = time.time()
188 self.assertTrue(lock.acquire(timeout=5))
189 t2 = time.time()
190 # Just a sanity test that it didn't actually wait for the timeout.
191 self.assertLess(t2 - t1, 5)
192 results = []
193 def f():
194 t1 = time.time()
195 results.append(lock.acquire(timeout=0.5))
196 t2 = time.time()
197 results.append(t2 - t1)
198 Bunch(f, 1).wait_for_finished()
199 self.assertFalse(results[0])
200 self.assertTimeout(results[1], 0.5)
201
Raymond Hettinger7836a272015-10-09 00:03:51 -0400202 def test_weakref_exists(self):
203 lock = self.locktype()
204 ref = weakref.ref(lock)
205 self.assertIsNotNone(ref())
206
207 def test_weakref_deleted(self):
208 lock = self.locktype()
209 ref = weakref.ref(lock)
210 del lock
211 self.assertIsNone(ref())
212
Antoine Pitrou557934f2009-11-06 22:41:14 +0000213
214class LockTests(BaseLockTests):
215 """
216 Tests for non-recursive, weak locks
217 (which can be acquired and released from different threads).
218 """
219 def test_reacquire(self):
220 # Lock needs to be released before re-acquiring.
221 lock = self.locktype()
222 phase = []
223 def f():
224 lock.acquire()
225 phase.append(None)
226 lock.acquire()
227 phase.append(None)
228 start_new_thread(f, ())
229 while len(phase) == 0:
230 _wait()
231 _wait()
232 self.assertEqual(len(phase), 1)
233 lock.release()
234 while len(phase) == 1:
235 _wait()
236 self.assertEqual(len(phase), 2)
237
238 def test_different_thread(self):
239 # Lock can be released from a different thread.
240 lock = self.locktype()
241 lock.acquire()
242 def f():
243 lock.release()
244 b = Bunch(f, 1)
245 b.wait_for_finished()
246 lock.acquire()
247 lock.release()
248
Antoine Pitrou7899acf2011-03-31 01:00:32 +0200249 def test_state_after_timeout(self):
250 # Issue #11618: check that lock is in a proper state after a
251 # (non-zero) timeout.
252 lock = self.locktype()
253 lock.acquire()
254 self.assertFalse(lock.acquire(timeout=0.01))
255 lock.release()
256 self.assertFalse(lock.locked())
257 self.assertTrue(lock.acquire(blocking=False))
258
Antoine Pitrou557934f2009-11-06 22:41:14 +0000259
260class RLockTests(BaseLockTests):
261 """
262 Tests for recursive locks.
263 """
264 def test_reacquire(self):
265 lock = self.locktype()
266 lock.acquire()
267 lock.acquire()
268 lock.release()
269 lock.acquire()
270 lock.release()
271 lock.release()
272
273 def test_release_unacquired(self):
274 # Cannot release an unacquired lock
275 lock = self.locktype()
276 self.assertRaises(RuntimeError, lock.release)
277 lock.acquire()
278 lock.acquire()
279 lock.release()
280 lock.acquire()
281 lock.release()
282 lock.release()
283 self.assertRaises(RuntimeError, lock.release)
Antoine Pitrouea3eb882012-05-17 18:55:59 +0200284
285 def test_release_save_unacquired(self):
286 # Cannot _release_save an unacquired lock
287 lock = self.locktype()
288 self.assertRaises(RuntimeError, lock._release_save)
289 lock.acquire()
290 lock.acquire()
291 lock.release()
292 lock.acquire()
293 lock.release()
294 lock.release()
Victor Stinnerc2824d42011-04-24 23:41:33 +0200295 self.assertRaises(RuntimeError, lock._release_save)
Antoine Pitrou557934f2009-11-06 22:41:14 +0000296
297 def test_different_thread(self):
298 # Cannot release from a different thread
299 lock = self.locktype()
300 def f():
301 lock.acquire()
302 b = Bunch(f, 1, True)
303 try:
304 self.assertRaises(RuntimeError, lock.release)
305 finally:
306 b.do_finish()
307
308 def test__is_owned(self):
309 lock = self.locktype()
310 self.assertFalse(lock._is_owned())
311 lock.acquire()
312 self.assertTrue(lock._is_owned())
313 lock.acquire()
314 self.assertTrue(lock._is_owned())
315 result = []
316 def f():
317 result.append(lock._is_owned())
318 Bunch(f, 1).wait_for_finished()
319 self.assertFalse(result[0])
320 lock.release()
321 self.assertTrue(lock._is_owned())
322 lock.release()
323 self.assertFalse(lock._is_owned())
324
325
326class EventTests(BaseTestCase):
327 """
328 Tests for Event objects.
329 """
330
331 def test_is_set(self):
332 evt = self.eventtype()
333 self.assertFalse(evt.is_set())
334 evt.set()
335 self.assertTrue(evt.is_set())
336 evt.set()
337 self.assertTrue(evt.is_set())
338 evt.clear()
339 self.assertFalse(evt.is_set())
340 evt.clear()
341 self.assertFalse(evt.is_set())
342
343 def _check_notify(self, evt):
344 # All threads get notified
345 N = 5
346 results1 = []
347 results2 = []
348 def f():
349 results1.append(evt.wait())
350 results2.append(evt.wait())
351 b = Bunch(f, N)
352 b.wait_for_started()
353 _wait()
354 self.assertEqual(len(results1), 0)
355 evt.set()
356 b.wait_for_finished()
357 self.assertEqual(results1, [True] * N)
358 self.assertEqual(results2, [True] * N)
359
360 def test_notify(self):
361 evt = self.eventtype()
362 self._check_notify(evt)
363 # Another time, after an explicit clear()
364 evt.set()
365 evt.clear()
366 self._check_notify(evt)
367
368 def test_timeout(self):
369 evt = self.eventtype()
370 results1 = []
371 results2 = []
372 N = 5
373 def f():
374 results1.append(evt.wait(0.0))
375 t1 = time.time()
Antoine Pitrou7c3e5772010-04-14 15:44:10 +0000376 r = evt.wait(0.5)
Antoine Pitrou557934f2009-11-06 22:41:14 +0000377 t2 = time.time()
378 results2.append((r, t2 - t1))
379 Bunch(f, N).wait_for_finished()
380 self.assertEqual(results1, [False] * N)
381 for r, dt in results2:
382 self.assertFalse(r)
Antoine Pitrou7c3e5772010-04-14 15:44:10 +0000383 self.assertTimeout(dt, 0.5)
Antoine Pitrou557934f2009-11-06 22:41:14 +0000384 # The event is set
385 results1 = []
386 results2 = []
387 evt.set()
388 Bunch(f, N).wait_for_finished()
389 self.assertEqual(results1, [True] * N)
390 for r, dt in results2:
391 self.assertTrue(r)
392
Charles-François Natalided03482012-01-07 18:24:56 +0100393 def test_set_and_clear(self):
394 # Issue #13502: check that wait() returns true even when the event is
395 # cleared before the waiting thread is woken up.
396 evt = self.eventtype()
397 results = []
398 N = 5
399 def f():
400 results.append(evt.wait(1))
401 b = Bunch(f, N)
402 b.wait_for_started()
403 time.sleep(0.5)
404 evt.set()
405 evt.clear()
406 b.wait_for_finished()
407 self.assertEqual(results, [True] * N)
408
Benjamin Peterson15982aa2015-10-05 21:56:22 -0700409 def test_reset_internal_locks(self):
Berker Peksag6d34bbb2016-04-29 17:25:29 +0300410 # ensure that condition is still using a Lock after reset
Benjamin Peterson15982aa2015-10-05 21:56:22 -0700411 evt = self.eventtype()
Berker Peksag6d34bbb2016-04-29 17:25:29 +0300412 with evt._cond:
413 self.assertFalse(evt._cond.acquire(False))
Benjamin Peterson15982aa2015-10-05 21:56:22 -0700414 evt._reset_internal_locks()
Berker Peksag6d34bbb2016-04-29 17:25:29 +0300415 with evt._cond:
416 self.assertFalse(evt._cond.acquire(False))
Benjamin Peterson15982aa2015-10-05 21:56:22 -0700417
Antoine Pitrou557934f2009-11-06 22:41:14 +0000418
419class ConditionTests(BaseTestCase):
420 """
421 Tests for condition variables.
422 """
423
424 def test_acquire(self):
425 cond = self.condtype()
426 # Be default we have an RLock: the condition can be acquired multiple
427 # times.
428 cond.acquire()
429 cond.acquire()
430 cond.release()
431 cond.release()
432 lock = threading.Lock()
433 cond = self.condtype(lock)
434 cond.acquire()
435 self.assertFalse(lock.acquire(False))
436 cond.release()
437 self.assertTrue(lock.acquire(False))
438 self.assertFalse(cond.acquire(False))
439 lock.release()
440 with cond:
441 self.assertFalse(lock.acquire(False))
442
443 def test_unacquired_wait(self):
444 cond = self.condtype()
445 self.assertRaises(RuntimeError, cond.wait)
446
447 def test_unacquired_notify(self):
448 cond = self.condtype()
449 self.assertRaises(RuntimeError, cond.notify)
450
451 def _check_notify(self, cond):
Kristjan Valur Jonsson020af2a2013-11-11 11:29:04 +0000452 # Note that this test is sensitive to timing. If the worker threads
453 # don't execute in a timely fashion, the main thread may think they
454 # are further along then they are. The main thread therefore issues
455 # _wait() statements to try to make sure that it doesn't race ahead
456 # of the workers.
457 # Secondly, this test assumes that condition variables are not subject
458 # to spurious wakeups. The absence of spurious wakeups is an implementation
459 # detail of Condition Cariables in current CPython, but in general, not
460 # a guaranteed property of condition variables as a programming
461 # construct. In particular, it is possible that this can no longer
462 # be conveniently guaranteed should their implementation ever change.
Antoine Pitrou557934f2009-11-06 22:41:14 +0000463 N = 5
464 results1 = []
465 results2 = []
466 phase_num = 0
467 def f():
468 cond.acquire()
Georg Brandlb9a43912010-10-28 09:03:20 +0000469 result = cond.wait()
Antoine Pitrou557934f2009-11-06 22:41:14 +0000470 cond.release()
Georg Brandlb9a43912010-10-28 09:03:20 +0000471 results1.append((result, phase_num))
Antoine Pitrou557934f2009-11-06 22:41:14 +0000472 cond.acquire()
Georg Brandlb9a43912010-10-28 09:03:20 +0000473 result = cond.wait()
Antoine Pitrou557934f2009-11-06 22:41:14 +0000474 cond.release()
Georg Brandlb9a43912010-10-28 09:03:20 +0000475 results2.append((result, phase_num))
Antoine Pitrou557934f2009-11-06 22:41:14 +0000476 b = Bunch(f, N)
477 b.wait_for_started()
478 _wait()
479 self.assertEqual(results1, [])
480 # Notify 3 threads at first
481 cond.acquire()
482 cond.notify(3)
483 _wait()
484 phase_num = 1
485 cond.release()
486 while len(results1) < 3:
487 _wait()
Georg Brandlb9a43912010-10-28 09:03:20 +0000488 self.assertEqual(results1, [(True, 1)] * 3)
Antoine Pitrou557934f2009-11-06 22:41:14 +0000489 self.assertEqual(results2, [])
Kristjan Valur Jonsson020af2a2013-11-11 11:29:04 +0000490 # first wait, to ensure all workers settle into cond.wait() before
491 # we continue. See issue #8799
492 _wait()
Antoine Pitrou557934f2009-11-06 22:41:14 +0000493 # Notify 5 threads: they might be in their first or second wait
494 cond.acquire()
495 cond.notify(5)
496 _wait()
497 phase_num = 2
498 cond.release()
499 while len(results1) + len(results2) < 8:
500 _wait()
Georg Brandlb9a43912010-10-28 09:03:20 +0000501 self.assertEqual(results1, [(True, 1)] * 3 + [(True, 2)] * 2)
502 self.assertEqual(results2, [(True, 2)] * 3)
Kristjan Valur Jonsson020af2a2013-11-11 11:29:04 +0000503 _wait() # make sure all workers settle into cond.wait()
Antoine Pitrou557934f2009-11-06 22:41:14 +0000504 # Notify all threads: they are all in their second wait
505 cond.acquire()
506 cond.notify_all()
507 _wait()
508 phase_num = 3
509 cond.release()
510 while len(results2) < 5:
511 _wait()
Georg Brandlb9a43912010-10-28 09:03:20 +0000512 self.assertEqual(results1, [(True, 1)] * 3 + [(True,2)] * 2)
513 self.assertEqual(results2, [(True, 2)] * 3 + [(True, 3)] * 2)
Antoine Pitrou557934f2009-11-06 22:41:14 +0000514 b.wait_for_finished()
515
516 def test_notify(self):
517 cond = self.condtype()
518 self._check_notify(cond)
519 # A second time, to check internal state is still ok.
520 self._check_notify(cond)
521
522 def test_timeout(self):
523 cond = self.condtype()
524 results = []
525 N = 5
526 def f():
527 cond.acquire()
528 t1 = time.time()
Georg Brandlb9a43912010-10-28 09:03:20 +0000529 result = cond.wait(0.5)
Antoine Pitrou557934f2009-11-06 22:41:14 +0000530 t2 = time.time()
531 cond.release()
Georg Brandlb9a43912010-10-28 09:03:20 +0000532 results.append((t2 - t1, result))
Antoine Pitrou557934f2009-11-06 22:41:14 +0000533 Bunch(f, N).wait_for_finished()
Georg Brandlb9a43912010-10-28 09:03:20 +0000534 self.assertEqual(len(results), N)
535 for dt, result in results:
Antoine Pitrou7c3e5772010-04-14 15:44:10 +0000536 self.assertTimeout(dt, 0.5)
Georg Brandlb9a43912010-10-28 09:03:20 +0000537 # Note that conceptually (that"s the condition variable protocol)
538 # a wait() may succeed even if no one notifies us and before any
539 # timeout occurs. Spurious wakeups can occur.
540 # This makes it hard to verify the result value.
541 # In practice, this implementation has no spurious wakeups.
542 self.assertFalse(result)
Antoine Pitrou557934f2009-11-06 22:41:14 +0000543
Kristján Valur Jónsson63315202010-11-18 12:46:39 +0000544 def test_waitfor(self):
545 cond = self.condtype()
546 state = 0
547 def f():
548 with cond:
549 result = cond.wait_for(lambda : state==4)
550 self.assertTrue(result)
551 self.assertEqual(state, 4)
552 b = Bunch(f, 1)
553 b.wait_for_started()
Victor Stinner3349bca2011-05-18 00:16:14 +0200554 for i in range(4):
Kristján Valur Jónsson63315202010-11-18 12:46:39 +0000555 time.sleep(0.01)
556 with cond:
557 state += 1
558 cond.notify()
559 b.wait_for_finished()
560
561 def test_waitfor_timeout(self):
562 cond = self.condtype()
563 state = 0
564 success = []
565 def f():
566 with cond:
567 dt = time.time()
568 result = cond.wait_for(lambda : state==4, timeout=0.1)
569 dt = time.time() - dt
570 self.assertFalse(result)
571 self.assertTimeout(dt, 0.1)
572 success.append(None)
573 b = Bunch(f, 1)
574 b.wait_for_started()
575 # Only increment 3 times, so state == 4 is never reached.
576 for i in range(3):
577 time.sleep(0.01)
578 with cond:
579 state += 1
580 cond.notify()
581 b.wait_for_finished()
582 self.assertEqual(len(success), 1)
583
Antoine Pitrou557934f2009-11-06 22:41:14 +0000584
585class BaseSemaphoreTests(BaseTestCase):
586 """
587 Common tests for {bounded, unbounded} semaphore objects.
588 """
589
590 def test_constructor(self):
591 self.assertRaises(ValueError, self.semtype, value = -1)
592 self.assertRaises(ValueError, self.semtype, value = -sys.maxsize)
593
594 def test_acquire(self):
595 sem = self.semtype(1)
596 sem.acquire()
597 sem.release()
598 sem = self.semtype(2)
599 sem.acquire()
600 sem.acquire()
601 sem.release()
602 sem.release()
603
604 def test_acquire_destroy(self):
605 sem = self.semtype()
606 sem.acquire()
607 del sem
608
609 def test_acquire_contended(self):
610 sem = self.semtype(7)
611 sem.acquire()
612 N = 10
613 results1 = []
614 results2 = []
615 phase_num = 0
616 def f():
617 sem.acquire()
618 results1.append(phase_num)
619 sem.acquire()
620 results2.append(phase_num)
621 b = Bunch(f, 10)
622 b.wait_for_started()
623 while len(results1) + len(results2) < 6:
624 _wait()
625 self.assertEqual(results1 + results2, [0] * 6)
626 phase_num = 1
627 for i in range(7):
628 sem.release()
629 while len(results1) + len(results2) < 13:
630 _wait()
631 self.assertEqual(sorted(results1 + results2), [0] * 6 + [1] * 7)
632 phase_num = 2
633 for i in range(6):
634 sem.release()
635 while len(results1) + len(results2) < 19:
636 _wait()
637 self.assertEqual(sorted(results1 + results2), [0] * 6 + [1] * 7 + [2] * 6)
638 # The semaphore is still locked
639 self.assertFalse(sem.acquire(False))
640 # Final release, to let the last thread finish
641 sem.release()
642 b.wait_for_finished()
643
644 def test_try_acquire(self):
645 sem = self.semtype(2)
646 self.assertTrue(sem.acquire(False))
647 self.assertTrue(sem.acquire(False))
648 self.assertFalse(sem.acquire(False))
649 sem.release()
650 self.assertTrue(sem.acquire(False))
651
652 def test_try_acquire_contended(self):
653 sem = self.semtype(4)
654 sem.acquire()
655 results = []
656 def f():
657 results.append(sem.acquire(False))
658 results.append(sem.acquire(False))
659 Bunch(f, 5).wait_for_finished()
660 # There can be a thread switch between acquiring the semaphore and
661 # appending the result, therefore results will not necessarily be
662 # ordered.
663 self.assertEqual(sorted(results), [False] * 7 + [True] * 3 )
664
Antoine Pitrou0454af92010-04-17 23:51:58 +0000665 def test_acquire_timeout(self):
666 sem = self.semtype(2)
667 self.assertRaises(ValueError, sem.acquire, False, timeout=1.0)
668 self.assertTrue(sem.acquire(timeout=0.005))
669 self.assertTrue(sem.acquire(timeout=0.005))
670 self.assertFalse(sem.acquire(timeout=0.005))
671 sem.release()
672 self.assertTrue(sem.acquire(timeout=0.005))
673 t = time.time()
674 self.assertFalse(sem.acquire(timeout=0.5))
675 dt = time.time() - t
676 self.assertTimeout(dt, 0.5)
677
Antoine Pitrou557934f2009-11-06 22:41:14 +0000678 def test_default_value(self):
679 # The default initial value is 1.
680 sem = self.semtype()
681 sem.acquire()
682 def f():
683 sem.acquire()
684 sem.release()
685 b = Bunch(f, 1)
686 b.wait_for_started()
687 _wait()
688 self.assertFalse(b.finished)
689 sem.release()
690 b.wait_for_finished()
691
692 def test_with(self):
693 sem = self.semtype(2)
694 def _with(err=None):
695 with sem:
696 self.assertTrue(sem.acquire(False))
697 sem.release()
698 with sem:
699 self.assertFalse(sem.acquire(False))
700 if err:
701 raise err
702 _with()
703 self.assertTrue(sem.acquire(False))
704 sem.release()
705 self.assertRaises(TypeError, _with, TypeError)
706 self.assertTrue(sem.acquire(False))
707 sem.release()
708
709class SemaphoreTests(BaseSemaphoreTests):
710 """
711 Tests for unbounded semaphores.
712 """
713
714 def test_release_unacquired(self):
715 # Unbounded releases are allowed and increment the semaphore's value
716 sem = self.semtype(1)
717 sem.release()
718 sem.acquire()
719 sem.acquire()
720 sem.release()
721
722
723class BoundedSemaphoreTests(BaseSemaphoreTests):
724 """
725 Tests for bounded semaphores.
726 """
727
728 def test_release_unacquired(self):
729 # Cannot go past the initial value
730 sem = self.semtype()
731 self.assertRaises(ValueError, sem.release)
732 sem.acquire()
733 sem.release()
734 self.assertRaises(ValueError, sem.release)
Kristján Valur Jónsson3be00032010-10-28 09:43:10 +0000735
736
737class BarrierTests(BaseTestCase):
738 """
739 Tests for Barrier objects.
740 """
741 N = 5
Antoine Pitrou12ae2902010-11-17 21:55:41 +0000742 defaultTimeout = 2.0
Kristján Valur Jónsson3be00032010-10-28 09:43:10 +0000743
744 def setUp(self):
Kristján Valur Jónssonf53a6262010-10-31 03:00:57 +0000745 self.barrier = self.barriertype(self.N, timeout=self.defaultTimeout)
Kristján Valur Jónsson3be00032010-10-28 09:43:10 +0000746 def tearDown(self):
747 self.barrier.abort()
748
749 def run_threads(self, f):
750 b = Bunch(f, self.N-1)
751 f()
752 b.wait_for_finished()
753
754 def multipass(self, results, n):
755 m = self.barrier.parties
756 self.assertEqual(m, self.N)
757 for i in range(n):
758 results[0].append(True)
759 self.assertEqual(len(results[1]), i * m)
760 self.barrier.wait()
761 results[1].append(True)
762 self.assertEqual(len(results[0]), (i + 1) * m)
763 self.barrier.wait()
764 self.assertEqual(self.barrier.n_waiting, 0)
765 self.assertFalse(self.barrier.broken)
766
767 def test_barrier(self, passes=1):
768 """
769 Test that a barrier is passed in lockstep
770 """
771 results = [[],[]]
772 def f():
773 self.multipass(results, passes)
774 self.run_threads(f)
775
776 def test_barrier_10(self):
777 """
778 Test that a barrier works for 10 consecutive runs
779 """
780 return self.test_barrier(10)
781
782 def test_wait_return(self):
783 """
784 test the return value from barrier.wait
785 """
786 results = []
787 def f():
788 r = self.barrier.wait()
789 results.append(r)
790
791 self.run_threads(f)
792 self.assertEqual(sum(results), sum(range(self.N)))
793
794 def test_action(self):
795 """
796 Test the 'action' callback
797 """
798 results = []
799 def action():
800 results.append(True)
801 barrier = self.barriertype(self.N, action)
802 def f():
803 barrier.wait()
804 self.assertEqual(len(results), 1)
805
806 self.run_threads(f)
807
808 def test_abort(self):
809 """
810 Test that an abort will put the barrier in a broken state
811 """
812 results1 = []
813 results2 = []
814 def f():
815 try:
816 i = self.barrier.wait()
817 if i == self.N//2:
818 raise RuntimeError
819 self.barrier.wait()
820 results1.append(True)
821 except threading.BrokenBarrierError:
822 results2.append(True)
823 except RuntimeError:
824 self.barrier.abort()
825 pass
826
827 self.run_threads(f)
828 self.assertEqual(len(results1), 0)
829 self.assertEqual(len(results2), self.N-1)
830 self.assertTrue(self.barrier.broken)
831
832 def test_reset(self):
833 """
834 Test that a 'reset' on a barrier frees the waiting threads
835 """
836 results1 = []
837 results2 = []
838 results3 = []
839 def f():
840 i = self.barrier.wait()
841 if i == self.N//2:
842 # Wait until the other threads are all in the barrier.
843 while self.barrier.n_waiting < self.N-1:
844 time.sleep(0.001)
845 self.barrier.reset()
846 else:
847 try:
848 self.barrier.wait()
849 results1.append(True)
850 except threading.BrokenBarrierError:
851 results2.append(True)
852 # Now, pass the barrier again
853 self.barrier.wait()
854 results3.append(True)
855
856 self.run_threads(f)
857 self.assertEqual(len(results1), 0)
858 self.assertEqual(len(results2), self.N-1)
859 self.assertEqual(len(results3), self.N)
860
861
862 def test_abort_and_reset(self):
863 """
864 Test that a barrier can be reset after being broken.
865 """
866 results1 = []
867 results2 = []
868 results3 = []
869 barrier2 = self.barriertype(self.N)
870 def f():
871 try:
872 i = self.barrier.wait()
873 if i == self.N//2:
874 raise RuntimeError
875 self.barrier.wait()
876 results1.append(True)
877 except threading.BrokenBarrierError:
878 results2.append(True)
879 except RuntimeError:
880 self.barrier.abort()
881 pass
882 # Synchronize and reset the barrier. Must synchronize first so
883 # that everyone has left it when we reset, and after so that no
884 # one enters it before the reset.
885 if barrier2.wait() == self.N//2:
886 self.barrier.reset()
887 barrier2.wait()
888 self.barrier.wait()
889 results3.append(True)
890
891 self.run_threads(f)
892 self.assertEqual(len(results1), 0)
893 self.assertEqual(len(results2), self.N-1)
894 self.assertEqual(len(results3), self.N)
895
896 def test_timeout(self):
897 """
898 Test wait(timeout)
899 """
900 def f():
901 i = self.barrier.wait()
902 if i == self.N // 2:
903 # One thread is late!
Antoine Pitrou12ae2902010-11-17 21:55:41 +0000904 time.sleep(1.0)
905 # Default timeout is 2.0, so this is shorter.
Kristján Valur Jónsson3be00032010-10-28 09:43:10 +0000906 self.assertRaises(threading.BrokenBarrierError,
Antoine Pitrou12ae2902010-11-17 21:55:41 +0000907 self.barrier.wait, 0.5)
Kristján Valur Jónsson3be00032010-10-28 09:43:10 +0000908 self.run_threads(f)
909
910 def test_default_timeout(self):
911 """
912 Test the barrier's default timeout
913 """
Charles-François Natalid4d1d062011-07-27 21:26:42 +0200914 # create a barrier with a low default timeout
915 barrier = self.barriertype(self.N, timeout=0.3)
Kristján Valur Jónsson3be00032010-10-28 09:43:10 +0000916 def f():
Kristján Valur Jónssonf53a6262010-10-31 03:00:57 +0000917 i = barrier.wait()
Kristján Valur Jónsson3be00032010-10-28 09:43:10 +0000918 if i == self.N // 2:
Charles-François Natalid4d1d062011-07-27 21:26:42 +0200919 # One thread is later than the default timeout of 0.3s.
Antoine Pitrou12ae2902010-11-17 21:55:41 +0000920 time.sleep(1.0)
Kristján Valur Jónssonf53a6262010-10-31 03:00:57 +0000921 self.assertRaises(threading.BrokenBarrierError, barrier.wait)
Kristján Valur Jónsson3be00032010-10-28 09:43:10 +0000922 self.run_threads(f)
923
924 def test_single_thread(self):
925 b = self.barriertype(1)
926 b.wait()
927 b.wait()