blob: 462ecefb3639dee7e9a8d785b7d8e9bf8d8a18e8 [file] [log] [blame]
Antoine Pitrou557934f2009-11-06 22:41:14 +00001"""
2Various tests for synchronization primitives.
3"""
4
5import sys
6import time
Victor Stinner2a129742011-05-30 23:02:52 +02007from _thread import start_new_thread, TIMEOUT_MAX
Antoine Pitrou557934f2009-11-06 22:41:14 +00008import threading
9import unittest
10
11from test import support
12
13
def _wait():
    """Pause briefly so that other threads get a chance to run.

    This is deliberately built on ``time.sleep`` instead of on any
    synchronization primitive.
    """
    pause = 0.01  # seconds
    time.sleep(pause)
17
class Bunch(object):
    """
    A group of threads that all run the same callable.

    The identifier of each thread is appended to `started` when the thread
    begins and to `finished` once the callable has returned or raised.
    """

    def __init__(self, f, n, wait_before_exit=False):
        """
        Spawn `n` threads, each of which calls `f` once.
        When `wait_before_exit` is true, every thread stays alive after `f`
        has completed, until do_finish() is called.
        """
        self.f = f
        self.n = n
        self.started = []
        self.finished = []
        self._can_exit = not wait_before_exit

        def run_one():
            ident = threading.get_ident()
            self.started.append(ident)
            try:
                f()
            finally:
                self.finished.append(ident)
                # Keep the thread alive until it is allowed to terminate.
                while not self._can_exit:
                    _wait()

        try:
            for _ in range(n):
                start_new_thread(run_one, ())
        except BaseException:
            # Threads that did get started must not spin forever.
            self._can_exit = True
            raise

    def wait_for_started(self):
        """Poll until all `n` threads have recorded themselves in `started`."""
        while len(self.started) < self.n:
            _wait()

    def wait_for_finished(self):
        """Poll until all `n` threads have recorded themselves in `finished`."""
        while len(self.finished) < self.n:
            _wait()

    def do_finish(self):
        """Allow threads created with `wait_before_exit` to terminate."""
        self._can_exit = True
59
60
class BaseTestCase(unittest.TestCase):
    """
    Base class of the test cases in this module: wraps each test in the
    threading_setup() / threading_cleanup() helpers from test.support and
    provides a tolerant timeout assertion.
    """

    def setUp(self):
        # Keep whatever threading_setup() returns so that tearDown() can hand
        # it back to threading_cleanup().
        self._threads = support.threading_setup()

    def tearDown(self):
        support.threading_cleanup(*self._threads)
        support.reap_children()

    def assertTimeout(self, actual, expected):
        """
        Check that the measured duration `actual` is plausibly close to the
        requested timeout `expected`.
        """
        # Sleeping and time measurement can both be imprecise, so an exact
        # comparison with the expected value would sometimes fail
        # (especially under Windows).  Accept anything from 60% upwards...
        lower_bound = expected * 0.6
        # ... as long as nothing insane happened.
        upper_bound = expected * 10.0
        self.assertGreaterEqual(actual, lower_bound)
        self.assertLess(actual, upper_bound)
