blob: 4fa154f734d07021a698055d70e9974b6130b35c [file] [log] [blame]
Antoine Pitrou557934f2009-11-06 22:41:14 +00001"""
2Various tests for synchronization primitives.
3"""
4
5import sys
6import time
Victor Stinner2a129742011-05-30 23:02:52 +02007from _thread import start_new_thread, TIMEOUT_MAX
Antoine Pitrou557934f2009-11-06 22:41:14 +00008import threading
9import unittest
Raymond Hettinger7836a272015-10-09 00:03:51 -040010import weakref
Antoine Pitrou557934f2009-11-06 22:41:14 +000011
12from test import support
13
14
15def _wait():
16 # A crude wait/yield function not relying on synchronization primitives.
17 time.sleep(0.01)
18
19class Bunch(object):
20 """
21 A bunch of threads.
22 """
23 def __init__(self, f, n, wait_before_exit=False):
24 """
25 Construct a bunch of `n` threads running the same function `f`.
26 If `wait_before_exit` is True, the threads won't terminate until
27 do_finish() is called.
28 """
29 self.f = f
30 self.n = n
31 self.started = []
32 self.finished = []
33 self._can_exit = not wait_before_exit
34 def task():
Victor Stinner2a129742011-05-30 23:02:52 +020035 tid = threading.get_ident()
Antoine Pitrou557934f2009-11-06 22:41:14 +000036 self.started.append(tid)
37 try:
38 f()
39 finally:
40 self.finished.append(tid)
41 while not self._can_exit:
42 _wait()
Serhiy Storchaka9db55002015-03-28 20:38:37 +020043 try:
44 for i in range(n):
45 start_new_thread(task, ())
46 except:
47 self._can_exit = True
48 raise
Antoine Pitrou557934f2009-11-06 22:41:14 +000049
50 def wait_for_started(self):
51 while len(self.started) < self.n:
52 _wait()
53
54 def wait_for_finished(self):
55 while len(self.finished) < self.n:
56 _wait()
57
58 def do_finish(self):
59 self._can_exit = True
60
61
62class BaseTestCase(unittest.TestCase):
63 def setUp(self):
64 self._threads = support.threading_setup()
65
66 def tearDown(self):
67 support.threading_cleanup(*self._threads)
68 support.reap_children()
69
Antoine Pitrou7c3e5772010-04-14 15:44:10 +000070 def assertTimeout(self, actual, expected):
71 # The waiting and/or time.time() can be imprecise, which
72 # is why comparing to the expected value would sometimes fail
73 # (especially under Windows).
74 self.assertGreaterEqual(actual, expected * 0.6)
75 # Test nothing insane happened
76 self.assertLess(actual, expected * 10.0)
77
Antoine Pitrou557934f2009-11-06 22:41:14 +000078
79class BaseLockTests(BaseTestCase):
80 """
81 Tests for both recursive and non-recursive locks.
82 """
83
84 def test_constructor(self):
85 lock = self.locktype()
86 del lock
87
Christian Heimesc5d95b12013-07-30 15:54:39 +020088 def test_repr(self):
89 lock = self.locktype()
Raymond Hettinger62f4dad2014-05-25 18:22:35 -070090 self.assertRegex(repr(lock), "<unlocked .* object (.*)?at .*>")
91 del lock
92
93 def test_locked_repr(self):
94 lock = self.locktype()
95 lock.acquire()
96 self.assertRegex(repr(lock), "<locked .* object (.*)?at .*>")
Christian Heimesc5d95b12013-07-30 15:54:39 +020097 del lock
98
Antoine Pitrou557934f2009-11-06 22:41:14 +000099 def test_acquire_destroy(self):
100 lock = self.locktype()
101 lock.acquire()
102 del lock
103
104 def test_acquire_release(self):
105 lock = self.locktype()
106 lock.acquire()
107 lock.release()
108 del lock
109
110 def test_try_acquire(self):
111 lock = self.locktype()
112 self.assertTrue(lock.acquire(False))
113 lock.release()
114
115 def test_try_acquire_contended(self):
116 lock = self.locktype()
117 lock.acquire()
118 result = []
119 def f():
120 result.append(lock.acquire(False))
121 Bunch(f, 1).wait_for_finished()
122 self.assertFalse(result[0])
123 lock.release()
124
125 def test_acquire_contended(self):
126 lock = self.locktype()
127 lock.acquire()
128 N = 5
129 def f():
130 lock.acquire()
131 lock.release()
132
133 b = Bunch(f, N)
134 b.wait_for_started()
135 _wait()
136 self.assertEqual(len(b.finished), 0)
137 lock.release()
138 b.wait_for_finished()
139 self.assertEqual(len(b.finished), N)
140
141 def test_with(self):
142 lock = self.locktype()
143 def f():
144 lock.acquire()
145 lock.release()
146 def _with(err=None):
147 with lock:
148 if err is not None:
149 raise err
150 _with()
151 # Check the lock is unacquired
152 Bunch(f, 1).wait_for_finished()
153 self.assertRaises(TypeError, _with, TypeError)
154 # Check the lock is unacquired
155 Bunch(f, 1).wait_for_finished()
156
Antoine Pitroub0872682009-11-09 16:08:16 +0000157 def test_thread_leak(self):
158 # The lock shouldn't leak a Thread instance when used from a foreign
159 # (non-threading) thread.
160 lock = self.locktype()
161 def f():
162 lock.acquire()
163 lock.release()
164 n = len(threading.enumerate())
165 # We run many threads in the hope that existing threads ids won't
166 # be recycled.
167 Bunch(f, 15).wait_for_finished()
Antoine Pitrou45fdb452011-04-04 21:59:09 +0200168 if len(threading.enumerate()) != n:
169 # There is a small window during which a Thread instance's
170 # target function has finished running, but the Thread is still
171 # alive and registered. Avoid spurious failures by waiting a
172 # bit more (seen on a buildbot).
173 time.sleep(0.4)
174 self.assertEqual(n, len(threading.enumerate()))
Antoine Pitroub0872682009-11-09 16:08:16 +0000175
Antoine Pitrou7c3e5772010-04-14 15:44:10 +0000176 def test_timeout(self):
177 lock = self.locktype()
178 # Can't set timeout if not blocking
179 self.assertRaises(ValueError, lock.acquire, 0, 1)
180 # Invalid timeout values
181 self.assertRaises(ValueError, lock.acquire, timeout=-100)
182 self.assertRaises(OverflowError, lock.acquire, timeout=1e100)
183 self.assertRaises(OverflowError, lock.acquire, timeout=TIMEOUT_MAX + 1)
184 # TIMEOUT_MAX is ok
185 lock.acquire(timeout=TIMEOUT_MAX)
186 lock.release()
187 t1 = time.time()
188 self.assertTrue(lock.acquire(timeout=5))
189 t2 = time.time()
190 # Just a sanity test that it didn't actually wait for the timeout.
191 self.assertLess(t2 - t1, 5)
192 results = []
193 def f():
194 t1 = time.time()
195 results.append(lock.acquire(timeout=0.5))
196 t2 = time.time()
197 results.append(t2 - t1)
198 Bunch(f, 1).wait_for_finished()
199 self.assertFalse(results[0])
200 self.assertTimeout(results[1], 0.5)
201
Raymond Hettinger7836a272015-10-09 00:03:51 -0400202 def test_weakref_exists(self):
203 lock = self.locktype()
204 ref = weakref.ref(lock)
205 self.assertIsNotNone(ref())
206
207 def test_weakref_deleted(self):
208 lock = self.locktype()
209 ref = weakref.ref(lock)
210 del lock
211 self.assertIsNone(ref())
212
Antoine Pitrou557934f2009-11-06 22:41:14 +0000213
214class LockTests(BaseLockTests):
215 """
216 Tests for non-recursive, weak locks
217 (which can be acquired and released from different threads).
218 """
219 def test_reacquire(self):
220 # Lock needs to be released before re-acquiring.
221 lock = self.locktype()
222 phase = []
223 def f():
224 lock.acquire()
225 phase.append(None)
226 lock.acquire()
227 phase.append(None)
228 start_new_thread(f, ())
229 while len(phase) == 0:
230 _wait()
231 _wait()
232 self.assertEqual(len(phase), 1)
233 lock.release()
234 while len(phase) == 1:
235 _wait()
236 self.assertEqual(len(phase), 2)
237
238 def test_different_thread(self):
239 # Lock can be released from a different thread.
240 lock = self.locktype()
241 lock.acquire()
242 def f():
243 lock.release()
244 b = Bunch(f, 1)
245 b.wait_for_finished()
246 lock.acquire()
247 lock.release()
248
Antoine Pitrou7899acf2011-03-31 01:00:32 +0200249 def test_state_after_timeout(self):
250 # Issue #11618: check that lock is in a proper state after a
251 # (non-zero) timeout.
252 lock = self.locktype()
253 lock.acquire()
254 self.assertFalse(lock.acquire(timeout=0.01))
255 lock.release()
256 self.assertFalse(lock.locked())
257 self.assertTrue(lock.acquire(blocking=False))
258
Antoine Pitrou557934f2009-11-06 22:41:14 +0000259
260class RLockTests(BaseLockTests):
261 """
262 Tests for recursive locks.
263 """
264 def test_reacquire(self):
265 lock = self.locktype()
266 lock.acquire()
267 lock.acquire()
268 lock.release()
269 lock.acquire()
270 lock.release()
271 lock.release()
272
273 def test_release_unacquired(self):
274 # Cannot release an unacquired lock
275 lock = self.locktype()
276 self.assertRaises(RuntimeError, lock.release)
277 lock.acquire()
278 lock.acquire()
279 lock.release()
280 lock.acquire()
281 lock.release()
282 lock.release()
283 self.assertRaises(RuntimeError, lock.release)
Antoine Pitrouea3eb882012-05-17 18:55:59 +0200284
285 def test_release_save_unacquired(self):
286 # Cannot _release_save an unacquired lock
287 lock = self.locktype()
288 self.assertRaises(RuntimeError, lock._release_save)
289 lock.acquire()
290 lock.acquire()
291 lock.release()
292 lock.acquire()
293 lock.release()
294 lock.release()
Victor Stinnerc2824d42011-04-24 23:41:33 +0200295 self.assertRaises(RuntimeError, lock._release_save)
Antoine Pitrou557934f2009-11-06 22:41:14 +0000296
297 def test_different_thread(self):
298 # Cannot release from a different thread
299 lock = self.locktype()
300 def f():
301 lock.acquire()
302 b = Bunch(f, 1, True)
303 try:
304 self.assertRaises(RuntimeError, lock.release)
305 finally:
306 b.do_finish()
307
308 def test__is_owned(self):
309 lock = self.locktype()
310 self.assertFalse(lock._is_owned())
311 lock.acquire()
312 self.assertTrue(lock._is_owned())
313 lock.acquire()
314 self.assertTrue(lock._is_owned())
315 result = []
316 def f():
317 result.append(lock._is_owned())
318 Bunch(f, 1).wait_for_finished()
319 self.assertFalse(result[0])
320 lock.release()
321 self.assertTrue(lock._is_owned())
322 lock.release()
323 self.assertFalse(lock._is_owned())
324
325
326class EventTests(BaseTestCase):
327 """
328 Tests for Event objects.
329 """
330
331 def test_is_set(self):
332 evt = self.eventtype()
333 self.assertFalse(evt.is_set())
334 evt.set()
335 self.assertTrue(evt.is_set())
336 evt.set()
337 self.assertTrue(evt.is_set())
338 evt.clear()
339 self.assertFalse(evt.is_set())
340 evt.clear()
341 self.assertFalse(evt.is_set())
342
343 def _check_notify(self, evt):
344 # All threads get notified
345 N = 5
346 results1 = []
347 results2 = []
348 def f():
349 results1.append(evt.wait())
350 results2.append(evt.wait())
351 b = Bunch(f, N)
352 b.wait_for_started()
353 _wait()
354 self.assertEqual(len(results1), 0)
355 evt.set()
356 b.wait_for_finished()
357 self.assertEqual(results1, [True] * N)
358 self.assertEqual(results2, [True] * N)
359
360 def test_notify(self):
361 evt = self.eventtype()
362 self._check_notify(evt)
363 # Another time, after an explicit clear()
364 evt.set()
365 evt.clear()
366 self._check_notify(evt)
367
368 def test_timeout(self):
369 evt = self.eventtype()
370 results1 = []
371 results2 = []
372 N = 5
373 def f():
374 results1.append(evt.wait(0.0))
375 t1 = time.time()
Antoine Pitrou7c3e5772010-04-14 15:44:10 +0000376 r = evt.wait(0.5)
Antoine Pitrou557934f2009-11-06 22:41:14 +0000377 t2 = time.time()
378 results2.append((r, t2 - t1))
379 Bunch(f, N).wait_for_finished()
380 self.assertEqual(results1, [False] * N)
381 for r, dt in results2:
382 self.assertFalse(r)
Antoine Pitrou7c3e5772010-04-14 15:44:10 +0000383 self.assertTimeout(dt, 0.5)
Antoine Pitrou557934f2009-11-06 22:41:14 +0000384 # The event is set
385 results1 = []
386 results2 = []
387 evt.set()
388 Bunch(f, N).wait_for_finished()
389 self.assertEqual(results1, [True] * N)
390 for r, dt in results2:
391 self.assertTrue(r)
392
Charles-François Natalided03482012-01-07 18:24:56 +0100393 def test_set_and_clear(self):
394 # Issue #13502: check that wait() returns true even when the event is
395 # cleared before the waiting thread is woken up.
396 evt = self.eventtype()
397 results = []
398 N = 5
399 def f():
400 results.append(evt.wait(1))
401 b = Bunch(f, N)
402 b.wait_for_started()
403 time.sleep(0.5)
404 evt.set()
405 evt.clear()
406 b.wait_for_finished()
407 self.assertEqual(results, [True] * N)
408
Benjamin Peterson15982aa2015-10-05 21:56:22 -0700409 def test_reset_internal_locks(self):
Berker Peksag6d34bbb2016-04-29 17:25:29 +0300410 # ensure that condition is still using a Lock after reset
Benjamin Peterson15982aa2015-10-05 21:56:22 -0700411 evt = self.eventtype()
Berker Peksag6d34bbb2016-04-29 17:25:29 +0300412 with evt._cond:
413 self.assertFalse(evt._cond.acquire(False))
Benjamin Peterson15982aa2015-10-05 21:56:22 -0700414 evt._reset_internal_locks()
Berker Peksag6d34bbb2016-04-29 17:25:29 +0300415 with evt._cond:
416 self.assertFalse(evt._cond.acquire(False))
Benjamin Peterson15982aa2015-10-05 21:56:22 -0700417
Antoine Pitrou557934f2009-11-06 22:41:14 +0000418
419class ConditionTests(BaseTestCase):
420 """
421 Tests for condition variables.
422 """
423
424 def test_acquire(self):
425 cond = self.condtype()
426 # Be default we have an RLock: the condition can be acquired multiple
427 # times.
428 cond.acquire()
429 cond.acquire()
430 cond.release()
431 cond.release()
432 lock = threading.Lock()
433 cond = self.condtype(lock)
434 cond.acquire()
435 self.assertFalse(lock.acquire(False))
436 cond.release()
437 self.assertTrue(lock.acquire(False))
438 self.assertFalse(cond.acquire(False))
439 lock.release()
440 with cond:
441 self.assertFalse(lock.acquire(False))
442
443 def test_unacquired_wait(self):
444 cond = self.condtype()
445 self.assertRaises(RuntimeError, cond.wait)
446
447 def test_unacquired_notify(self):
448 cond = self.condtype()
449 self.assertRaises(RuntimeError, cond.notify)
450
451 def _check_notify(self, cond):
Kristjan Valur Jonsson020af2a2013-11-11 11:29:04 +0000452 # Note that this test is sensitive to timing. If the worker threads
453 # don't execute in a timely fashion, the main thread may think they
454 # are further along then they are. The main thread therefore issues
455 # _wait() statements to try to make sure that it doesn't race ahead
456 # of the workers.
457 # Secondly, this test assumes that condition variables are not subject
458 # to spurious wakeups. The absence of spurious wakeups is an implementation
459 # detail of Condition Cariables in current CPython, but in general, not
460 # a guaranteed property of condition variables as a programming
461 # construct. In particular, it is possible that this can no longer
462 # be conveniently guaranteed should their implementation ever change.
Antoine Pitrou557934f2009-11-06 22:41:14 +0000463 N = 5
Serhiy Storchaka32cb9682017-06-23 13:36:36 +0300464 ready = []
Antoine Pitrou557934f2009-11-06 22:41:14 +0000465 results1 = []
466 results2 = []
467 phase_num = 0
468 def f():
469 cond.acquire()
Serhiy Storchaka32cb9682017-06-23 13:36:36 +0300470 ready.append(phase_num)
Georg Brandlb9a43912010-10-28 09:03:20 +0000471 result = cond.wait()
Antoine Pitrou557934f2009-11-06 22:41:14 +0000472 cond.release()
Georg Brandlb9a43912010-10-28 09:03:20 +0000473 results1.append((result, phase_num))
Antoine Pitrou557934f2009-11-06 22:41:14 +0000474 cond.acquire()
Serhiy Storchaka32cb9682017-06-23 13:36:36 +0300475 ready.append(phase_num)
Georg Brandlb9a43912010-10-28 09:03:20 +0000476 result = cond.wait()
Antoine Pitrou557934f2009-11-06 22:41:14 +0000477 cond.release()
Georg Brandlb9a43912010-10-28 09:03:20 +0000478 results2.append((result, phase_num))
Antoine Pitrou557934f2009-11-06 22:41:14 +0000479 b = Bunch(f, N)
480 b.wait_for_started()
Serhiy Storchaka32cb9682017-06-23 13:36:36 +0300481 # first wait, to ensure all workers settle into cond.wait() before
482 # we continue. See issues #8799 and #30727.
483 while len(ready) < 5:
484 _wait()
485 ready.clear()
Antoine Pitrou557934f2009-11-06 22:41:14 +0000486 self.assertEqual(results1, [])
487 # Notify 3 threads at first
488 cond.acquire()
489 cond.notify(3)
490 _wait()
491 phase_num = 1
492 cond.release()
493 while len(results1) < 3:
494 _wait()
Georg Brandlb9a43912010-10-28 09:03:20 +0000495 self.assertEqual(results1, [(True, 1)] * 3)
Antoine Pitrou557934f2009-11-06 22:41:14 +0000496 self.assertEqual(results2, [])
Serhiy Storchaka32cb9682017-06-23 13:36:36 +0300497 # make sure all awaken workers settle into cond.wait()
498 while len(ready) < 3:
499 _wait()
Antoine Pitrou557934f2009-11-06 22:41:14 +0000500 # Notify 5 threads: they might be in their first or second wait
501 cond.acquire()
502 cond.notify(5)
503 _wait()
504 phase_num = 2
505 cond.release()
506 while len(results1) + len(results2) < 8:
507 _wait()
Georg Brandlb9a43912010-10-28 09:03:20 +0000508 self.assertEqual(results1, [(True, 1)] * 3 + [(True, 2)] * 2)
509 self.assertEqual(results2, [(True, 2)] * 3)
Serhiy Storchaka32cb9682017-06-23 13:36:36 +0300510 # make sure all workers settle into cond.wait()
511 while len(ready) < 5:
512 _wait()
Antoine Pitrou557934f2009-11-06 22:41:14 +0000513 # Notify all threads: they are all in their second wait
514 cond.acquire()
515 cond.notify_all()
516 _wait()
517 phase_num = 3
518 cond.release()
519 while len(results2) < 5:
520 _wait()
Georg Brandlb9a43912010-10-28 09:03:20 +0000521 self.assertEqual(results1, [(True, 1)] * 3 + [(True,2)] * 2)
522 self.assertEqual(results2, [(True, 2)] * 3 + [(True, 3)] * 2)
Antoine Pitrou557934f2009-11-06 22:41:14 +0000523 b.wait_for_finished()
524
525 def test_notify(self):
526 cond = self.condtype()
527 self._check_notify(cond)
528 # A second time, to check internal state is still ok.
529 self._check_notify(cond)
530
531 def test_timeout(self):
532 cond = self.condtype()
533 results = []
534 N = 5
535 def f():
536 cond.acquire()
537 t1 = time.time()
Georg Brandlb9a43912010-10-28 09:03:20 +0000538 result = cond.wait(0.5)
Antoine Pitrou557934f2009-11-06 22:41:14 +0000539 t2 = time.time()
540 cond.release()
Georg Brandlb9a43912010-10-28 09:03:20 +0000541 results.append((t2 - t1, result))
Antoine Pitrou557934f2009-11-06 22:41:14 +0000542 Bunch(f, N).wait_for_finished()
Georg Brandlb9a43912010-10-28 09:03:20 +0000543 self.assertEqual(len(results), N)
544 for dt, result in results:
Antoine Pitrou7c3e5772010-04-14 15:44:10 +0000545 self.assertTimeout(dt, 0.5)
Georg Brandlb9a43912010-10-28 09:03:20 +0000546 # Note that conceptually (that"s the condition variable protocol)
547 # a wait() may succeed even if no one notifies us and before any
548 # timeout occurs. Spurious wakeups can occur.
549 # This makes it hard to verify the result value.
550 # In practice, this implementation has no spurious wakeups.
551 self.assertFalse(result)
Antoine Pitrou557934f2009-11-06 22:41:14 +0000552
Kristján Valur Jónsson63315202010-11-18 12:46:39 +0000553 def test_waitfor(self):
554 cond = self.condtype()
555 state = 0
556 def f():
557 with cond:
558 result = cond.wait_for(lambda : state==4)
559 self.assertTrue(result)
560 self.assertEqual(state, 4)
561 b = Bunch(f, 1)
562 b.wait_for_started()
Victor Stinner3349bca2011-05-18 00:16:14 +0200563 for i in range(4):
Kristján Valur Jónsson63315202010-11-18 12:46:39 +0000564 time.sleep(0.01)
565 with cond:
566 state += 1
567 cond.notify()
568 b.wait_for_finished()
569
570 def test_waitfor_timeout(self):
571 cond = self.condtype()
572 state = 0
573 success = []
574 def f():
575 with cond:
576 dt = time.time()
577 result = cond.wait_for(lambda : state==4, timeout=0.1)
578 dt = time.time() - dt
579 self.assertFalse(result)
580 self.assertTimeout(dt, 0.1)
581 success.append(None)
582 b = Bunch(f, 1)
583 b.wait_for_started()
584 # Only increment 3 times, so state == 4 is never reached.
585 for i in range(3):
586 time.sleep(0.01)
587 with cond:
588 state += 1
589 cond.notify()
590 b.wait_for_finished()
591 self.assertEqual(len(success), 1)
592
Antoine Pitrou557934f2009-11-06 22:41:14 +0000593
594class BaseSemaphoreTests(BaseTestCase):
595 """
596 Common tests for {bounded, unbounded} semaphore objects.
597 """
598
599 def test_constructor(self):
600 self.assertRaises(ValueError, self.semtype, value = -1)
601 self.assertRaises(ValueError, self.semtype, value = -sys.maxsize)
602
603 def test_acquire(self):
604 sem = self.semtype(1)
605 sem.acquire()
606 sem.release()
607 sem = self.semtype(2)
608 sem.acquire()
609 sem.acquire()
610 sem.release()
611 sem.release()
612
613 def test_acquire_destroy(self):
614 sem = self.semtype()
615 sem.acquire()
616 del sem
617
618 def test_acquire_contended(self):
619 sem = self.semtype(7)
620 sem.acquire()
621 N = 10
622 results1 = []
623 results2 = []
624 phase_num = 0
625 def f():
626 sem.acquire()
627 results1.append(phase_num)
628 sem.acquire()
629 results2.append(phase_num)
630 b = Bunch(f, 10)
631 b.wait_for_started()
632 while len(results1) + len(results2) < 6:
633 _wait()
634 self.assertEqual(results1 + results2, [0] * 6)
635 phase_num = 1
636 for i in range(7):
637 sem.release()
638 while len(results1) + len(results2) < 13:
639 _wait()
640 self.assertEqual(sorted(results1 + results2), [0] * 6 + [1] * 7)
641 phase_num = 2
642 for i in range(6):
643 sem.release()
644 while len(results1) + len(results2) < 19:
645 _wait()
646 self.assertEqual(sorted(results1 + results2), [0] * 6 + [1] * 7 + [2] * 6)
647 # The semaphore is still locked
648 self.assertFalse(sem.acquire(False))
649 # Final release, to let the last thread finish
650 sem.release()
651 b.wait_for_finished()
652
653 def test_try_acquire(self):
654 sem = self.semtype(2)
655 self.assertTrue(sem.acquire(False))
656 self.assertTrue(sem.acquire(False))
657 self.assertFalse(sem.acquire(False))
658 sem.release()
659 self.assertTrue(sem.acquire(False))
660
661 def test_try_acquire_contended(self):
662 sem = self.semtype(4)
663 sem.acquire()
664 results = []
665 def f():
666 results.append(sem.acquire(False))
667 results.append(sem.acquire(False))
668 Bunch(f, 5).wait_for_finished()
669 # There can be a thread switch between acquiring the semaphore and
670 # appending the result, therefore results will not necessarily be
671 # ordered.
672 self.assertEqual(sorted(results), [False] * 7 + [True] * 3 )
673
Antoine Pitrou0454af92010-04-17 23:51:58 +0000674 def test_acquire_timeout(self):
675 sem = self.semtype(2)
676 self.assertRaises(ValueError, sem.acquire, False, timeout=1.0)
677 self.assertTrue(sem.acquire(timeout=0.005))
678 self.assertTrue(sem.acquire(timeout=0.005))
679 self.assertFalse(sem.acquire(timeout=0.005))
680 sem.release()
681 self.assertTrue(sem.acquire(timeout=0.005))
682 t = time.time()
683 self.assertFalse(sem.acquire(timeout=0.5))
684 dt = time.time() - t
685 self.assertTimeout(dt, 0.5)
686
Antoine Pitrou557934f2009-11-06 22:41:14 +0000687 def test_default_value(self):
688 # The default initial value is 1.
689 sem = self.semtype()
690 sem.acquire()
691 def f():
692 sem.acquire()
693 sem.release()
694 b = Bunch(f, 1)
695 b.wait_for_started()
696 _wait()
697 self.assertFalse(b.finished)
698 sem.release()
699 b.wait_for_finished()
700
701 def test_with(self):
702 sem = self.semtype(2)
703 def _with(err=None):
704 with sem:
705 self.assertTrue(sem.acquire(False))
706 sem.release()
707 with sem:
708 self.assertFalse(sem.acquire(False))
709 if err:
710 raise err
711 _with()
712 self.assertTrue(sem.acquire(False))
713 sem.release()
714 self.assertRaises(TypeError, _with, TypeError)
715 self.assertTrue(sem.acquire(False))
716 sem.release()
717
718class SemaphoreTests(BaseSemaphoreTests):
719 """
720 Tests for unbounded semaphores.
721 """
722
723 def test_release_unacquired(self):
724 # Unbounded releases are allowed and increment the semaphore's value
725 sem = self.semtype(1)
726 sem.release()
727 sem.acquire()
728 sem.acquire()
729 sem.release()
730
731
732class BoundedSemaphoreTests(BaseSemaphoreTests):
733 """
734 Tests for bounded semaphores.
735 """
736
737 def test_release_unacquired(self):
738 # Cannot go past the initial value
739 sem = self.semtype()
740 self.assertRaises(ValueError, sem.release)
741 sem.acquire()
742 sem.release()
743 self.assertRaises(ValueError, sem.release)
Kristján Valur Jónsson3be00032010-10-28 09:43:10 +0000744
745
746class BarrierTests(BaseTestCase):
747 """
748 Tests for Barrier objects.
749 """
750 N = 5
Antoine Pitrou12ae2902010-11-17 21:55:41 +0000751 defaultTimeout = 2.0
Kristján Valur Jónsson3be00032010-10-28 09:43:10 +0000752
753 def setUp(self):
Kristján Valur Jónssonf53a6262010-10-31 03:00:57 +0000754 self.barrier = self.barriertype(self.N, timeout=self.defaultTimeout)
Kristján Valur Jónsson3be00032010-10-28 09:43:10 +0000755 def tearDown(self):
756 self.barrier.abort()
757
758 def run_threads(self, f):
759 b = Bunch(f, self.N-1)
760 f()
761 b.wait_for_finished()
762
763 def multipass(self, results, n):
764 m = self.barrier.parties
765 self.assertEqual(m, self.N)
766 for i in range(n):
767 results[0].append(True)
768 self.assertEqual(len(results[1]), i * m)
769 self.barrier.wait()
770 results[1].append(True)
771 self.assertEqual(len(results[0]), (i + 1) * m)
772 self.barrier.wait()
773 self.assertEqual(self.barrier.n_waiting, 0)
774 self.assertFalse(self.barrier.broken)
775
776 def test_barrier(self, passes=1):
777 """
778 Test that a barrier is passed in lockstep
779 """
780 results = [[],[]]
781 def f():
782 self.multipass(results, passes)
783 self.run_threads(f)
784
785 def test_barrier_10(self):
786 """
787 Test that a barrier works for 10 consecutive runs
788 """
789 return self.test_barrier(10)
790
791 def test_wait_return(self):
792 """
793 test the return value from barrier.wait
794 """
795 results = []
796 def f():
797 r = self.barrier.wait()
798 results.append(r)
799
800 self.run_threads(f)
801 self.assertEqual(sum(results), sum(range(self.N)))
802
803 def test_action(self):
804 """
805 Test the 'action' callback
806 """
807 results = []
808 def action():
809 results.append(True)
810 barrier = self.barriertype(self.N, action)
811 def f():
812 barrier.wait()
813 self.assertEqual(len(results), 1)
814
815 self.run_threads(f)
816
817 def test_abort(self):
818 """
819 Test that an abort will put the barrier in a broken state
820 """
821 results1 = []
822 results2 = []
823 def f():
824 try:
825 i = self.barrier.wait()
826 if i == self.N//2:
827 raise RuntimeError
828 self.barrier.wait()
829 results1.append(True)
830 except threading.BrokenBarrierError:
831 results2.append(True)
832 except RuntimeError:
833 self.barrier.abort()
834 pass
835
836 self.run_threads(f)
837 self.assertEqual(len(results1), 0)
838 self.assertEqual(len(results2), self.N-1)
839 self.assertTrue(self.barrier.broken)
840
841 def test_reset(self):
842 """
843 Test that a 'reset' on a barrier frees the waiting threads
844 """
845 results1 = []
846 results2 = []
847 results3 = []
848 def f():
849 i = self.barrier.wait()
850 if i == self.N//2:
851 # Wait until the other threads are all in the barrier.
852 while self.barrier.n_waiting < self.N-1:
853 time.sleep(0.001)
854 self.barrier.reset()
855 else:
856 try:
857 self.barrier.wait()
858 results1.append(True)
859 except threading.BrokenBarrierError:
860 results2.append(True)
861 # Now, pass the barrier again
862 self.barrier.wait()
863 results3.append(True)
864
865 self.run_threads(f)
866 self.assertEqual(len(results1), 0)
867 self.assertEqual(len(results2), self.N-1)
868 self.assertEqual(len(results3), self.N)
869
870
871 def test_abort_and_reset(self):
872 """
873 Test that a barrier can be reset after being broken.
874 """
875 results1 = []
876 results2 = []
877 results3 = []
878 barrier2 = self.barriertype(self.N)
879 def f():
880 try:
881 i = self.barrier.wait()
882 if i == self.N//2:
883 raise RuntimeError
884 self.barrier.wait()
885 results1.append(True)
886 except threading.BrokenBarrierError:
887 results2.append(True)
888 except RuntimeError:
889 self.barrier.abort()
890 pass
891 # Synchronize and reset the barrier. Must synchronize first so
892 # that everyone has left it when we reset, and after so that no
893 # one enters it before the reset.
894 if barrier2.wait() == self.N//2:
895 self.barrier.reset()
896 barrier2.wait()
897 self.barrier.wait()
898 results3.append(True)
899
900 self.run_threads(f)
901 self.assertEqual(len(results1), 0)
902 self.assertEqual(len(results2), self.N-1)
903 self.assertEqual(len(results3), self.N)
904
905 def test_timeout(self):
906 """
907 Test wait(timeout)
908 """
909 def f():
910 i = self.barrier.wait()
911 if i == self.N // 2:
912 # One thread is late!
Antoine Pitrou12ae2902010-11-17 21:55:41 +0000913 time.sleep(1.0)
914 # Default timeout is 2.0, so this is shorter.
Kristján Valur Jónsson3be00032010-10-28 09:43:10 +0000915 self.assertRaises(threading.BrokenBarrierError,
Antoine Pitrou12ae2902010-11-17 21:55:41 +0000916 self.barrier.wait, 0.5)
Kristján Valur Jónsson3be00032010-10-28 09:43:10 +0000917 self.run_threads(f)
918
919 def test_default_timeout(self):
920 """
921 Test the barrier's default timeout
922 """
Charles-François Natalid4d1d062011-07-27 21:26:42 +0200923 # create a barrier with a low default timeout
924 barrier = self.barriertype(self.N, timeout=0.3)
Kristján Valur Jónsson3be00032010-10-28 09:43:10 +0000925 def f():
Kristján Valur Jónssonf53a6262010-10-31 03:00:57 +0000926 i = barrier.wait()
Kristján Valur Jónsson3be00032010-10-28 09:43:10 +0000927 if i == self.N // 2:
Charles-François Natalid4d1d062011-07-27 21:26:42 +0200928 # One thread is later than the default timeout of 0.3s.
Antoine Pitrou12ae2902010-11-17 21:55:41 +0000929 time.sleep(1.0)
Kristján Valur Jónssonf53a6262010-10-31 03:00:57 +0000930 self.assertRaises(threading.BrokenBarrierError, barrier.wait)
Kristján Valur Jónsson3be00032010-10-28 09:43:10 +0000931 self.run_threads(f)
932
933 def test_single_thread(self):
934 b = self.barriertype(1)
935 b.wait()
936 b.wait()