blob: e8fa4f999a7ce8f9c64eaba2eb25124189d12545 [file] [log] [blame]
Antoine Pitrou557934f2009-11-06 22:41:14 +00001"""
2Various tests for synchronization primitives.
3"""
4
5import sys
6import time
Victor Stinner2a129742011-05-30 23:02:52 +02007from _thread import start_new_thread, TIMEOUT_MAX
Antoine Pitrou557934f2009-11-06 22:41:14 +00008import threading
9import unittest
Raymond Hettinger7836a272015-10-09 00:03:51 -040010import weakref
Antoine Pitrou557934f2009-11-06 22:41:14 +000011
12from test import support
13
14
15def _wait():
16 # A crude wait/yield function not relying on synchronization primitives.
17 time.sleep(0.01)
18
19class Bunch(object):
20 """
21 A bunch of threads.
22 """
23 def __init__(self, f, n, wait_before_exit=False):
24 """
25 Construct a bunch of `n` threads running the same function `f`.
26 If `wait_before_exit` is True, the threads won't terminate until
27 do_finish() is called.
28 """
29 self.f = f
30 self.n = n
31 self.started = []
32 self.finished = []
33 self._can_exit = not wait_before_exit
34 def task():
Victor Stinner2a129742011-05-30 23:02:52 +020035 tid = threading.get_ident()
Antoine Pitrou557934f2009-11-06 22:41:14 +000036 self.started.append(tid)
37 try:
38 f()
39 finally:
40 self.finished.append(tid)
41 while not self._can_exit:
42 _wait()
Serhiy Storchaka9db55002015-03-28 20:38:37 +020043 try:
44 for i in range(n):
45 start_new_thread(task, ())
46 except:
47 self._can_exit = True
48 raise
Antoine Pitrou557934f2009-11-06 22:41:14 +000049
50 def wait_for_started(self):
51 while len(self.started) < self.n:
52 _wait()
53
54 def wait_for_finished(self):
55 while len(self.finished) < self.n:
56 _wait()
Victor Stinner096ae332017-09-13 16:41:08 -070057 # Wait a little bit longer to prevent the "threading_cleanup()
58 # failed to cleanup X threads" warning. The loop above is a weak
59 # synchronization. At the C level, t_bootstrap() can still be
60 # running and so _thread.count() still accounts the "almost dead"
61 # thead.
62 for _ in range(self.n):
63 _wait()
Antoine Pitrou557934f2009-11-06 22:41:14 +000064
65 def do_finish(self):
66 self._can_exit = True
67
68
69class BaseTestCase(unittest.TestCase):
70 def setUp(self):
71 self._threads = support.threading_setup()
72
73 def tearDown(self):
74 support.threading_cleanup(*self._threads)
75 support.reap_children()
76
Antoine Pitrou7c3e5772010-04-14 15:44:10 +000077 def assertTimeout(self, actual, expected):
78 # The waiting and/or time.time() can be imprecise, which
79 # is why comparing to the expected value would sometimes fail
80 # (especially under Windows).
81 self.assertGreaterEqual(actual, expected * 0.6)
82 # Test nothing insane happened
83 self.assertLess(actual, expected * 10.0)
84
Antoine Pitrou557934f2009-11-06 22:41:14 +000085
86class BaseLockTests(BaseTestCase):
87 """
88 Tests for both recursive and non-recursive locks.
89 """
90
91 def test_constructor(self):
92 lock = self.locktype()
93 del lock
94
Christian Heimesc5d95b12013-07-30 15:54:39 +020095 def test_repr(self):
96 lock = self.locktype()
Raymond Hettinger62f4dad2014-05-25 18:22:35 -070097 self.assertRegex(repr(lock), "<unlocked .* object (.*)?at .*>")
98 del lock
99
100 def test_locked_repr(self):
101 lock = self.locktype()
102 lock.acquire()
103 self.assertRegex(repr(lock), "<locked .* object (.*)?at .*>")
Christian Heimesc5d95b12013-07-30 15:54:39 +0200104 del lock
105
Antoine Pitrou557934f2009-11-06 22:41:14 +0000106 def test_acquire_destroy(self):
107 lock = self.locktype()
108 lock.acquire()
109 del lock
110
111 def test_acquire_release(self):
112 lock = self.locktype()
113 lock.acquire()
114 lock.release()
115 del lock
116
117 def test_try_acquire(self):
118 lock = self.locktype()
119 self.assertTrue(lock.acquire(False))
120 lock.release()
121
122 def test_try_acquire_contended(self):
123 lock = self.locktype()
124 lock.acquire()
125 result = []
126 def f():
127 result.append(lock.acquire(False))
128 Bunch(f, 1).wait_for_finished()
129 self.assertFalse(result[0])
130 lock.release()
131
132 def test_acquire_contended(self):
133 lock = self.locktype()
134 lock.acquire()
135 N = 5
136 def f():
137 lock.acquire()
138 lock.release()
139
140 b = Bunch(f, N)
141 b.wait_for_started()
142 _wait()
143 self.assertEqual(len(b.finished), 0)
144 lock.release()
145 b.wait_for_finished()
146 self.assertEqual(len(b.finished), N)
147
148 def test_with(self):
149 lock = self.locktype()
150 def f():
151 lock.acquire()
152 lock.release()
153 def _with(err=None):
154 with lock:
155 if err is not None:
156 raise err
157 _with()
158 # Check the lock is unacquired
159 Bunch(f, 1).wait_for_finished()
160 self.assertRaises(TypeError, _with, TypeError)
161 # Check the lock is unacquired
162 Bunch(f, 1).wait_for_finished()
163
Antoine Pitroub0872682009-11-09 16:08:16 +0000164 def test_thread_leak(self):
165 # The lock shouldn't leak a Thread instance when used from a foreign
166 # (non-threading) thread.
167 lock = self.locktype()
168 def f():
169 lock.acquire()
170 lock.release()
171 n = len(threading.enumerate())
172 # We run many threads in the hope that existing threads ids won't
173 # be recycled.
174 Bunch(f, 15).wait_for_finished()
Antoine Pitrou45fdb452011-04-04 21:59:09 +0200175 if len(threading.enumerate()) != n:
176 # There is a small window during which a Thread instance's
177 # target function has finished running, but the Thread is still
178 # alive and registered. Avoid spurious failures by waiting a
179 # bit more (seen on a buildbot).
180 time.sleep(0.4)
181 self.assertEqual(n, len(threading.enumerate()))
Antoine Pitroub0872682009-11-09 16:08:16 +0000182
Antoine Pitrou7c3e5772010-04-14 15:44:10 +0000183 def test_timeout(self):
184 lock = self.locktype()
185 # Can't set timeout if not blocking
186 self.assertRaises(ValueError, lock.acquire, 0, 1)
187 # Invalid timeout values
188 self.assertRaises(ValueError, lock.acquire, timeout=-100)
189 self.assertRaises(OverflowError, lock.acquire, timeout=1e100)
190 self.assertRaises(OverflowError, lock.acquire, timeout=TIMEOUT_MAX + 1)
191 # TIMEOUT_MAX is ok
192 lock.acquire(timeout=TIMEOUT_MAX)
193 lock.release()
194 t1 = time.time()
195 self.assertTrue(lock.acquire(timeout=5))
196 t2 = time.time()
197 # Just a sanity test that it didn't actually wait for the timeout.
198 self.assertLess(t2 - t1, 5)
199 results = []
200 def f():
201 t1 = time.time()
202 results.append(lock.acquire(timeout=0.5))
203 t2 = time.time()
204 results.append(t2 - t1)
205 Bunch(f, 1).wait_for_finished()
206 self.assertFalse(results[0])
207 self.assertTimeout(results[1], 0.5)
208
Raymond Hettinger7836a272015-10-09 00:03:51 -0400209 def test_weakref_exists(self):
210 lock = self.locktype()
211 ref = weakref.ref(lock)
212 self.assertIsNotNone(ref())
213
214 def test_weakref_deleted(self):
215 lock = self.locktype()
216 ref = weakref.ref(lock)
217 del lock
218 self.assertIsNone(ref())
219
Antoine Pitrou557934f2009-11-06 22:41:14 +0000220
221class LockTests(BaseLockTests):
222 """
223 Tests for non-recursive, weak locks
224 (which can be acquired and released from different threads).
225 """
226 def test_reacquire(self):
227 # Lock needs to be released before re-acquiring.
228 lock = self.locktype()
229 phase = []
230 def f():
231 lock.acquire()
232 phase.append(None)
233 lock.acquire()
234 phase.append(None)
235 start_new_thread(f, ())
236 while len(phase) == 0:
237 _wait()
238 _wait()
239 self.assertEqual(len(phase), 1)
240 lock.release()
241 while len(phase) == 1:
242 _wait()
243 self.assertEqual(len(phase), 2)
244
245 def test_different_thread(self):
246 # Lock can be released from a different thread.
247 lock = self.locktype()
248 lock.acquire()
249 def f():
250 lock.release()
251 b = Bunch(f, 1)
252 b.wait_for_finished()
253 lock.acquire()
254 lock.release()
255
Antoine Pitrou7899acf2011-03-31 01:00:32 +0200256 def test_state_after_timeout(self):
257 # Issue #11618: check that lock is in a proper state after a
258 # (non-zero) timeout.
259 lock = self.locktype()
260 lock.acquire()
261 self.assertFalse(lock.acquire(timeout=0.01))
262 lock.release()
263 self.assertFalse(lock.locked())
264 self.assertTrue(lock.acquire(blocking=False))
265
Antoine Pitrou557934f2009-11-06 22:41:14 +0000266
267class RLockTests(BaseLockTests):
268 """
269 Tests for recursive locks.
270 """
271 def test_reacquire(self):
272 lock = self.locktype()
273 lock.acquire()
274 lock.acquire()
275 lock.release()
276 lock.acquire()
277 lock.release()
278 lock.release()
279
280 def test_release_unacquired(self):
281 # Cannot release an unacquired lock
282 lock = self.locktype()
283 self.assertRaises(RuntimeError, lock.release)
284 lock.acquire()
285 lock.acquire()
286 lock.release()
287 lock.acquire()
288 lock.release()
289 lock.release()
290 self.assertRaises(RuntimeError, lock.release)
Antoine Pitrouea3eb882012-05-17 18:55:59 +0200291
292 def test_release_save_unacquired(self):
293 # Cannot _release_save an unacquired lock
294 lock = self.locktype()
295 self.assertRaises(RuntimeError, lock._release_save)
296 lock.acquire()
297 lock.acquire()
298 lock.release()
299 lock.acquire()
300 lock.release()
301 lock.release()
Victor Stinnerc2824d42011-04-24 23:41:33 +0200302 self.assertRaises(RuntimeError, lock._release_save)
Antoine Pitrou557934f2009-11-06 22:41:14 +0000303
304 def test_different_thread(self):
305 # Cannot release from a different thread
306 lock = self.locktype()
307 def f():
308 lock.acquire()
309 b = Bunch(f, 1, True)
310 try:
311 self.assertRaises(RuntimeError, lock.release)
312 finally:
313 b.do_finish()
Victor Stinner096ae332017-09-13 16:41:08 -0700314 b.wait_for_finished()
Antoine Pitrou557934f2009-11-06 22:41:14 +0000315
316 def test__is_owned(self):
317 lock = self.locktype()
318 self.assertFalse(lock._is_owned())
319 lock.acquire()
320 self.assertTrue(lock._is_owned())
321 lock.acquire()
322 self.assertTrue(lock._is_owned())
323 result = []
324 def f():
325 result.append(lock._is_owned())
326 Bunch(f, 1).wait_for_finished()
327 self.assertFalse(result[0])
328 lock.release()
329 self.assertTrue(lock._is_owned())
330 lock.release()
331 self.assertFalse(lock._is_owned())
332
333
334class EventTests(BaseTestCase):
335 """
336 Tests for Event objects.
337 """
338
339 def test_is_set(self):
340 evt = self.eventtype()
341 self.assertFalse(evt.is_set())
342 evt.set()
343 self.assertTrue(evt.is_set())
344 evt.set()
345 self.assertTrue(evt.is_set())
346 evt.clear()
347 self.assertFalse(evt.is_set())
348 evt.clear()
349 self.assertFalse(evt.is_set())
350
351 def _check_notify(self, evt):
352 # All threads get notified
353 N = 5
354 results1 = []
355 results2 = []
356 def f():
357 results1.append(evt.wait())
358 results2.append(evt.wait())
359 b = Bunch(f, N)
360 b.wait_for_started()
361 _wait()
362 self.assertEqual(len(results1), 0)
363 evt.set()
364 b.wait_for_finished()
365 self.assertEqual(results1, [True] * N)
366 self.assertEqual(results2, [True] * N)
367
368 def test_notify(self):
369 evt = self.eventtype()
370 self._check_notify(evt)
371 # Another time, after an explicit clear()
372 evt.set()
373 evt.clear()
374 self._check_notify(evt)
375
376 def test_timeout(self):
377 evt = self.eventtype()
378 results1 = []
379 results2 = []
380 N = 5
381 def f():
382 results1.append(evt.wait(0.0))
383 t1 = time.time()
Antoine Pitrou7c3e5772010-04-14 15:44:10 +0000384 r = evt.wait(0.5)
Antoine Pitrou557934f2009-11-06 22:41:14 +0000385 t2 = time.time()
386 results2.append((r, t2 - t1))
387 Bunch(f, N).wait_for_finished()
388 self.assertEqual(results1, [False] * N)
389 for r, dt in results2:
390 self.assertFalse(r)
Antoine Pitrou7c3e5772010-04-14 15:44:10 +0000391 self.assertTimeout(dt, 0.5)
Antoine Pitrou557934f2009-11-06 22:41:14 +0000392 # The event is set
393 results1 = []
394 results2 = []
395 evt.set()
396 Bunch(f, N).wait_for_finished()
397 self.assertEqual(results1, [True] * N)
398 for r, dt in results2:
399 self.assertTrue(r)
400
Charles-François Natalided03482012-01-07 18:24:56 +0100401 def test_set_and_clear(self):
402 # Issue #13502: check that wait() returns true even when the event is
403 # cleared before the waiting thread is woken up.
404 evt = self.eventtype()
405 results = []
406 N = 5
407 def f():
408 results.append(evt.wait(1))
409 b = Bunch(f, N)
410 b.wait_for_started()
411 time.sleep(0.5)
412 evt.set()
413 evt.clear()
414 b.wait_for_finished()
415 self.assertEqual(results, [True] * N)
416
Benjamin Peterson15982aa2015-10-05 21:56:22 -0700417 def test_reset_internal_locks(self):
Berker Peksag6d34bbb2016-04-29 17:25:29 +0300418 # ensure that condition is still using a Lock after reset
Benjamin Peterson15982aa2015-10-05 21:56:22 -0700419 evt = self.eventtype()
Berker Peksag6d34bbb2016-04-29 17:25:29 +0300420 with evt._cond:
421 self.assertFalse(evt._cond.acquire(False))
Benjamin Peterson15982aa2015-10-05 21:56:22 -0700422 evt._reset_internal_locks()
Berker Peksag6d34bbb2016-04-29 17:25:29 +0300423 with evt._cond:
424 self.assertFalse(evt._cond.acquire(False))
Benjamin Peterson15982aa2015-10-05 21:56:22 -0700425
Antoine Pitrou557934f2009-11-06 22:41:14 +0000426
427class ConditionTests(BaseTestCase):
428 """
429 Tests for condition variables.
430 """
431
432 def test_acquire(self):
433 cond = self.condtype()
434 # Be default we have an RLock: the condition can be acquired multiple
435 # times.
436 cond.acquire()
437 cond.acquire()
438 cond.release()
439 cond.release()
440 lock = threading.Lock()
441 cond = self.condtype(lock)
442 cond.acquire()
443 self.assertFalse(lock.acquire(False))
444 cond.release()
445 self.assertTrue(lock.acquire(False))
446 self.assertFalse(cond.acquire(False))
447 lock.release()
448 with cond:
449 self.assertFalse(lock.acquire(False))
450
451 def test_unacquired_wait(self):
452 cond = self.condtype()
453 self.assertRaises(RuntimeError, cond.wait)
454
455 def test_unacquired_notify(self):
456 cond = self.condtype()
457 self.assertRaises(RuntimeError, cond.notify)
458
459 def _check_notify(self, cond):
Kristjan Valur Jonsson020af2a2013-11-11 11:29:04 +0000460 # Note that this test is sensitive to timing. If the worker threads
461 # don't execute in a timely fashion, the main thread may think they
462 # are further along then they are. The main thread therefore issues
463 # _wait() statements to try to make sure that it doesn't race ahead
464 # of the workers.
465 # Secondly, this test assumes that condition variables are not subject
466 # to spurious wakeups. The absence of spurious wakeups is an implementation
467 # detail of Condition Cariables in current CPython, but in general, not
468 # a guaranteed property of condition variables as a programming
469 # construct. In particular, it is possible that this can no longer
470 # be conveniently guaranteed should their implementation ever change.
Antoine Pitrou557934f2009-11-06 22:41:14 +0000471 N = 5
Serhiy Storchaka32cb9682017-06-23 13:36:36 +0300472 ready = []
Antoine Pitrou557934f2009-11-06 22:41:14 +0000473 results1 = []
474 results2 = []
475 phase_num = 0
476 def f():
477 cond.acquire()
Serhiy Storchaka32cb9682017-06-23 13:36:36 +0300478 ready.append(phase_num)
Georg Brandlb9a43912010-10-28 09:03:20 +0000479 result = cond.wait()
Antoine Pitrou557934f2009-11-06 22:41:14 +0000480 cond.release()
Georg Brandlb9a43912010-10-28 09:03:20 +0000481 results1.append((result, phase_num))
Antoine Pitrou557934f2009-11-06 22:41:14 +0000482 cond.acquire()
Serhiy Storchaka32cb9682017-06-23 13:36:36 +0300483 ready.append(phase_num)
Georg Brandlb9a43912010-10-28 09:03:20 +0000484 result = cond.wait()
Antoine Pitrou557934f2009-11-06 22:41:14 +0000485 cond.release()
Georg Brandlb9a43912010-10-28 09:03:20 +0000486 results2.append((result, phase_num))
Antoine Pitrou557934f2009-11-06 22:41:14 +0000487 b = Bunch(f, N)
488 b.wait_for_started()
Serhiy Storchaka32cb9682017-06-23 13:36:36 +0300489 # first wait, to ensure all workers settle into cond.wait() before
490 # we continue. See issues #8799 and #30727.
491 while len(ready) < 5:
492 _wait()
493 ready.clear()
Antoine Pitrou557934f2009-11-06 22:41:14 +0000494 self.assertEqual(results1, [])
495 # Notify 3 threads at first
496 cond.acquire()
497 cond.notify(3)
498 _wait()
499 phase_num = 1
500 cond.release()
501 while len(results1) < 3:
502 _wait()
Georg Brandlb9a43912010-10-28 09:03:20 +0000503 self.assertEqual(results1, [(True, 1)] * 3)
Antoine Pitrou557934f2009-11-06 22:41:14 +0000504 self.assertEqual(results2, [])
Serhiy Storchaka32cb9682017-06-23 13:36:36 +0300505 # make sure all awaken workers settle into cond.wait()
506 while len(ready) < 3:
507 _wait()
Antoine Pitrou557934f2009-11-06 22:41:14 +0000508 # Notify 5 threads: they might be in their first or second wait
509 cond.acquire()
510 cond.notify(5)
511 _wait()
512 phase_num = 2
513 cond.release()
514 while len(results1) + len(results2) < 8:
515 _wait()
Georg Brandlb9a43912010-10-28 09:03:20 +0000516 self.assertEqual(results1, [(True, 1)] * 3 + [(True, 2)] * 2)
517 self.assertEqual(results2, [(True, 2)] * 3)
Serhiy Storchaka32cb9682017-06-23 13:36:36 +0300518 # make sure all workers settle into cond.wait()
519 while len(ready) < 5:
520 _wait()
Antoine Pitrou557934f2009-11-06 22:41:14 +0000521 # Notify all threads: they are all in their second wait
522 cond.acquire()
523 cond.notify_all()
524 _wait()
525 phase_num = 3
526 cond.release()
527 while len(results2) < 5:
528 _wait()
Georg Brandlb9a43912010-10-28 09:03:20 +0000529 self.assertEqual(results1, [(True, 1)] * 3 + [(True,2)] * 2)
530 self.assertEqual(results2, [(True, 2)] * 3 + [(True, 3)] * 2)
Antoine Pitrou557934f2009-11-06 22:41:14 +0000531 b.wait_for_finished()
532
533 def test_notify(self):
534 cond = self.condtype()
535 self._check_notify(cond)
536 # A second time, to check internal state is still ok.
537 self._check_notify(cond)
538
539 def test_timeout(self):
540 cond = self.condtype()
541 results = []
542 N = 5
543 def f():
544 cond.acquire()
545 t1 = time.time()
Georg Brandlb9a43912010-10-28 09:03:20 +0000546 result = cond.wait(0.5)
Antoine Pitrou557934f2009-11-06 22:41:14 +0000547 t2 = time.time()
548 cond.release()
Georg Brandlb9a43912010-10-28 09:03:20 +0000549 results.append((t2 - t1, result))
Antoine Pitrou557934f2009-11-06 22:41:14 +0000550 Bunch(f, N).wait_for_finished()
Georg Brandlb9a43912010-10-28 09:03:20 +0000551 self.assertEqual(len(results), N)
552 for dt, result in results:
Antoine Pitrou7c3e5772010-04-14 15:44:10 +0000553 self.assertTimeout(dt, 0.5)
Georg Brandlb9a43912010-10-28 09:03:20 +0000554 # Note that conceptually (that"s the condition variable protocol)
555 # a wait() may succeed even if no one notifies us and before any
556 # timeout occurs. Spurious wakeups can occur.
557 # This makes it hard to verify the result value.
558 # In practice, this implementation has no spurious wakeups.
559 self.assertFalse(result)
Antoine Pitrou557934f2009-11-06 22:41:14 +0000560
Kristján Valur Jónsson63315202010-11-18 12:46:39 +0000561 def test_waitfor(self):
562 cond = self.condtype()
563 state = 0
564 def f():
565 with cond:
566 result = cond.wait_for(lambda : state==4)
567 self.assertTrue(result)
568 self.assertEqual(state, 4)
569 b = Bunch(f, 1)
570 b.wait_for_started()
Victor Stinner3349bca2011-05-18 00:16:14 +0200571 for i in range(4):
Kristján Valur Jónsson63315202010-11-18 12:46:39 +0000572 time.sleep(0.01)
573 with cond:
574 state += 1
575 cond.notify()
576 b.wait_for_finished()
577
578 def test_waitfor_timeout(self):
579 cond = self.condtype()
580 state = 0
581 success = []
582 def f():
583 with cond:
584 dt = time.time()
585 result = cond.wait_for(lambda : state==4, timeout=0.1)
586 dt = time.time() - dt
587 self.assertFalse(result)
588 self.assertTimeout(dt, 0.1)
589 success.append(None)
590 b = Bunch(f, 1)
591 b.wait_for_started()
592 # Only increment 3 times, so state == 4 is never reached.
593 for i in range(3):
594 time.sleep(0.01)
595 with cond:
596 state += 1
597 cond.notify()
598 b.wait_for_finished()
599 self.assertEqual(len(success), 1)
600
Antoine Pitrou557934f2009-11-06 22:41:14 +0000601
602class BaseSemaphoreTests(BaseTestCase):
603 """
604 Common tests for {bounded, unbounded} semaphore objects.
605 """
606
607 def test_constructor(self):
608 self.assertRaises(ValueError, self.semtype, value = -1)
609 self.assertRaises(ValueError, self.semtype, value = -sys.maxsize)
610
611 def test_acquire(self):
612 sem = self.semtype(1)
613 sem.acquire()
614 sem.release()
615 sem = self.semtype(2)
616 sem.acquire()
617 sem.acquire()
618 sem.release()
619 sem.release()
620
621 def test_acquire_destroy(self):
622 sem = self.semtype()
623 sem.acquire()
624 del sem
625
626 def test_acquire_contended(self):
627 sem = self.semtype(7)
628 sem.acquire()
629 N = 10
630 results1 = []
631 results2 = []
632 phase_num = 0
633 def f():
634 sem.acquire()
635 results1.append(phase_num)
636 sem.acquire()
637 results2.append(phase_num)
638 b = Bunch(f, 10)
639 b.wait_for_started()
640 while len(results1) + len(results2) < 6:
641 _wait()
642 self.assertEqual(results1 + results2, [0] * 6)
643 phase_num = 1
644 for i in range(7):
645 sem.release()
646 while len(results1) + len(results2) < 13:
647 _wait()
648 self.assertEqual(sorted(results1 + results2), [0] * 6 + [1] * 7)
649 phase_num = 2
650 for i in range(6):
651 sem.release()
652 while len(results1) + len(results2) < 19:
653 _wait()
654 self.assertEqual(sorted(results1 + results2), [0] * 6 + [1] * 7 + [2] * 6)
655 # The semaphore is still locked
656 self.assertFalse(sem.acquire(False))
657 # Final release, to let the last thread finish
658 sem.release()
659 b.wait_for_finished()
660
661 def test_try_acquire(self):
662 sem = self.semtype(2)
663 self.assertTrue(sem.acquire(False))
664 self.assertTrue(sem.acquire(False))
665 self.assertFalse(sem.acquire(False))
666 sem.release()
667 self.assertTrue(sem.acquire(False))
668
669 def test_try_acquire_contended(self):
670 sem = self.semtype(4)
671 sem.acquire()
672 results = []
673 def f():
674 results.append(sem.acquire(False))
675 results.append(sem.acquire(False))
676 Bunch(f, 5).wait_for_finished()
677 # There can be a thread switch between acquiring the semaphore and
678 # appending the result, therefore results will not necessarily be
679 # ordered.
680 self.assertEqual(sorted(results), [False] * 7 + [True] * 3 )
681
Antoine Pitrou0454af92010-04-17 23:51:58 +0000682 def test_acquire_timeout(self):
683 sem = self.semtype(2)
684 self.assertRaises(ValueError, sem.acquire, False, timeout=1.0)
685 self.assertTrue(sem.acquire(timeout=0.005))
686 self.assertTrue(sem.acquire(timeout=0.005))
687 self.assertFalse(sem.acquire(timeout=0.005))
688 sem.release()
689 self.assertTrue(sem.acquire(timeout=0.005))
690 t = time.time()
691 self.assertFalse(sem.acquire(timeout=0.5))
692 dt = time.time() - t
693 self.assertTimeout(dt, 0.5)
694
Antoine Pitrou557934f2009-11-06 22:41:14 +0000695 def test_default_value(self):
696 # The default initial value is 1.
697 sem = self.semtype()
698 sem.acquire()
699 def f():
700 sem.acquire()
701 sem.release()
702 b = Bunch(f, 1)
703 b.wait_for_started()
704 _wait()
705 self.assertFalse(b.finished)
706 sem.release()
707 b.wait_for_finished()
708
709 def test_with(self):
710 sem = self.semtype(2)
711 def _with(err=None):
712 with sem:
713 self.assertTrue(sem.acquire(False))
714 sem.release()
715 with sem:
716 self.assertFalse(sem.acquire(False))
717 if err:
718 raise err
719 _with()
720 self.assertTrue(sem.acquire(False))
721 sem.release()
722 self.assertRaises(TypeError, _with, TypeError)
723 self.assertTrue(sem.acquire(False))
724 sem.release()
725
726class SemaphoreTests(BaseSemaphoreTests):
727 """
728 Tests for unbounded semaphores.
729 """
730
731 def test_release_unacquired(self):
732 # Unbounded releases are allowed and increment the semaphore's value
733 sem = self.semtype(1)
734 sem.release()
735 sem.acquire()
736 sem.acquire()
737 sem.release()
738
739
740class BoundedSemaphoreTests(BaseSemaphoreTests):
741 """
742 Tests for bounded semaphores.
743 """
744
745 def test_release_unacquired(self):
746 # Cannot go past the initial value
747 sem = self.semtype()
748 self.assertRaises(ValueError, sem.release)
749 sem.acquire()
750 sem.release()
751 self.assertRaises(ValueError, sem.release)
Kristján Valur Jónsson3be00032010-10-28 09:43:10 +0000752
753
754class BarrierTests(BaseTestCase):
755 """
756 Tests for Barrier objects.
757 """
758 N = 5
Antoine Pitrou12ae2902010-11-17 21:55:41 +0000759 defaultTimeout = 2.0
Kristján Valur Jónsson3be00032010-10-28 09:43:10 +0000760
761 def setUp(self):
Kristján Valur Jónssonf53a6262010-10-31 03:00:57 +0000762 self.barrier = self.barriertype(self.N, timeout=self.defaultTimeout)
Kristján Valur Jónsson3be00032010-10-28 09:43:10 +0000763 def tearDown(self):
764 self.barrier.abort()
765
766 def run_threads(self, f):
767 b = Bunch(f, self.N-1)
768 f()
769 b.wait_for_finished()
770
771 def multipass(self, results, n):
772 m = self.barrier.parties
773 self.assertEqual(m, self.N)
774 for i in range(n):
775 results[0].append(True)
776 self.assertEqual(len(results[1]), i * m)
777 self.barrier.wait()
778 results[1].append(True)
779 self.assertEqual(len(results[0]), (i + 1) * m)
780 self.barrier.wait()
781 self.assertEqual(self.barrier.n_waiting, 0)
782 self.assertFalse(self.barrier.broken)
783
784 def test_barrier(self, passes=1):
785 """
786 Test that a barrier is passed in lockstep
787 """
788 results = [[],[]]
789 def f():
790 self.multipass(results, passes)
791 self.run_threads(f)
792
793 def test_barrier_10(self):
794 """
795 Test that a barrier works for 10 consecutive runs
796 """
797 return self.test_barrier(10)
798
799 def test_wait_return(self):
800 """
801 test the return value from barrier.wait
802 """
803 results = []
804 def f():
805 r = self.barrier.wait()
806 results.append(r)
807
808 self.run_threads(f)
809 self.assertEqual(sum(results), sum(range(self.N)))
810
811 def test_action(self):
812 """
813 Test the 'action' callback
814 """
815 results = []
816 def action():
817 results.append(True)
818 barrier = self.barriertype(self.N, action)
819 def f():
820 barrier.wait()
821 self.assertEqual(len(results), 1)
822
823 self.run_threads(f)
824
825 def test_abort(self):
826 """
827 Test that an abort will put the barrier in a broken state
828 """
829 results1 = []
830 results2 = []
831 def f():
832 try:
833 i = self.barrier.wait()
834 if i == self.N//2:
835 raise RuntimeError
836 self.barrier.wait()
837 results1.append(True)
838 except threading.BrokenBarrierError:
839 results2.append(True)
840 except RuntimeError:
841 self.barrier.abort()
842 pass
843
844 self.run_threads(f)
845 self.assertEqual(len(results1), 0)
846 self.assertEqual(len(results2), self.N-1)
847 self.assertTrue(self.barrier.broken)
848
849 def test_reset(self):
850 """
851 Test that a 'reset' on a barrier frees the waiting threads
852 """
853 results1 = []
854 results2 = []
855 results3 = []
856 def f():
857 i = self.barrier.wait()
858 if i == self.N//2:
859 # Wait until the other threads are all in the barrier.
860 while self.barrier.n_waiting < self.N-1:
861 time.sleep(0.001)
862 self.barrier.reset()
863 else:
864 try:
865 self.barrier.wait()
866 results1.append(True)
867 except threading.BrokenBarrierError:
868 results2.append(True)
869 # Now, pass the barrier again
870 self.barrier.wait()
871 results3.append(True)
872
873 self.run_threads(f)
874 self.assertEqual(len(results1), 0)
875 self.assertEqual(len(results2), self.N-1)
876 self.assertEqual(len(results3), self.N)
877
878
879 def test_abort_and_reset(self):
880 """
881 Test that a barrier can be reset after being broken.
882 """
883 results1 = []
884 results2 = []
885 results3 = []
886 barrier2 = self.barriertype(self.N)
887 def f():
888 try:
889 i = self.barrier.wait()
890 if i == self.N//2:
891 raise RuntimeError
892 self.barrier.wait()
893 results1.append(True)
894 except threading.BrokenBarrierError:
895 results2.append(True)
896 except RuntimeError:
897 self.barrier.abort()
898 pass
899 # Synchronize and reset the barrier. Must synchronize first so
900 # that everyone has left it when we reset, and after so that no
901 # one enters it before the reset.
902 if barrier2.wait() == self.N//2:
903 self.barrier.reset()
904 barrier2.wait()
905 self.barrier.wait()
906 results3.append(True)
907
908 self.run_threads(f)
909 self.assertEqual(len(results1), 0)
910 self.assertEqual(len(results2), self.N-1)
911 self.assertEqual(len(results3), self.N)
912
913 def test_timeout(self):
914 """
915 Test wait(timeout)
916 """
917 def f():
918 i = self.barrier.wait()
919 if i == self.N // 2:
920 # One thread is late!
Antoine Pitrou12ae2902010-11-17 21:55:41 +0000921 time.sleep(1.0)
922 # Default timeout is 2.0, so this is shorter.
Kristján Valur Jónsson3be00032010-10-28 09:43:10 +0000923 self.assertRaises(threading.BrokenBarrierError,
Antoine Pitrou12ae2902010-11-17 21:55:41 +0000924 self.barrier.wait, 0.5)
Kristján Valur Jónsson3be00032010-10-28 09:43:10 +0000925 self.run_threads(f)
926
927 def test_default_timeout(self):
928 """
929 Test the barrier's default timeout
930 """
Charles-François Natalid4d1d062011-07-27 21:26:42 +0200931 # create a barrier with a low default timeout
932 barrier = self.barriertype(self.N, timeout=0.3)
Kristján Valur Jónsson3be00032010-10-28 09:43:10 +0000933 def f():
Kristján Valur Jónssonf53a6262010-10-31 03:00:57 +0000934 i = barrier.wait()
Kristján Valur Jónsson3be00032010-10-28 09:43:10 +0000935 if i == self.N // 2:
Charles-François Natalid4d1d062011-07-27 21:26:42 +0200936 # One thread is later than the default timeout of 0.3s.
Antoine Pitrou12ae2902010-11-17 21:55:41 +0000937 time.sleep(1.0)
Kristján Valur Jónssonf53a6262010-10-31 03:00:57 +0000938 self.assertRaises(threading.BrokenBarrierError, barrier.wait)
Kristján Valur Jónsson3be00032010-10-28 09:43:10 +0000939 self.run_threads(f)
940
941 def test_single_thread(self):
942 b = self.barriertype(1)
943 b.wait()
944 b.wait()