76
Antoine Pitrou557934f2009-11-06 22:41:14 +000077
class BaseLockTests(BaseTestCase):
    """
    Tests for both recursive and non-recursive locks.

    Every test creates its lock through `self.locktype()`; that attribute is
    not defined in this class and has to be supplied by the concrete test
    case.
    """

    def test_constructor(self):
        """A lock can be created and destroyed without being used."""
        lock = self.locktype()
        del lock

    def test_repr(self):
        """repr() of a lock does not raise."""
        lock = self.locktype()
        repr(lock)
        del lock

    def test_acquire_destroy(self):
        """A lock can be destroyed while it is still held."""
        lock = self.locktype()
        lock.acquire()
        del lock

    def test_acquire_release(self):
        """A plain acquire() / release() pair works."""
        lock = self.locktype()
        lock.acquire()
        lock.release()
        del lock

    def test_try_acquire(self):
        """A non-blocking acquire() of a free lock succeeds."""
        lock = self.locktype()
        self.assertTrue(lock.acquire(False))
        lock.release()

    def test_try_acquire_contended(self):
        """A non-blocking acquire() from another thread fails while held."""
        lock = self.locktype()
        lock.acquire()
        result = []
        def f():
            result.append(lock.acquire(False))
        Bunch(f, 1).wait_for_finished()
        self.assertFalse(result[0])
        lock.release()

    def test_acquire_contended(self):
        """Blocking acquirers only proceed once the holder releases."""
        lock = self.locktype()
        lock.acquire()
        N = 5
        def f():
            lock.acquire()
            lock.release()

        b = Bunch(f, N)
        b.wait_for_started()
        _wait()
        # Nobody can have got past acquire() while we hold the lock.
        self.assertEqual(len(b.finished), 0)
        lock.release()
        b.wait_for_finished()
        self.assertEqual(len(b.finished), N)

    def test_with(self):
        """The lock is released on leaving a `with` block, even on error."""
        lock = self.locktype()
        def f():
            lock.acquire()
            lock.release()
        def _with(err=None):
            with lock:
                if err is not None:
                    raise err
        _with()
        # Check the lock is unacquired
        Bunch(f, 1).wait_for_finished()
        self.assertRaises(TypeError, _with, TypeError)
        # Check the lock is unacquired
        Bunch(f, 1).wait_for_finished()

    def test_thread_leak(self):
        # The lock shouldn't leak a Thread instance when used from a foreign
        # (non-threading) thread.
        lock = self.locktype()
        def f():
            lock.acquire()
            lock.release()
        n = len(threading.enumerate())
        # We run many threads in the hope that existing threads ids won't
        # be recycled.
        Bunch(f, 15).wait_for_finished()
        if len(threading.enumerate()) != n:
            # There is a small window during which a Thread instance's
            # target function has finished running, but the Thread is still
            # alive and registered.  Avoid spurious failures by waiting a
            # bit more (seen on a buildbot).
            time.sleep(0.4)
            self.assertEqual(n, len(threading.enumerate()))

    def test_timeout(self):
        """Check argument validation and timing of acquire(timeout=...)."""
        lock = self.locktype()
        # Can't set timeout if not blocking
        self.assertRaises(ValueError, lock.acquire, 0, 1)
        # Invalid timeout values
        self.assertRaises(ValueError, lock.acquire, timeout=-100)
        self.assertRaises(OverflowError, lock.acquire, timeout=1e100)
        self.assertRaises(OverflowError, lock.acquire, timeout=TIMEOUT_MAX + 1)
        # TIMEOUT_MAX is ok
        lock.acquire(timeout=TIMEOUT_MAX)
        lock.release()
        # Durations are measured with the monotonic clock so that a system
        # clock adjustment during the test cannot distort them.
        t1 = time.monotonic()
        self.assertTrue(lock.acquire(timeout=5))
        t2 = time.monotonic()
        # Just a sanity test that it didn't actually wait for the timeout.
        self.assertLess(t2 - t1, 5)
        results = []
        def f():
            t1 = time.monotonic()
            results.append(lock.acquire(timeout=0.5))
            t2 = time.monotonic()
            results.append(t2 - t1)
        Bunch(f, 1).wait_for_finished()
        # The lock is still held by this thread, so the acquire timed out.
        self.assertFalse(results[0])
        self.assertTimeout(results[1], 0.5)
