blob: 42a7d8216352a7351eb7df21a6a474b87f5b1999 [file] [log] [blame]
Antoine Pitrou557934f2009-11-06 22:41:14 +00001"""
2Various tests for synchronization primitives.
3"""
4
5import sys
6import time
Victor Stinner2a129742011-05-30 23:02:52 +02007from _thread import start_new_thread, TIMEOUT_MAX
Antoine Pitrou557934f2009-11-06 22:41:14 +00008import threading
9import unittest
10
11from test import support
12
13
14def _wait():
15 # A crude wait/yield function not relying on synchronization primitives.
16 time.sleep(0.01)
17
18class Bunch(object):
19 """
20 A bunch of threads.
21 """
22 def __init__(self, f, n, wait_before_exit=False):
23 """
24 Construct a bunch of `n` threads running the same function `f`.
25 If `wait_before_exit` is True, the threads won't terminate until
26 do_finish() is called.
27 """
28 self.f = f
29 self.n = n
30 self.started = []
31 self.finished = []
32 self._can_exit = not wait_before_exit
33 def task():
Victor Stinner2a129742011-05-30 23:02:52 +020034 tid = threading.get_ident()
Antoine Pitrou557934f2009-11-06 22:41:14 +000035 self.started.append(tid)
36 try:
37 f()
38 finally:
39 self.finished.append(tid)
40 while not self._can_exit:
41 _wait()
Serhiy Storchaka9db55002015-03-28 20:38:37 +020042 try:
43 for i in range(n):
44 start_new_thread(task, ())
45 except:
46 self._can_exit = True
47 raise
Antoine Pitrou557934f2009-11-06 22:41:14 +000048
49 def wait_for_started(self):
50 while len(self.started) < self.n:
51 _wait()
52
53 def wait_for_finished(self):
54 while len(self.finished) < self.n:
55 _wait()
56
57 def do_finish(self):
58 self._can_exit = True
59
60
61class BaseTestCase(unittest.TestCase):
62 def setUp(self):
63 self._threads = support.threading_setup()
64
65 def tearDown(self):
66 support.threading_cleanup(*self._threads)
67 support.reap_children()
68
Antoine Pitrou7c3e5772010-04-14 15:44:10 +000069 def assertTimeout(self, actual, expected):
70 # The waiting and/or time.time() can be imprecise, which
71 # is why comparing to the expected value would sometimes fail
72 # (especially under Windows).
73 self.assertGreaterEqual(actual, expected * 0.6)
74 # Test nothing insane happened
75 self.assertLess(actual, expected * 10.0)
76
Antoine Pitrou557934f2009-11-06 22:41:14 +000077
78class BaseLockTests(BaseTestCase):
79 """
80 Tests for both recursive and non-recursive locks.
81 """
82
83 def test_constructor(self):
84 lock = self.locktype()
85 del lock
86
Christian Heimesc5d95b12013-07-30 15:54:39 +020087 def test_repr(self):
88 lock = self.locktype()
89 repr(lock)
90 del lock
91
Antoine Pitrou557934f2009-11-06 22:41:14 +000092 def test_acquire_destroy(self):
93 lock = self.locktype()
94 lock.acquire()
95 del lock
96
97 def test_acquire_release(self):
98 lock = self.locktype()
99 lock.acquire()
100 lock.release()
101 del lock
102
103 def test_try_acquire(self):
104 lock = self.locktype()
105 self.assertTrue(lock.acquire(False))
106 lock.release()
107
108 def test_try_acquire_contended(self):
109 lock = self.locktype()
110 lock.acquire()
111 result = []
112 def f():
113 result.append(lock.acquire(False))
114 Bunch(f, 1).wait_for_finished()
115 self.assertFalse(result[0])
116 lock.release()
117
118 def test_acquire_contended(self):
119 lock = self.locktype()
120 lock.acquire()
121 N = 5
122 def f():
123 lock.acquire()
124 lock.release()
125
126 b = Bunch(f, N)
127 b.wait_for_started()
128 _wait()
129 self.assertEqual(len(b.finished), 0)
130 lock.release()
131 b.wait_for_finished()
132 self.assertEqual(len(b.finished), N)
133
134 def test_with(self):
135 lock = self.locktype()
136 def f():
137 lock.acquire()
138 lock.release()
139 def _with(err=None):
140 with lock:
141 if err is not None:
142 raise err
143 _with()
144 # Check the lock is unacquired
145 Bunch(f, 1).wait_for_finished()
146 self.assertRaises(TypeError, _with, TypeError)
147 # Check the lock is unacquired
148 Bunch(f, 1).wait_for_finished()
149
Antoine Pitroub0872682009-11-09 16:08:16 +0000150 def test_thread_leak(self):
151 # The lock shouldn't leak a Thread instance when used from a foreign
152 # (non-threading) thread.
153 lock = self.locktype()
154 def f():
155 lock.acquire()
156 lock.release()
157 n = len(threading.enumerate())
158 # We run many threads in the hope that existing threads ids won't
159 # be recycled.
160 Bunch(f, 15).wait_for_finished()
Antoine Pitrou45fdb452011-04-04 21:59:09 +0200161 if len(threading.enumerate()) != n:
162 # There is a small window during which a Thread instance's
163 # target function has finished running, but the Thread is still
164 # alive and registered. Avoid spurious failures by waiting a
165 # bit more (seen on a buildbot).
166 time.sleep(0.4)
167 self.assertEqual(n, len(threading.enumerate()))
Antoine Pitroub0872682009-11-09 16:08:16 +0000168
Antoine Pitrou7c3e5772010-04-14 15:44:10 +0000169 def test_timeout(self):
170 lock = self.locktype()
171 # Can't set timeout if not blocking
172 self.assertRaises(ValueError, lock.acquire, 0, 1)
173 # Invalid timeout values
174 self.assertRaises(ValueError, lock.acquire, timeout=-100)
175 self.assertRaises(OverflowError, lock.acquire, timeout=1e100)
176 self.assertRaises(OverflowError, lock.acquire, timeout=TIMEOUT_MAX + 1)
177 # TIMEOUT_MAX is ok
178 lock.acquire(timeout=TIMEOUT_MAX)
179 lock.release()
180 t1 = time.time()
181 self.assertTrue(lock.acquire(timeout=5))
182 t2 = time.time()
183 # Just a sanity test that it didn't actually wait for the timeout.
184 self.assertLess(t2 - t1, 5)
185 results = []
186 def f():
187 t1 = time.time()
188 results.append(lock.acquire(timeout=0.5))
189 t2 = time.time()
190 results.append(t2 - t1)
191 Bunch(f, 1).wait_for_finished()
192 self.assertFalse(results[0])
193 self.assertTimeout(results[1], 0.5)
194
Antoine Pitrou557934f2009-11-06 22:41:14 +0000195
196class LockTests(BaseLockTests):
197 """
198 Tests for non-recursive, weak locks
199 (which can be acquired and released from different threads).
200 """
201 def test_reacquire(self):
202 # Lock needs to be released before re-acquiring.
203 lock = self.locktype()
204 phase = []
205 def f():
206 lock.acquire()
207 phase.append(None)
208 lock.acquire()
209 phase.append(None)
210 start_new_thread(f, ())
211 while len(phase) == 0:
212 _wait()
213 _wait()
214 self.assertEqual(len(phase), 1)
215 lock.release()
216 while len(phase) == 1:
217 _wait()
218 self.assertEqual(len(phase), 2)
219
220 def test_different_thread(self):
221 # Lock can be released from a different thread.
222 lock = self.locktype()
223 lock.acquire()
224 def f():
225 lock.release()
226 b = Bunch(f, 1)
227 b.wait_for_finished()
228 lock.acquire()
229 lock.release()
230
Antoine Pitrou7899acf2011-03-31 01:00:32 +0200231 def test_state_after_timeout(self):
232 # Issue #11618: check that lock is in a proper state after a
233 # (non-zero) timeout.
234 lock = self.locktype()
235 lock.acquire()
236 self.assertFalse(lock.acquire(timeout=0.01))
237 lock.release()
238 self.assertFalse(lock.locked())
239 self.assertTrue(lock.acquire(blocking=False))
240
Antoine Pitrou557934f2009-11-06 22:41:14 +0000241
242class RLockTests(BaseLockTests):
243 """
244 Tests for recursive locks.
245 """
246 def test_reacquire(self):
247 lock = self.locktype()
248 lock.acquire()
249 lock.acquire()
250 lock.release()
251 lock.acquire()
252 lock.release()
253 lock.release()
254
255 def test_release_unacquired(self):
256 # Cannot release an unacquired lock
257 lock = self.locktype()
258 self.assertRaises(RuntimeError, lock.release)
259 lock.acquire()
260 lock.acquire()
261 lock.release()
262 lock.acquire()
263 lock.release()
264 lock.release()
265 self.assertRaises(RuntimeError, lock.release)
Antoine Pitrouea3eb882012-05-17 18:55:59 +0200266
267 def test_release_save_unacquired(self):
268 # Cannot _release_save an unacquired lock
269 lock = self.locktype()
270 self.assertRaises(RuntimeError, lock._release_save)
271 lock.acquire()
272 lock.acquire()
273 lock.release()
274 lock.acquire()
275 lock.release()
276 lock.release()
Victor Stinnerc2824d42011-04-24 23:41:33 +0200277 self.assertRaises(RuntimeError, lock._release_save)
Antoine Pitrou557934f2009-11-06 22:41:14 +0000278
279 def test_different_thread(self):
280 # Cannot release from a different thread
281 lock = self.locktype()
282 def f():
283 lock.acquire()
284 b = Bunch(f, 1, True)
285 try:
286 self.assertRaises(RuntimeError, lock.release)
287 finally:
288 b.do_finish()
289
290 def test__is_owned(self):
291 lock = self.locktype()
292 self.assertFalse(lock._is_owned())
293 lock.acquire()
294 self.assertTrue(lock._is_owned())
295 lock.acquire()
296 self.assertTrue(lock._is_owned())
297 result = []
298 def f():
299 result.append(lock._is_owned())
300 Bunch(f, 1).wait_for_finished()
301 self.assertFalse(result[0])
302 lock.release()
303 self.assertTrue(lock._is_owned())
304 lock.release()
305 self.assertFalse(lock._is_owned())
306
307
308class EventTests(BaseTestCase):
309 """
310 Tests for Event objects.
311 """
312
313 def test_is_set(self):
314 evt = self.eventtype()
315 self.assertFalse(evt.is_set())
316 evt.set()
317 self.assertTrue(evt.is_set())
318 evt.set()
319 self.assertTrue(evt.is_set())
320 evt.clear()
321 self.assertFalse(evt.is_set())
322 evt.clear()
323 self.assertFalse(evt.is_set())
324
325 def _check_notify(self, evt):
326 # All threads get notified
327 N = 5
328 results1 = []
329 results2 = []
330 def f():
331 results1.append(evt.wait())
332 results2.append(evt.wait())
333 b = Bunch(f, N)
334 b.wait_for_started()
335 _wait()
336 self.assertEqual(len(results1), 0)
337 evt.set()
338 b.wait_for_finished()
339 self.assertEqual(results1, [True] * N)
340 self.assertEqual(results2, [True] * N)
341
342 def test_notify(self):
343 evt = self.eventtype()
344 self._check_notify(evt)
345 # Another time, after an explicit clear()
346 evt.set()
347 evt.clear()
348 self._check_notify(evt)
349
350 def test_timeout(self):
351 evt = self.eventtype()
352 results1 = []
353 results2 = []
354 N = 5
355 def f():
356 results1.append(evt.wait(0.0))
357 t1 = time.time()
Antoine Pitrou7c3e5772010-04-14 15:44:10 +0000358 r = evt.wait(0.5)
Antoine Pitrou557934f2009-11-06 22:41:14 +0000359 t2 = time.time()
360 results2.append((r, t2 - t1))
361 Bunch(f, N).wait_for_finished()
362 self.assertEqual(results1, [False] * N)
363 for r, dt in results2:
364 self.assertFalse(r)
Antoine Pitrou7c3e5772010-04-14 15:44:10 +0000365 self.assertTimeout(dt, 0.5)
Antoine Pitrou557934f2009-11-06 22:41:14 +0000366 # The event is set
367 results1 = []
368 results2 = []
369 evt.set()
370 Bunch(f, N).wait_for_finished()
371 self.assertEqual(results1, [True] * N)
372 for r, dt in results2:
373 self.assertTrue(r)
374
Charles-François Natalided03482012-01-07 18:24:56 +0100375 def test_set_and_clear(self):
376 # Issue #13502: check that wait() returns true even when the event is
377 # cleared before the waiting thread is woken up.
378 evt = self.eventtype()
379 results = []
380 N = 5
381 def f():
382 results.append(evt.wait(1))
383 b = Bunch(f, N)
384 b.wait_for_started()
385 time.sleep(0.5)
386 evt.set()
387 evt.clear()
388 b.wait_for_finished()
389 self.assertEqual(results, [True] * N)
390
Antoine Pitrou557934f2009-11-06 22:41:14 +0000391
392class ConditionTests(BaseTestCase):
393 """
394 Tests for condition variables.
395 """
396
397 def test_acquire(self):
398 cond = self.condtype()
399 # Be default we have an RLock: the condition can be acquired multiple
400 # times.
401 cond.acquire()
402 cond.acquire()
403 cond.release()
404 cond.release()
405 lock = threading.Lock()
406 cond = self.condtype(lock)
407 cond.acquire()
408 self.assertFalse(lock.acquire(False))
409 cond.release()
410 self.assertTrue(lock.acquire(False))
411 self.assertFalse(cond.acquire(False))
412 lock.release()
413 with cond:
414 self.assertFalse(lock.acquire(False))
415
416 def test_unacquired_wait(self):
417 cond = self.condtype()
418 self.assertRaises(RuntimeError, cond.wait)
419
420 def test_unacquired_notify(self):
421 cond = self.condtype()
422 self.assertRaises(RuntimeError, cond.notify)
423
424 def _check_notify(self, cond):
Kristjan Valur Jonsson020af2a2013-11-11 11:29:04 +0000425 # Note that this test is sensitive to timing. If the worker threads
426 # don't execute in a timely fashion, the main thread may think they
427 # are further along then they are. The main thread therefore issues
428 # _wait() statements to try to make sure that it doesn't race ahead
429 # of the workers.
430 # Secondly, this test assumes that condition variables are not subject
431 # to spurious wakeups. The absence of spurious wakeups is an implementation
432 # detail of Condition Cariables in current CPython, but in general, not
433 # a guaranteed property of condition variables as a programming
434 # construct. In particular, it is possible that this can no longer
435 # be conveniently guaranteed should their implementation ever change.
Antoine Pitrou557934f2009-11-06 22:41:14 +0000436 N = 5
437 results1 = []
438 results2 = []
439 phase_num = 0
440 def f():
441 cond.acquire()
Georg Brandlb9a43912010-10-28 09:03:20 +0000442 result = cond.wait()
Antoine Pitrou557934f2009-11-06 22:41:14 +0000443 cond.release()
Georg Brandlb9a43912010-10-28 09:03:20 +0000444 results1.append((result, phase_num))
Antoine Pitrou557934f2009-11-06 22:41:14 +0000445 cond.acquire()
Georg Brandlb9a43912010-10-28 09:03:20 +0000446 result = cond.wait()
Antoine Pitrou557934f2009-11-06 22:41:14 +0000447 cond.release()
Georg Brandlb9a43912010-10-28 09:03:20 +0000448 results2.append((result, phase_num))
Antoine Pitrou557934f2009-11-06 22:41:14 +0000449 b = Bunch(f, N)
450 b.wait_for_started()
451 _wait()
452 self.assertEqual(results1, [])
453 # Notify 3 threads at first
454 cond.acquire()
455 cond.notify(3)
456 _wait()
457 phase_num = 1
458 cond.release()
459 while len(results1) < 3:
460 _wait()
Georg Brandlb9a43912010-10-28 09:03:20 +0000461 self.assertEqual(results1, [(True, 1)] * 3)
Antoine Pitrou557934f2009-11-06 22:41:14 +0000462 self.assertEqual(results2, [])
Kristjan Valur Jonsson020af2a2013-11-11 11:29:04 +0000463 # first wait, to ensure all workers settle into cond.wait() before
464 # we continue. See issue #8799
465 _wait()
Antoine Pitrou557934f2009-11-06 22:41:14 +0000466 # Notify 5 threads: they might be in their first or second wait
467 cond.acquire()
468 cond.notify(5)
469 _wait()
470 phase_num = 2
471 cond.release()
472 while len(results1) + len(results2) < 8:
473 _wait()
Georg Brandlb9a43912010-10-28 09:03:20 +0000474 self.assertEqual(results1, [(True, 1)] * 3 + [(True, 2)] * 2)
475 self.assertEqual(results2, [(True, 2)] * 3)
Kristjan Valur Jonsson020af2a2013-11-11 11:29:04 +0000476 _wait() # make sure all workers settle into cond.wait()
Antoine Pitrou557934f2009-11-06 22:41:14 +0000477 # Notify all threads: they are all in their second wait
478 cond.acquire()
479 cond.notify_all()
480 _wait()
481 phase_num = 3
482 cond.release()
483 while len(results2) < 5:
484 _wait()
Georg Brandlb9a43912010-10-28 09:03:20 +0000485 self.assertEqual(results1, [(True, 1)] * 3 + [(True,2)] * 2)
486 self.assertEqual(results2, [(True, 2)] * 3 + [(True, 3)] * 2)
Antoine Pitrou557934f2009-11-06 22:41:14 +0000487 b.wait_for_finished()
488
489 def test_notify(self):
490 cond = self.condtype()
491 self._check_notify(cond)
492 # A second time, to check internal state is still ok.
493 self._check_notify(cond)
494
495 def test_timeout(self):
496 cond = self.condtype()
497 results = []
498 N = 5
499 def f():
500 cond.acquire()
501 t1 = time.time()
Georg Brandlb9a43912010-10-28 09:03:20 +0000502 result = cond.wait(0.5)
Antoine Pitrou557934f2009-11-06 22:41:14 +0000503 t2 = time.time()
504 cond.release()
Georg Brandlb9a43912010-10-28 09:03:20 +0000505 results.append((t2 - t1, result))
Antoine Pitrou557934f2009-11-06 22:41:14 +0000506 Bunch(f, N).wait_for_finished()
Georg Brandlb9a43912010-10-28 09:03:20 +0000507 self.assertEqual(len(results), N)
508 for dt, result in results:
Antoine Pitrou7c3e5772010-04-14 15:44:10 +0000509 self.assertTimeout(dt, 0.5)
Georg Brandlb9a43912010-10-28 09:03:20 +0000510 # Note that conceptually (that"s the condition variable protocol)
511 # a wait() may succeed even if no one notifies us and before any
512 # timeout occurs. Spurious wakeups can occur.
513 # This makes it hard to verify the result value.
514 # In practice, this implementation has no spurious wakeups.
515 self.assertFalse(result)
Antoine Pitrou557934f2009-11-06 22:41:14 +0000516
Kristján Valur Jónsson63315202010-11-18 12:46:39 +0000517 def test_waitfor(self):
518 cond = self.condtype()
519 state = 0
520 def f():
521 with cond:
522 result = cond.wait_for(lambda : state==4)
523 self.assertTrue(result)
524 self.assertEqual(state, 4)
525 b = Bunch(f, 1)
526 b.wait_for_started()
Victor Stinner3349bca2011-05-18 00:16:14 +0200527 for i in range(4):
Kristján Valur Jónsson63315202010-11-18 12:46:39 +0000528 time.sleep(0.01)
529 with cond:
530 state += 1
531 cond.notify()
532 b.wait_for_finished()
533
534 def test_waitfor_timeout(self):
535 cond = self.condtype()
536 state = 0
537 success = []
538 def f():
539 with cond:
540 dt = time.time()
541 result = cond.wait_for(lambda : state==4, timeout=0.1)
542 dt = time.time() - dt
543 self.assertFalse(result)
544 self.assertTimeout(dt, 0.1)
545 success.append(None)
546 b = Bunch(f, 1)
547 b.wait_for_started()
548 # Only increment 3 times, so state == 4 is never reached.
549 for i in range(3):
550 time.sleep(0.01)
551 with cond:
552 state += 1
553 cond.notify()
554 b.wait_for_finished()
555 self.assertEqual(len(success), 1)
556
Antoine Pitrou557934f2009-11-06 22:41:14 +0000557
558class BaseSemaphoreTests(BaseTestCase):
559 """
560 Common tests for {bounded, unbounded} semaphore objects.
561 """
562
563 def test_constructor(self):
564 self.assertRaises(ValueError, self.semtype, value = -1)
565 self.assertRaises(ValueError, self.semtype, value = -sys.maxsize)
566
567 def test_acquire(self):
568 sem = self.semtype(1)
569 sem.acquire()
570 sem.release()
571 sem = self.semtype(2)
572 sem.acquire()
573 sem.acquire()
574 sem.release()
575 sem.release()
576
577 def test_acquire_destroy(self):
578 sem = self.semtype()
579 sem.acquire()
580 del sem
581
582 def test_acquire_contended(self):
583 sem = self.semtype(7)
584 sem.acquire()
585 N = 10
586 results1 = []
587 results2 = []
588 phase_num = 0
589 def f():
590 sem.acquire()
591 results1.append(phase_num)
592 sem.acquire()
593 results2.append(phase_num)
594 b = Bunch(f, 10)
595 b.wait_for_started()
596 while len(results1) + len(results2) < 6:
597 _wait()
598 self.assertEqual(results1 + results2, [0] * 6)
599 phase_num = 1
600 for i in range(7):
601 sem.release()
602 while len(results1) + len(results2) < 13:
603 _wait()
604 self.assertEqual(sorted(results1 + results2), [0] * 6 + [1] * 7)
605 phase_num = 2
606 for i in range(6):
607 sem.release()
608 while len(results1) + len(results2) < 19:
609 _wait()
610 self.assertEqual(sorted(results1 + results2), [0] * 6 + [1] * 7 + [2] * 6)
611 # The semaphore is still locked
612 self.assertFalse(sem.acquire(False))
613 # Final release, to let the last thread finish
614 sem.release()
615 b.wait_for_finished()
616
617 def test_try_acquire(self):
618 sem = self.semtype(2)
619 self.assertTrue(sem.acquire(False))
620 self.assertTrue(sem.acquire(False))
621 self.assertFalse(sem.acquire(False))
622 sem.release()
623 self.assertTrue(sem.acquire(False))
624
625 def test_try_acquire_contended(self):
626 sem = self.semtype(4)
627 sem.acquire()
628 results = []
629 def f():
630 results.append(sem.acquire(False))
631 results.append(sem.acquire(False))
632 Bunch(f, 5).wait_for_finished()
633 # There can be a thread switch between acquiring the semaphore and
634 # appending the result, therefore results will not necessarily be
635 # ordered.
636 self.assertEqual(sorted(results), [False] * 7 + [True] * 3 )
637
Antoine Pitrou0454af92010-04-17 23:51:58 +0000638 def test_acquire_timeout(self):
639 sem = self.semtype(2)
640 self.assertRaises(ValueError, sem.acquire, False, timeout=1.0)
641 self.assertTrue(sem.acquire(timeout=0.005))
642 self.assertTrue(sem.acquire(timeout=0.005))
643 self.assertFalse(sem.acquire(timeout=0.005))
644 sem.release()
645 self.assertTrue(sem.acquire(timeout=0.005))
646 t = time.time()
647 self.assertFalse(sem.acquire(timeout=0.5))
648 dt = time.time() - t
649 self.assertTimeout(dt, 0.5)
650
Antoine Pitrou557934f2009-11-06 22:41:14 +0000651 def test_default_value(self):
652 # The default initial value is 1.
653 sem = self.semtype()
654 sem.acquire()
655 def f():
656 sem.acquire()
657 sem.release()
658 b = Bunch(f, 1)
659 b.wait_for_started()
660 _wait()
661 self.assertFalse(b.finished)
662 sem.release()
663 b.wait_for_finished()
664
665 def test_with(self):
666 sem = self.semtype(2)
667 def _with(err=None):
668 with sem:
669 self.assertTrue(sem.acquire(False))
670 sem.release()
671 with sem:
672 self.assertFalse(sem.acquire(False))
673 if err:
674 raise err
675 _with()
676 self.assertTrue(sem.acquire(False))
677 sem.release()
678 self.assertRaises(TypeError, _with, TypeError)
679 self.assertTrue(sem.acquire(False))
680 sem.release()
681
682class SemaphoreTests(BaseSemaphoreTests):
683 """
684 Tests for unbounded semaphores.
685 """
686
687 def test_release_unacquired(self):
688 # Unbounded releases are allowed and increment the semaphore's value
689 sem = self.semtype(1)
690 sem.release()
691 sem.acquire()
692 sem.acquire()
693 sem.release()
694
695
696class BoundedSemaphoreTests(BaseSemaphoreTests):
697 """
698 Tests for bounded semaphores.
699 """
700
701 def test_release_unacquired(self):
702 # Cannot go past the initial value
703 sem = self.semtype()
704 self.assertRaises(ValueError, sem.release)
705 sem.acquire()
706 sem.release()
707 self.assertRaises(ValueError, sem.release)
Kristján Valur Jónsson3be00032010-10-28 09:43:10 +0000708
709
710class BarrierTests(BaseTestCase):
711 """
712 Tests for Barrier objects.
713 """
714 N = 5
Antoine Pitrou12ae2902010-11-17 21:55:41 +0000715 defaultTimeout = 2.0
Kristján Valur Jónsson3be00032010-10-28 09:43:10 +0000716
717 def setUp(self):
Kristján Valur Jónssonf53a6262010-10-31 03:00:57 +0000718 self.barrier = self.barriertype(self.N, timeout=self.defaultTimeout)
Kristján Valur Jónsson3be00032010-10-28 09:43:10 +0000719 def tearDown(self):
720 self.barrier.abort()
721
722 def run_threads(self, f):
723 b = Bunch(f, self.N-1)
724 f()
725 b.wait_for_finished()
726
727 def multipass(self, results, n):
728 m = self.barrier.parties
729 self.assertEqual(m, self.N)
730 for i in range(n):
731 results[0].append(True)
732 self.assertEqual(len(results[1]), i * m)
733 self.barrier.wait()
734 results[1].append(True)
735 self.assertEqual(len(results[0]), (i + 1) * m)
736 self.barrier.wait()
737 self.assertEqual(self.barrier.n_waiting, 0)
738 self.assertFalse(self.barrier.broken)
739
740 def test_barrier(self, passes=1):
741 """
742 Test that a barrier is passed in lockstep
743 """
744 results = [[],[]]
745 def f():
746 self.multipass(results, passes)
747 self.run_threads(f)
748
749 def test_barrier_10(self):
750 """
751 Test that a barrier works for 10 consecutive runs
752 """
753 return self.test_barrier(10)
754
755 def test_wait_return(self):
756 """
757 test the return value from barrier.wait
758 """
759 results = []
760 def f():
761 r = self.barrier.wait()
762 results.append(r)
763
764 self.run_threads(f)
765 self.assertEqual(sum(results), sum(range(self.N)))
766
767 def test_action(self):
768 """
769 Test the 'action' callback
770 """
771 results = []
772 def action():
773 results.append(True)
774 barrier = self.barriertype(self.N, action)
775 def f():
776 barrier.wait()
777 self.assertEqual(len(results), 1)
778
779 self.run_threads(f)
780
781 def test_abort(self):
782 """
783 Test that an abort will put the barrier in a broken state
784 """
785 results1 = []
786 results2 = []
787 def f():
788 try:
789 i = self.barrier.wait()
790 if i == self.N//2:
791 raise RuntimeError
792 self.barrier.wait()
793 results1.append(True)
794 except threading.BrokenBarrierError:
795 results2.append(True)
796 except RuntimeError:
797 self.barrier.abort()
798 pass
799
800 self.run_threads(f)
801 self.assertEqual(len(results1), 0)
802 self.assertEqual(len(results2), self.N-1)
803 self.assertTrue(self.barrier.broken)
804
805 def test_reset(self):
806 """
807 Test that a 'reset' on a barrier frees the waiting threads
808 """
809 results1 = []
810 results2 = []
811 results3 = []
812 def f():
813 i = self.barrier.wait()
814 if i == self.N//2:
815 # Wait until the other threads are all in the barrier.
816 while self.barrier.n_waiting < self.N-1:
817 time.sleep(0.001)
818 self.barrier.reset()
819 else:
820 try:
821 self.barrier.wait()
822 results1.append(True)
823 except threading.BrokenBarrierError:
824 results2.append(True)
825 # Now, pass the barrier again
826 self.barrier.wait()
827 results3.append(True)
828
829 self.run_threads(f)
830 self.assertEqual(len(results1), 0)
831 self.assertEqual(len(results2), self.N-1)
832 self.assertEqual(len(results3), self.N)
833
834
835 def test_abort_and_reset(self):
836 """
837 Test that a barrier can be reset after being broken.
838 """
839 results1 = []
840 results2 = []
841 results3 = []
842 barrier2 = self.barriertype(self.N)
843 def f():
844 try:
845 i = self.barrier.wait()
846 if i == self.N//2:
847 raise RuntimeError
848 self.barrier.wait()
849 results1.append(True)
850 except threading.BrokenBarrierError:
851 results2.append(True)
852 except RuntimeError:
853 self.barrier.abort()
854 pass
855 # Synchronize and reset the barrier. Must synchronize first so
856 # that everyone has left it when we reset, and after so that no
857 # one enters it before the reset.
858 if barrier2.wait() == self.N//2:
859 self.barrier.reset()
860 barrier2.wait()
861 self.barrier.wait()
862 results3.append(True)
863
864 self.run_threads(f)
865 self.assertEqual(len(results1), 0)
866 self.assertEqual(len(results2), self.N-1)
867 self.assertEqual(len(results3), self.N)
868
869 def test_timeout(self):
870 """
871 Test wait(timeout)
872 """
873 def f():
874 i = self.barrier.wait()
875 if i == self.N // 2:
876 # One thread is late!
Antoine Pitrou12ae2902010-11-17 21:55:41 +0000877 time.sleep(1.0)
878 # Default timeout is 2.0, so this is shorter.
Kristján Valur Jónsson3be00032010-10-28 09:43:10 +0000879 self.assertRaises(threading.BrokenBarrierError,
Antoine Pitrou12ae2902010-11-17 21:55:41 +0000880 self.barrier.wait, 0.5)
Kristján Valur Jónsson3be00032010-10-28 09:43:10 +0000881 self.run_threads(f)
882
883 def test_default_timeout(self):
884 """
885 Test the barrier's default timeout
886 """
Charles-François Natalid4d1d062011-07-27 21:26:42 +0200887 # create a barrier with a low default timeout
888 barrier = self.barriertype(self.N, timeout=0.3)
Kristján Valur Jónsson3be00032010-10-28 09:43:10 +0000889 def f():
Kristján Valur Jónssonf53a6262010-10-31 03:00:57 +0000890 i = barrier.wait()
Kristján Valur Jónsson3be00032010-10-28 09:43:10 +0000891 if i == self.N // 2:
Charles-François Natalid4d1d062011-07-27 21:26:42 +0200892 # One thread is later than the default timeout of 0.3s.
Antoine Pitrou12ae2902010-11-17 21:55:41 +0000893 time.sleep(1.0)
Kristján Valur Jónssonf53a6262010-10-31 03:00:57 +0000894 self.assertRaises(threading.BrokenBarrierError, barrier.wait)
Kristján Valur Jónsson3be00032010-10-28 09:43:10 +0000895 self.run_threads(f)
896
897 def test_single_thread(self):
898 b = self.barriertype(1)
899 b.wait()
900 b.wait()