194
Antoine Pitrou557934f2009-11-06 22:41:14 +0000195
class LockTests(BaseLockTests):
    """
    Tests for non-recursive, weak locks
    (which can be acquired and released from different threads).
    """

    def test_reacquire(self):
        """A held lock must be released before it can be acquired again."""
        lock = self.locktype()
        progress = []

        def acquire_twice():
            lock.acquire()
            progress.append(None)
            # Blocks until the main thread releases the lock.
            lock.acquire()
            progress.append(None)

        start_new_thread(acquire_twice, ())
        while not progress:
            _wait()
        _wait()
        # The second acquire() must still be pending.
        self.assertEqual(len(progress), 1)
        lock.release()
        while len(progress) == 1:
            _wait()
        self.assertEqual(len(progress), 2)

    def test_different_thread(self):
        """A lock may be released by a thread other than its acquirer."""
        lock = self.locktype()
        lock.acquire()

        def release_it():
            lock.release()

        releaser = Bunch(release_it, 1)
        releaser.wait_for_finished()
        # Acquiring again only works if the other thread's release counted.
        lock.acquire()
        lock.release()

    def test_state_after_timeout(self):
        """
        Issue #11618: the lock is in a proper state after a (non-zero)
        timeout.
        """
        lock = self.locktype()
        lock.acquire()
        timed_out = lock.acquire(timeout=0.01)
        self.assertFalse(timed_out)
        lock.release()
        self.assertFalse(lock.locked())
        self.assertTrue(lock.acquire(blocking=False))
240
Antoine Pitrou557934f2009-11-06 22:41:14 +0000241
class RLockTests(BaseLockTests):
    """
    Tests for recursive locks.
    """

    def test_reacquire(self):
        """The owning thread can nest acquire() / release() calls."""
        rlock = self.locktype()
        rlock.acquire()
        rlock.acquire()
        rlock.release()
        rlock.acquire()
        rlock.release()
        rlock.release()

    def test_release_unacquired(self):
        """release() raises RuntimeError when the lock is not held."""
        rlock = self.locktype()
        self.assertRaises(RuntimeError, rlock.release)
        # Nest and fully unwind the lock ...
        rlock.acquire()
        rlock.acquire()
        rlock.release()
        rlock.acquire()
        rlock.release()
        rlock.release()
        # ... after which it is unacquired again.
        self.assertRaises(RuntimeError, rlock.release)

    def test_release_save_unacquired(self):
        """_release_save() raises RuntimeError when the lock is not held."""
        rlock = self.locktype()
        self.assertRaises(RuntimeError, rlock._release_save)
        # Nest and fully unwind the lock ...
        rlock.acquire()
        rlock.acquire()
        rlock.release()
        rlock.acquire()
        rlock.release()
        rlock.release()
        # ... after which it is unacquired again.
        self.assertRaises(RuntimeError, rlock._release_save)

    def test_different_thread(self):
        """The lock cannot be released from a thread that does not own it."""
        rlock = self.locktype()

        def take_lock():
            rlock.acquire()

        # wait_before_exit=True keeps the worker thread alive while this
        # thread attempts the release.
        holder = Bunch(take_lock, 1, True)
        try:
            self.assertRaises(RuntimeError, rlock.release)
        finally:
            holder.do_finish()

    def test__is_owned(self):
        """_is_owned() reflects ownership by the calling thread only."""
        rlock = self.locktype()
        self.assertFalse(rlock._is_owned())
        rlock.acquire()
        self.assertTrue(rlock._is_owned())
        rlock.acquire()
        self.assertTrue(rlock._is_owned())
        seen_elsewhere = []

        def probe():
            seen_elsewhere.append(rlock._is_owned())

        Bunch(probe, 1).wait_for_finished()
        self.assertFalse(seen_elsewhere[0])
        rlock.release()
        # Still held once after releasing one of the two acquisitions.
        self.assertTrue(rlock._is_owned())
        rlock.release()
        self.assertFalse(rlock._is_owned())
306
307
class EventTests(BaseTestCase):
    """
    Tests for Event objects.

    Every test creates its event through `self.eventtype()`; that attribute
    is not defined in this class and has to be supplied by the concrete test
    case.
    """

    def test_is_set(self):
        """set() and clear() toggle is_set() and can be repeated."""
        evt = self.eventtype()
        self.assertFalse(evt.is_set())
        evt.set()
        self.assertTrue(evt.is_set())
        evt.set()
        self.assertTrue(evt.is_set())
        evt.clear()
        self.assertFalse(evt.is_set())
        evt.clear()
        self.assertFalse(evt.is_set())

    def _check_notify(self, evt):
        # All threads get notified
        N = 5
        results1 = []
        results2 = []
        def f():
            results1.append(evt.wait())
            results2.append(evt.wait())
        b = Bunch(f, N)
        b.wait_for_started()
        _wait()
        # The event is not set yet, so no wait() can have returned.
        self.assertEqual(len(results1), 0)
        evt.set()
        b.wait_for_finished()
        self.assertEqual(results1, [True] * N)
        self.assertEqual(results2, [True] * N)

    def test_notify(self):
        """set() wakes up every waiting thread."""
        evt = self.eventtype()
        self._check_notify(evt)
        # Another time, after an explicit clear()
        evt.set()
        evt.clear()
        self._check_notify(evt)

    def test_timeout(self):
        """wait(timeout) returns False after the timeout, True when set."""
        evt = self.eventtype()
        results1 = []
        results2 = []
        N = 5
        def f():
            results1.append(evt.wait(0.0))
            # Durations are measured with the monotonic clock so that a
            # system clock adjustment cannot distort them.
            t1 = time.monotonic()
            r = evt.wait(0.5)
            t2 = time.monotonic()
            results2.append((r, t2 - t1))
        Bunch(f, N).wait_for_finished()
        self.assertEqual(results1, [False] * N)
        for r, dt in results2:
            self.assertFalse(r)
            self.assertTimeout(dt, 0.5)
        # The event is set
        # (f() looks the two names up when it runs, so it appends to these
        # fresh lists on the second round.)
        results1 = []
        results2 = []
        evt.set()
        Bunch(f, N).wait_for_finished()
        self.assertEqual(results1, [True] * N)
        for r, dt in results2:
            self.assertTrue(r)

    def test_set_and_clear(self):
        # Issue #13502: check that wait() returns true even when the event is
        # cleared before the waiting thread is woken up.
        evt = self.eventtype()
        results = []
        N = 5
        def f():
            results.append(evt.wait(1))
        b = Bunch(f, N)
        b.wait_for_started()
        time.sleep(0.5)
        evt.set()
        evt.clear()
        b.wait_for_finished()
        self.assertEqual(results, [True] * N)

    def test_reset_internal_locks(self):
        """_reset_internal_locks() installs a new lock of the same type."""
        evt = self.eventtype()
        old_lock = evt._cond._lock
        evt._reset_internal_locks()
        new_lock = evt._cond._lock
        self.assertIsNot(new_lock, old_lock)
        self.assertIs(type(new_lock), type(old_lock))
398
Antoine Pitrou557934f2009-11-06 22:41:14 +0000399
class ConditionTests(BaseTestCase):
    """
    Tests for condition variables.

    Every test creates its condition through `self.condtype(...)`; that
    attribute is not defined in this class and has to be supplied by the
    concrete test case.
    """

    def test_acquire(self):
        """The condition shares the state of its underlying lock."""
        cond = self.condtype()
        # By default we have an RLock: the condition can be acquired multiple
        # times.
        cond.acquire()
        cond.acquire()
        cond.release()
        cond.release()
        lock = threading.Lock()
        cond = self.condtype(lock)
        cond.acquire()
        self.assertFalse(lock.acquire(False))
        cond.release()
        self.assertTrue(lock.acquire(False))
        self.assertFalse(cond.acquire(False))
        lock.release()
        with cond:
            self.assertFalse(lock.acquire(False))

    def test_unacquired_wait(self):
        """wait() on an unacquired condition raises RuntimeError."""
        cond = self.condtype()
        self.assertRaises(RuntimeError, cond.wait)

    def test_unacquired_notify(self):
        """notify() on an unacquired condition raises RuntimeError."""
        cond = self.condtype()
        self.assertRaises(RuntimeError, cond.notify)

    def _check_notify(self, cond):
        # Note that this test is sensitive to timing.  If the worker threads
        # don't execute in a timely fashion, the main thread may think they
        # are further along than they are.  The main thread therefore issues
        # _wait() statements to try to make sure that it doesn't race ahead
        # of the workers.
        # Secondly, this test assumes that condition variables are not subject
        # to spurious wakeups.  The absence of spurious wakeups is an
        # implementation detail of Condition Variables in current CPython,
        # but in general, not a guaranteed property of condition variables as
        # a programming construct.  In particular, it is possible that this
        # can no longer be conveniently guaranteed should their implementation
        # ever change.
        N = 5
        results1 = []
        results2 = []
        phase_num = 0
        def f():
            cond.acquire()
            result = cond.wait()
            cond.release()
            results1.append((result, phase_num))
            cond.acquire()
            result = cond.wait()
            cond.release()
            results2.append((result, phase_num))
        b = Bunch(f, N)
        b.wait_for_started()
        _wait()
        self.assertEqual(results1, [])
        # Notify 3 threads at first
        cond.acquire()
        cond.notify(3)
        _wait()
        phase_num = 1
        cond.release()
        while len(results1) < 3:
            _wait()
        self.assertEqual(results1, [(True, 1)] * 3)
        self.assertEqual(results2, [])
        # first wait, to ensure all workers settle into cond.wait() before
        # we continue. See issue #8799
        _wait()
        # Notify 5 threads: they might be in their first or second wait
        cond.acquire()
        cond.notify(5)
        _wait()
        phase_num = 2
        cond.release()
        while len(results1) + len(results2) < 8:
            _wait()
        self.assertEqual(results1, [(True, 1)] * 3 + [(True, 2)] * 2)
        self.assertEqual(results2, [(True, 2)] * 3)
        _wait() # make sure all workers settle into cond.wait()
        # Notify all threads: they are all in their second wait
        cond.acquire()
        cond.notify_all()
        _wait()
        phase_num = 3
        cond.release()
        while len(results2) < 5:
            _wait()
        self.assertEqual(results1, [(True, 1)] * 3 + [(True, 2)] * 2)
        self.assertEqual(results2, [(True, 2)] * 3 + [(True, 3)] * 2)
        b.wait_for_finished()

    def test_notify(self):
        """notify() / notify_all() wake the expected number of waiters."""
        cond = self.condtype()
        self._check_notify(cond)
        # A second time, to check internal state is still ok.
        self._check_notify(cond)

    def test_timeout(self):
        """wait(timeout) returns False after roughly the timeout."""
        cond = self.condtype()
        results = []
        N = 5
        def f():
            cond.acquire()
            # Durations are measured with the monotonic clock so that a
            # system clock adjustment cannot distort them.
            t1 = time.monotonic()
            result = cond.wait(0.5)
            t2 = time.monotonic()
            cond.release()
            results.append((t2 - t1, result))
        Bunch(f, N).wait_for_finished()
        self.assertEqual(len(results), N)
        for dt, result in results:
            self.assertTimeout(dt, 0.5)
            # Note that conceptually (that's the condition variable protocol)
            # a wait() may succeed even if no one notifies us and before any
            # timeout occurs.  Spurious wakeups can occur.
            # This makes it hard to verify the result value.
            # In practice, this implementation has no spurious wakeups.
            self.assertFalse(result)

    def test_waitfor(self):
        """wait_for() returns True once its predicate becomes true."""
        cond = self.condtype()
        state = 0
        def f():
            with cond:
                result = cond.wait_for(lambda : state==4)
                self.assertTrue(result)
                self.assertEqual(state, 4)
        b = Bunch(f, 1)
        b.wait_for_started()
        for i in range(4):
            time.sleep(0.01)
            with cond:
                state += 1
                cond.notify()
        b.wait_for_finished()

    def test_waitfor_timeout(self):
        """wait_for() returns False when the predicate never becomes true."""
        cond = self.condtype()
        state = 0
        success = []
        def f():
            with cond:
                # Monotonic clock: immune to system clock adjustments.
                dt = time.monotonic()
                result = cond.wait_for(lambda : state==4, timeout=0.1)
                dt = time.monotonic() - dt
                self.assertFalse(result)
                self.assertTimeout(dt, 0.1)
                # Only reached when both assertions above passed.
                success.append(None)
        b = Bunch(f, 1)
        b.wait_for_started()
        # Only increment 3 times, so state == 4 is never reached.
        for i in range(3):
            time.sleep(0.01)
            with cond:
                state += 1
                cond.notify()
        b.wait_for_finished()
        self.assertEqual(len(success), 1)
563 self.assertEqual(len(success), 1)
564
Antoine Pitrou557934f2009-11-06 22:41:14 +0000565
class BaseSemaphoreTests(BaseTestCase):
    """
    Common tests for {bounded, unbounded} semaphore objects.

    Every test creates its semaphore through `self.semtype(...)`; that
    attribute is not defined in this class and has to be supplied by the
    concrete test case.
    """

    def test_constructor(self):
        """Negative initial values are rejected with ValueError."""
        self.assertRaises(ValueError, self.semtype, value=-1)
        self.assertRaises(ValueError, self.semtype, value=-sys.maxsize)

    def test_acquire(self):
        """A semaphore can be acquired as many times as its initial value."""
        sem = self.semtype(1)
        sem.acquire()
        sem.release()
        sem = self.semtype(2)
        sem.acquire()
        sem.acquire()
        sem.release()
        sem.release()

    def test_acquire_destroy(self):
        """A semaphore can be destroyed while it is acquired."""
        sem = self.semtype()
        sem.acquire()
        del sem

    def test_acquire_contended(self):
        """Blocked acquirers proceed exactly as release() calls are made."""
        sem = self.semtype(7)
        sem.acquire()
        N = 10
        results1 = []
        results2 = []
        phase_num = 0
        def f():
            sem.acquire()
            results1.append(phase_num)
            sem.acquire()
            results2.append(phase_num)
        b = Bunch(f, N)
        b.wait_for_started()
        # 6 of the 7 units are left after our own acquire() above.
        while len(results1) + len(results2) < 6:
            _wait()
        self.assertEqual(results1 + results2, [0] * 6)
        phase_num = 1
        for i in range(7):
            sem.release()
        while len(results1) + len(results2) < 13:
            _wait()
        self.assertEqual(sorted(results1 + results2), [0] * 6 + [1] * 7)
        phase_num = 2
        for i in range(6):
            sem.release()
        while len(results1) + len(results2) < 19:
            _wait()
        self.assertEqual(sorted(results1 + results2), [0] * 6 + [1] * 7 + [2] * 6)
        # The semaphore is still locked
        self.assertFalse(sem.acquire(False))
        # Final release, to let the last thread finish
        sem.release()
        b.wait_for_finished()

    def test_try_acquire(self):
        """Non-blocking acquire() succeeds only while units remain."""
        sem = self.semtype(2)
        self.assertTrue(sem.acquire(False))
        self.assertTrue(sem.acquire(False))
        self.assertFalse(sem.acquire(False))
        sem.release()
        self.assertTrue(sem.acquire(False))

    def test_try_acquire_contended(self):
        """Exactly the remaining units can be taken by non-blocking callers."""
        sem = self.semtype(4)
        sem.acquire()
        results = []
        def f():
            results.append(sem.acquire(False))
            results.append(sem.acquire(False))
        Bunch(f, 5).wait_for_finished()
        # There can be a thread switch between acquiring the semaphore and
        # appending the result, therefore results will not necessarily be
        # ordered.
        self.assertEqual(sorted(results), [False] * 7 + [True] * 3)

    def test_acquire_timeout(self):
        """acquire(timeout=...) succeeds while units remain, else times out."""
        sem = self.semtype(2)
        self.assertRaises(ValueError, sem.acquire, False, timeout=1.0)
        self.assertTrue(sem.acquire(timeout=0.005))
        self.assertTrue(sem.acquire(timeout=0.005))
        self.assertFalse(sem.acquire(timeout=0.005))
        sem.release()
        self.assertTrue(sem.acquire(timeout=0.005))
        # Durations are measured with the monotonic clock so that a system
        # clock adjustment during the test cannot distort them.
        t = time.monotonic()
        self.assertFalse(sem.acquire(timeout=0.5))
        dt = time.monotonic() - t
        self.assertTimeout(dt, 0.5)

    def test_default_value(self):
        # The default initial value is 1.
        sem = self.semtype()
        sem.acquire()
        def f():
            sem.acquire()
            sem.release()
        b = Bunch(f, 1)
        b.wait_for_started()
        _wait()
        # The worker must still be blocked in acquire().
        self.assertFalse(b.finished)
        sem.release()
        b.wait_for_finished()

    def test_with(self):
        """The semaphore is released on leaving a `with` block, even on error."""
        sem = self.semtype(2)
        def _with(err=None):
            with sem:
                self.assertTrue(sem.acquire(False))
                sem.release()
                with sem:
                    self.assertFalse(sem.acquire(False))
                    if err:
                        raise err
        _with()
        self.assertTrue(sem.acquire(False))
        sem.release()
        self.assertRaises(TypeError, _with, TypeError)
        self.assertTrue(sem.acquire(False))
        sem.release()
688 sem.release()
689
class SemaphoreTests(BaseSemaphoreTests):
    """
    Tests for unbounded semaphores.
    """

    def test_release_unacquired(self):
        """
        Unbounded releases are allowed and increment the semaphore's value.
        """
        semaphore = self.semtype(1)
        semaphore.release()
        # After the extra release, two acquisitions go through.
        semaphore.acquire()
        semaphore.acquire()
        semaphore.release()
701 sem.release()
702
703
class BoundedSemaphoreTests(BaseSemaphoreTests):
    """
    Tests for bounded semaphores.
    """

    def test_release_unacquired(self):
        """release() cannot take the semaphore past its initial value."""
        semaphore = self.semtype()
        self.assertRaises(ValueError, semaphore.release)
        # A matched acquire() / release() pair is fine ...
        semaphore.acquire()
        semaphore.release()
        # ... but one release too many is rejected again.
        self.assertRaises(ValueError, semaphore.release)
715 self.assertRaises(ValueError, sem.release)
Kristján Valur Jónsson3be00032010-10-28 09:43:10 +0000716
717
class BarrierTests(BaseTestCase):
    """
    Tests for Barrier objects.

    Barriers are created through `self.barriertype(...)`; that attribute is
    not defined in this class and has to be supplied by the concrete test
    case.
    """
    N = 5
    defaultTimeout = 2.0

    # NOTE(review): setUp() and tearDown() below replace the BaseTestCase
    # versions without calling them, so threading_setup() and
    # threading_cleanup() do not run for these tests -- confirm that this is
    # intentional.
    def setUp(self):
        self.barrier = self.barriertype(self.N, timeout=self.defaultTimeout)

    def tearDown(self):
        self.barrier.abort()

    def run_threads(self, f):
        """Run `f` in N-1 new threads and in the calling thread."""
        workers = Bunch(f, self.N-1)
        f()
        workers.wait_for_finished()

    def multipass(self, results, n):
        """Pass the shared barrier `n` times, checking lockstep progress."""
        parties = self.barrier.parties
        self.assertEqual(parties, self.N)
        before, after = results
        for round_no in range(n):
            before.append(True)
            self.assertEqual(len(after), round_no * parties)
            self.barrier.wait()
            after.append(True)
            self.assertEqual(len(before), (round_no + 1) * parties)
            self.barrier.wait()
        self.assertEqual(self.barrier.n_waiting, 0)
        self.assertFalse(self.barrier.broken)

    def test_barrier(self, passes=1):
        """
        Test that a barrier is passed in lockstep
        """
        results = [[], []]

        def worker():
            self.multipass(results, passes)

        self.run_threads(worker)

    def test_barrier_10(self):
        """
        Test that a barrier works for 10 consecutive runs
        """
        return self.test_barrier(10)

    def test_wait_return(self):
        """
        test the return value from barrier.wait
        """
        indices = []

        def worker():
            index = self.barrier.wait()
            indices.append(index)

        self.run_threads(worker)
        self.assertEqual(sum(indices), sum(range(self.N)))

    def test_action(self):
        """
        Test the 'action' callback
        """
        calls = []

        def action():
            calls.append(True)

        barrier = self.barriertype(self.N, action)

        def worker():
            barrier.wait()
            self.assertEqual(len(calls), 1)

        self.run_threads(worker)

    def test_abort(self):
        """
        Test that an abort will put the barrier in a broken state
        """
        passed = []
        broken = []

        def worker():
            try:
                index = self.barrier.wait()
                if index == self.N//2:
                    raise RuntimeError
                self.barrier.wait()
                passed.append(True)
            except threading.BrokenBarrierError:
                broken.append(True)
            except RuntimeError:
                self.barrier.abort()

        self.run_threads(worker)
        self.assertEqual(len(passed), 0)
        self.assertEqual(len(broken), self.N-1)
        self.assertTrue(self.barrier.broken)

    def test_reset(self):
        """
        Test that a 'reset' on a barrier frees the waiting threads
        """
        passed = []
        broken = []
        repassed = []

        def worker():
            index = self.barrier.wait()
            if index == self.N//2:
                # Wait until the other threads are all in the barrier.
                while self.barrier.n_waiting < self.N-1:
                    time.sleep(0.001)
                self.barrier.reset()
            else:
                try:
                    self.barrier.wait()
                    passed.append(True)
                except threading.BrokenBarrierError:
                    broken.append(True)
            # Now, pass the barrier again
            self.barrier.wait()
            repassed.append(True)

        self.run_threads(worker)
        self.assertEqual(len(passed), 0)
        self.assertEqual(len(broken), self.N-1)
        self.assertEqual(len(repassed), self.N)

    def test_abort_and_reset(self):
        """
        Test that a barrier can be reset after being broken.
        """
        passed = []
        broken = []
        repassed = []
        sync_barrier = self.barriertype(self.N)

        def worker():
            try:
                index = self.barrier.wait()
                if index == self.N//2:
                    raise RuntimeError
                self.barrier.wait()
                passed.append(True)
            except threading.BrokenBarrierError:
                broken.append(True)
            except RuntimeError:
                self.barrier.abort()
            # Synchronize and reset the barrier.  Must synchronize first so
            # that everyone has left it when we reset, and after so that no
            # one enters it before the reset.
            if sync_barrier.wait() == self.N//2:
                self.barrier.reset()
            sync_barrier.wait()
            self.barrier.wait()
            repassed.append(True)

        self.run_threads(worker)
        self.assertEqual(len(passed), 0)
        self.assertEqual(len(broken), self.N-1)
        self.assertEqual(len(repassed), self.N)

    def test_timeout(self):
        """
        Test wait(timeout)
        """
        def worker():
            index = self.barrier.wait()
            if index == self.N // 2:
                # One thread is late!
                time.sleep(1.0)
            # Default timeout is 2.0, so this is shorter.
            self.assertRaises(threading.BrokenBarrierError,
                              self.barrier.wait, 0.5)

        self.run_threads(worker)

    def test_default_timeout(self):
        """
        Test the barrier's default timeout
        """
        # create a barrier with a low default timeout
        barrier = self.barriertype(self.N, timeout=0.3)

        def worker():
            index = barrier.wait()
            if index == self.N // 2:
                # One thread is later than the default timeout of 0.3s.
                time.sleep(1.0)
            self.assertRaises(threading.BrokenBarrierError, barrier.wait)

        self.run_threads(worker)

    def test_single_thread(self):
        """A barrier for a single party never blocks."""
        solo = self.barriertype(1)
        solo.wait()
        solo.wait()
908 b.wait()