blob: d69bcc9496843f372ca6078c0e6601cc59677643 [file] [log] [blame]
Antoine Pitrou557934f2009-11-06 22:41:14 +00001"""
2Various tests for synchronization primitives.
3"""
4
Victor Stinner87255be2020-04-07 23:11:49 +02005import os
Antoine Pitrou557934f2009-11-06 22:41:14 +00006import sys
7import time
Victor Stinner2a129742011-05-30 23:02:52 +02008from _thread import start_new_thread, TIMEOUT_MAX
Antoine Pitrou557934f2009-11-06 22:41:14 +00009import threading
10import unittest
Raymond Hettinger7836a272015-10-09 00:03:51 -040011import weakref
Antoine Pitrou557934f2009-11-06 22:41:14 +000012
13from test import support
Hai Shie80697d2020-05-28 06:10:27 +080014from test.support import threading_helper
Antoine Pitrou557934f2009-11-06 22:41:14 +000015
16
Victor Stinner87255be2020-04-07 23:11:49 +020017requires_fork = unittest.skipUnless(hasattr(os, 'fork'),
18 "platform doesn't support fork "
19 "(no _at_fork_reinit method)")
20
21
Antoine Pitrou557934f2009-11-06 22:41:14 +000022def _wait():
23 # A crude wait/yield function not relying on synchronization primitives.
24 time.sleep(0.01)
25
26class Bunch(object):
27 """
28 A bunch of threads.
29 """
30 def __init__(self, f, n, wait_before_exit=False):
31 """
32 Construct a bunch of `n` threads running the same function `f`.
33 If `wait_before_exit` is True, the threads won't terminate until
34 do_finish() is called.
35 """
36 self.f = f
37 self.n = n
38 self.started = []
39 self.finished = []
40 self._can_exit = not wait_before_exit
Hai Shie80697d2020-05-28 06:10:27 +080041 self.wait_thread = threading_helper.wait_threads_exit()
Victor Stinnerff40ecd2017-09-14 13:07:24 -070042 self.wait_thread.__enter__()
43
Antoine Pitrou557934f2009-11-06 22:41:14 +000044 def task():
Victor Stinner2a129742011-05-30 23:02:52 +020045 tid = threading.get_ident()
Antoine Pitrou557934f2009-11-06 22:41:14 +000046 self.started.append(tid)
47 try:
48 f()
49 finally:
50 self.finished.append(tid)
51 while not self._can_exit:
52 _wait()
Victor Stinnerff40ecd2017-09-14 13:07:24 -070053
Serhiy Storchaka9db55002015-03-28 20:38:37 +020054 try:
55 for i in range(n):
56 start_new_thread(task, ())
57 except:
58 self._can_exit = True
59 raise
Antoine Pitrou557934f2009-11-06 22:41:14 +000060
61 def wait_for_started(self):
62 while len(self.started) < self.n:
63 _wait()
64
65 def wait_for_finished(self):
66 while len(self.finished) < self.n:
67 _wait()
Victor Stinnerff40ecd2017-09-14 13:07:24 -070068 # Wait for threads exit
69 self.wait_thread.__exit__(None, None, None)
Antoine Pitrou557934f2009-11-06 22:41:14 +000070
71 def do_finish(self):
72 self._can_exit = True
73
74
75class BaseTestCase(unittest.TestCase):
76 def setUp(self):
Hai Shie80697d2020-05-28 06:10:27 +080077 self._threads = threading_helper.threading_setup()
Antoine Pitrou557934f2009-11-06 22:41:14 +000078
79 def tearDown(self):
Hai Shie80697d2020-05-28 06:10:27 +080080 threading_helper.threading_cleanup(*self._threads)
Antoine Pitrou557934f2009-11-06 22:41:14 +000081 support.reap_children()
82
Antoine Pitrou7c3e5772010-04-14 15:44:10 +000083 def assertTimeout(self, actual, expected):
Victor Stinner2cf4c202018-12-17 09:36:36 +010084 # The waiting and/or time.monotonic() can be imprecise, which
Antoine Pitrou7c3e5772010-04-14 15:44:10 +000085 # is why comparing to the expected value would sometimes fail
86 # (especially under Windows).
87 self.assertGreaterEqual(actual, expected * 0.6)
88 # Test nothing insane happened
89 self.assertLess(actual, expected * 10.0)
90
Antoine Pitrou557934f2009-11-06 22:41:14 +000091
92class BaseLockTests(BaseTestCase):
93 """
94 Tests for both recursive and non-recursive locks.
95 """
96
97 def test_constructor(self):
98 lock = self.locktype()
99 del lock
100
Christian Heimesc5d95b12013-07-30 15:54:39 +0200101 def test_repr(self):
102 lock = self.locktype()
Raymond Hettinger62f4dad2014-05-25 18:22:35 -0700103 self.assertRegex(repr(lock), "<unlocked .* object (.*)?at .*>")
104 del lock
105
106 def test_locked_repr(self):
107 lock = self.locktype()
108 lock.acquire()
109 self.assertRegex(repr(lock), "<locked .* object (.*)?at .*>")
Christian Heimesc5d95b12013-07-30 15:54:39 +0200110 del lock
111
Antoine Pitrou557934f2009-11-06 22:41:14 +0000112 def test_acquire_destroy(self):
113 lock = self.locktype()
114 lock.acquire()
115 del lock
116
117 def test_acquire_release(self):
118 lock = self.locktype()
119 lock.acquire()
120 lock.release()
121 del lock
122
123 def test_try_acquire(self):
124 lock = self.locktype()
125 self.assertTrue(lock.acquire(False))
126 lock.release()
127
128 def test_try_acquire_contended(self):
129 lock = self.locktype()
130 lock.acquire()
131 result = []
132 def f():
133 result.append(lock.acquire(False))
134 Bunch(f, 1).wait_for_finished()
135 self.assertFalse(result[0])
136 lock.release()
137
138 def test_acquire_contended(self):
139 lock = self.locktype()
140 lock.acquire()
141 N = 5
142 def f():
143 lock.acquire()
144 lock.release()
145
146 b = Bunch(f, N)
147 b.wait_for_started()
148 _wait()
149 self.assertEqual(len(b.finished), 0)
150 lock.release()
151 b.wait_for_finished()
152 self.assertEqual(len(b.finished), N)
153
154 def test_with(self):
155 lock = self.locktype()
156 def f():
157 lock.acquire()
158 lock.release()
159 def _with(err=None):
160 with lock:
161 if err is not None:
162 raise err
163 _with()
164 # Check the lock is unacquired
165 Bunch(f, 1).wait_for_finished()
166 self.assertRaises(TypeError, _with, TypeError)
167 # Check the lock is unacquired
168 Bunch(f, 1).wait_for_finished()
169
Antoine Pitroub0872682009-11-09 16:08:16 +0000170 def test_thread_leak(self):
171 # The lock shouldn't leak a Thread instance when used from a foreign
172 # (non-threading) thread.
173 lock = self.locktype()
174 def f():
175 lock.acquire()
176 lock.release()
177 n = len(threading.enumerate())
178 # We run many threads in the hope that existing threads ids won't
179 # be recycled.
180 Bunch(f, 15).wait_for_finished()
Antoine Pitrou45fdb452011-04-04 21:59:09 +0200181 if len(threading.enumerate()) != n:
182 # There is a small window during which a Thread instance's
183 # target function has finished running, but the Thread is still
184 # alive and registered. Avoid spurious failures by waiting a
185 # bit more (seen on a buildbot).
186 time.sleep(0.4)
187 self.assertEqual(n, len(threading.enumerate()))
Antoine Pitroub0872682009-11-09 16:08:16 +0000188
Antoine Pitrou7c3e5772010-04-14 15:44:10 +0000189 def test_timeout(self):
190 lock = self.locktype()
191 # Can't set timeout if not blocking
Serhiy Storchaka1f21eaa2019-09-01 12:16:51 +0300192 self.assertRaises(ValueError, lock.acquire, False, 1)
Antoine Pitrou7c3e5772010-04-14 15:44:10 +0000193 # Invalid timeout values
194 self.assertRaises(ValueError, lock.acquire, timeout=-100)
195 self.assertRaises(OverflowError, lock.acquire, timeout=1e100)
196 self.assertRaises(OverflowError, lock.acquire, timeout=TIMEOUT_MAX + 1)
197 # TIMEOUT_MAX is ok
198 lock.acquire(timeout=TIMEOUT_MAX)
199 lock.release()
Victor Stinner2cf4c202018-12-17 09:36:36 +0100200 t1 = time.monotonic()
Antoine Pitrou7c3e5772010-04-14 15:44:10 +0000201 self.assertTrue(lock.acquire(timeout=5))
Victor Stinner2cf4c202018-12-17 09:36:36 +0100202 t2 = time.monotonic()
Antoine Pitrou7c3e5772010-04-14 15:44:10 +0000203 # Just a sanity test that it didn't actually wait for the timeout.
204 self.assertLess(t2 - t1, 5)
205 results = []
206 def f():
Victor Stinner2cf4c202018-12-17 09:36:36 +0100207 t1 = time.monotonic()
Antoine Pitrou7c3e5772010-04-14 15:44:10 +0000208 results.append(lock.acquire(timeout=0.5))
Victor Stinner2cf4c202018-12-17 09:36:36 +0100209 t2 = time.monotonic()
Antoine Pitrou7c3e5772010-04-14 15:44:10 +0000210 results.append(t2 - t1)
211 Bunch(f, 1).wait_for_finished()
212 self.assertFalse(results[0])
213 self.assertTimeout(results[1], 0.5)
214
Raymond Hettinger7836a272015-10-09 00:03:51 -0400215 def test_weakref_exists(self):
216 lock = self.locktype()
217 ref = weakref.ref(lock)
218 self.assertIsNotNone(ref())
219
220 def test_weakref_deleted(self):
221 lock = self.locktype()
222 ref = weakref.ref(lock)
223 del lock
224 self.assertIsNone(ref())
225
Antoine Pitrou557934f2009-11-06 22:41:14 +0000226
227class LockTests(BaseLockTests):
228 """
229 Tests for non-recursive, weak locks
230 (which can be acquired and released from different threads).
231 """
232 def test_reacquire(self):
233 # Lock needs to be released before re-acquiring.
234 lock = self.locktype()
235 phase = []
Victor Stinnerff40ecd2017-09-14 13:07:24 -0700236
Antoine Pitrou557934f2009-11-06 22:41:14 +0000237 def f():
238 lock.acquire()
239 phase.append(None)
240 lock.acquire()
241 phase.append(None)
Victor Stinnerff40ecd2017-09-14 13:07:24 -0700242
Hai Shie80697d2020-05-28 06:10:27 +0800243 with threading_helper.wait_threads_exit():
Victor Stinnerff40ecd2017-09-14 13:07:24 -0700244 start_new_thread(f, ())
245 while len(phase) == 0:
246 _wait()
Antoine Pitrou557934f2009-11-06 22:41:14 +0000247 _wait()
Victor Stinnerff40ecd2017-09-14 13:07:24 -0700248 self.assertEqual(len(phase), 1)
249 lock.release()
250 while len(phase) == 1:
251 _wait()
252 self.assertEqual(len(phase), 2)
Antoine Pitrou557934f2009-11-06 22:41:14 +0000253
254 def test_different_thread(self):
255 # Lock can be released from a different thread.
256 lock = self.locktype()
257 lock.acquire()
258 def f():
259 lock.release()
260 b = Bunch(f, 1)
261 b.wait_for_finished()
262 lock.acquire()
263 lock.release()
264
Antoine Pitrou7899acf2011-03-31 01:00:32 +0200265 def test_state_after_timeout(self):
266 # Issue #11618: check that lock is in a proper state after a
267 # (non-zero) timeout.
268 lock = self.locktype()
269 lock.acquire()
270 self.assertFalse(lock.acquire(timeout=0.01))
271 lock.release()
272 self.assertFalse(lock.locked())
273 self.assertTrue(lock.acquire(blocking=False))
274
Victor Stinner87255be2020-04-07 23:11:49 +0200275 @requires_fork
276 def test_at_fork_reinit(self):
277 def use_lock(lock):
278 # make sure that the lock still works normally
279 # after _at_fork_reinit()
280 lock.acquire()
281 lock.release()
282
283 # unlocked
284 lock = self.locktype()
285 lock._at_fork_reinit()
286 use_lock(lock)
287
288 # locked: _at_fork_reinit() resets the lock to the unlocked state
289 lock2 = self.locktype()
290 lock2.acquire()
291 lock2._at_fork_reinit()
292 use_lock(lock2)
293
Antoine Pitrou557934f2009-11-06 22:41:14 +0000294
295class RLockTests(BaseLockTests):
296 """
297 Tests for recursive locks.
298 """
299 def test_reacquire(self):
300 lock = self.locktype()
301 lock.acquire()
302 lock.acquire()
303 lock.release()
304 lock.acquire()
305 lock.release()
306 lock.release()
307
308 def test_release_unacquired(self):
309 # Cannot release an unacquired lock
310 lock = self.locktype()
311 self.assertRaises(RuntimeError, lock.release)
312 lock.acquire()
313 lock.acquire()
314 lock.release()
315 lock.acquire()
316 lock.release()
317 lock.release()
318 self.assertRaises(RuntimeError, lock.release)
Antoine Pitrouea3eb882012-05-17 18:55:59 +0200319
320 def test_release_save_unacquired(self):
321 # Cannot _release_save an unacquired lock
322 lock = self.locktype()
323 self.assertRaises(RuntimeError, lock._release_save)
324 lock.acquire()
325 lock.acquire()
326 lock.release()
327 lock.acquire()
328 lock.release()
329 lock.release()
Victor Stinnerc2824d42011-04-24 23:41:33 +0200330 self.assertRaises(RuntimeError, lock._release_save)
Antoine Pitrou557934f2009-11-06 22:41:14 +0000331
332 def test_different_thread(self):
333 # Cannot release from a different thread
334 lock = self.locktype()
335 def f():
336 lock.acquire()
337 b = Bunch(f, 1, True)
338 try:
339 self.assertRaises(RuntimeError, lock.release)
340 finally:
341 b.do_finish()
Victor Stinner096ae332017-09-13 16:41:08 -0700342 b.wait_for_finished()
Antoine Pitrou557934f2009-11-06 22:41:14 +0000343
344 def test__is_owned(self):
345 lock = self.locktype()
346 self.assertFalse(lock._is_owned())
347 lock.acquire()
348 self.assertTrue(lock._is_owned())
349 lock.acquire()
350 self.assertTrue(lock._is_owned())
351 result = []
352 def f():
353 result.append(lock._is_owned())
354 Bunch(f, 1).wait_for_finished()
355 self.assertFalse(result[0])
356 lock.release()
357 self.assertTrue(lock._is_owned())
358 lock.release()
359 self.assertFalse(lock._is_owned())
360
361
362class EventTests(BaseTestCase):
363 """
364 Tests for Event objects.
365 """
366
367 def test_is_set(self):
368 evt = self.eventtype()
369 self.assertFalse(evt.is_set())
370 evt.set()
371 self.assertTrue(evt.is_set())
372 evt.set()
373 self.assertTrue(evt.is_set())
374 evt.clear()
375 self.assertFalse(evt.is_set())
376 evt.clear()
377 self.assertFalse(evt.is_set())
378
379 def _check_notify(self, evt):
380 # All threads get notified
381 N = 5
382 results1 = []
383 results2 = []
384 def f():
385 results1.append(evt.wait())
386 results2.append(evt.wait())
387 b = Bunch(f, N)
388 b.wait_for_started()
389 _wait()
390 self.assertEqual(len(results1), 0)
391 evt.set()
392 b.wait_for_finished()
393 self.assertEqual(results1, [True] * N)
394 self.assertEqual(results2, [True] * N)
395
396 def test_notify(self):
397 evt = self.eventtype()
398 self._check_notify(evt)
399 # Another time, after an explicit clear()
400 evt.set()
401 evt.clear()
402 self._check_notify(evt)
403
404 def test_timeout(self):
405 evt = self.eventtype()
406 results1 = []
407 results2 = []
408 N = 5
409 def f():
410 results1.append(evt.wait(0.0))
Victor Stinner2cf4c202018-12-17 09:36:36 +0100411 t1 = time.monotonic()
Antoine Pitrou7c3e5772010-04-14 15:44:10 +0000412 r = evt.wait(0.5)
Victor Stinner2cf4c202018-12-17 09:36:36 +0100413 t2 = time.monotonic()
Antoine Pitrou557934f2009-11-06 22:41:14 +0000414 results2.append((r, t2 - t1))
415 Bunch(f, N).wait_for_finished()
416 self.assertEqual(results1, [False] * N)
417 for r, dt in results2:
418 self.assertFalse(r)
Antoine Pitrou7c3e5772010-04-14 15:44:10 +0000419 self.assertTimeout(dt, 0.5)
Antoine Pitrou557934f2009-11-06 22:41:14 +0000420 # The event is set
421 results1 = []
422 results2 = []
423 evt.set()
424 Bunch(f, N).wait_for_finished()
425 self.assertEqual(results1, [True] * N)
426 for r, dt in results2:
427 self.assertTrue(r)
428
Charles-François Natalided03482012-01-07 18:24:56 +0100429 def test_set_and_clear(self):
430 # Issue #13502: check that wait() returns true even when the event is
431 # cleared before the waiting thread is woken up.
432 evt = self.eventtype()
433 results = []
Victor Stinner81950492018-07-19 10:49:58 +0200434 timeout = 0.250
Charles-François Natalided03482012-01-07 18:24:56 +0100435 N = 5
436 def f():
Victor Stinner81950492018-07-19 10:49:58 +0200437 results.append(evt.wait(timeout * 4))
Charles-François Natalided03482012-01-07 18:24:56 +0100438 b = Bunch(f, N)
439 b.wait_for_started()
Victor Stinner81950492018-07-19 10:49:58 +0200440 time.sleep(timeout)
Charles-François Natalided03482012-01-07 18:24:56 +0100441 evt.set()
442 evt.clear()
443 b.wait_for_finished()
444 self.assertEqual(results, [True] * N)
445
Victor Stinner87255be2020-04-07 23:11:49 +0200446 @requires_fork
447 def test_at_fork_reinit(self):
Berker Peksag6d34bbb2016-04-29 17:25:29 +0300448 # ensure that condition is still using a Lock after reset
Benjamin Peterson15982aa2015-10-05 21:56:22 -0700449 evt = self.eventtype()
Berker Peksag6d34bbb2016-04-29 17:25:29 +0300450 with evt._cond:
451 self.assertFalse(evt._cond.acquire(False))
Victor Stinner87255be2020-04-07 23:11:49 +0200452 evt._at_fork_reinit()
Berker Peksag6d34bbb2016-04-29 17:25:29 +0300453 with evt._cond:
454 self.assertFalse(evt._cond.acquire(False))
Benjamin Peterson15982aa2015-10-05 21:56:22 -0700455
Antoine Pitrou557934f2009-11-06 22:41:14 +0000456
457class ConditionTests(BaseTestCase):
458 """
459 Tests for condition variables.
460 """
461
462 def test_acquire(self):
463 cond = self.condtype()
464 # Be default we have an RLock: the condition can be acquired multiple
465 # times.
466 cond.acquire()
467 cond.acquire()
468 cond.release()
469 cond.release()
470 lock = threading.Lock()
471 cond = self.condtype(lock)
472 cond.acquire()
473 self.assertFalse(lock.acquire(False))
474 cond.release()
475 self.assertTrue(lock.acquire(False))
476 self.assertFalse(cond.acquire(False))
477 lock.release()
478 with cond:
479 self.assertFalse(lock.acquire(False))
480
481 def test_unacquired_wait(self):
482 cond = self.condtype()
483 self.assertRaises(RuntimeError, cond.wait)
484
485 def test_unacquired_notify(self):
486 cond = self.condtype()
487 self.assertRaises(RuntimeError, cond.notify)
488
489 def _check_notify(self, cond):
Kristjan Valur Jonsson020af2a2013-11-11 11:29:04 +0000490 # Note that this test is sensitive to timing. If the worker threads
491 # don't execute in a timely fashion, the main thread may think they
492 # are further along then they are. The main thread therefore issues
493 # _wait() statements to try to make sure that it doesn't race ahead
494 # of the workers.
495 # Secondly, this test assumes that condition variables are not subject
496 # to spurious wakeups. The absence of spurious wakeups is an implementation
Min ho Kim39d87b52019-08-31 06:21:19 +1000497 # detail of Condition Variables in current CPython, but in general, not
Kristjan Valur Jonsson020af2a2013-11-11 11:29:04 +0000498 # a guaranteed property of condition variables as a programming
499 # construct. In particular, it is possible that this can no longer
500 # be conveniently guaranteed should their implementation ever change.
Antoine Pitrou557934f2009-11-06 22:41:14 +0000501 N = 5
Serhiy Storchaka32cb9682017-06-23 13:36:36 +0300502 ready = []
Antoine Pitrou557934f2009-11-06 22:41:14 +0000503 results1 = []
504 results2 = []
505 phase_num = 0
506 def f():
507 cond.acquire()
Serhiy Storchaka32cb9682017-06-23 13:36:36 +0300508 ready.append(phase_num)
Georg Brandlb9a43912010-10-28 09:03:20 +0000509 result = cond.wait()
Antoine Pitrou557934f2009-11-06 22:41:14 +0000510 cond.release()
Georg Brandlb9a43912010-10-28 09:03:20 +0000511 results1.append((result, phase_num))
Antoine Pitrou557934f2009-11-06 22:41:14 +0000512 cond.acquire()
Serhiy Storchaka32cb9682017-06-23 13:36:36 +0300513 ready.append(phase_num)
Georg Brandlb9a43912010-10-28 09:03:20 +0000514 result = cond.wait()
Antoine Pitrou557934f2009-11-06 22:41:14 +0000515 cond.release()
Georg Brandlb9a43912010-10-28 09:03:20 +0000516 results2.append((result, phase_num))
Antoine Pitrou557934f2009-11-06 22:41:14 +0000517 b = Bunch(f, N)
518 b.wait_for_started()
Serhiy Storchaka32cb9682017-06-23 13:36:36 +0300519 # first wait, to ensure all workers settle into cond.wait() before
520 # we continue. See issues #8799 and #30727.
521 while len(ready) < 5:
522 _wait()
523 ready.clear()
Antoine Pitrou557934f2009-11-06 22:41:14 +0000524 self.assertEqual(results1, [])
525 # Notify 3 threads at first
526 cond.acquire()
527 cond.notify(3)
528 _wait()
529 phase_num = 1
530 cond.release()
531 while len(results1) < 3:
532 _wait()
Georg Brandlb9a43912010-10-28 09:03:20 +0000533 self.assertEqual(results1, [(True, 1)] * 3)
Antoine Pitrou557934f2009-11-06 22:41:14 +0000534 self.assertEqual(results2, [])
Serhiy Storchaka32cb9682017-06-23 13:36:36 +0300535 # make sure all awaken workers settle into cond.wait()
536 while len(ready) < 3:
537 _wait()
Antoine Pitrou557934f2009-11-06 22:41:14 +0000538 # Notify 5 threads: they might be in their first or second wait
539 cond.acquire()
540 cond.notify(5)
541 _wait()
542 phase_num = 2
543 cond.release()
544 while len(results1) + len(results2) < 8:
545 _wait()
Georg Brandlb9a43912010-10-28 09:03:20 +0000546 self.assertEqual(results1, [(True, 1)] * 3 + [(True, 2)] * 2)
547 self.assertEqual(results2, [(True, 2)] * 3)
Serhiy Storchaka32cb9682017-06-23 13:36:36 +0300548 # make sure all workers settle into cond.wait()
549 while len(ready) < 5:
550 _wait()
Antoine Pitrou557934f2009-11-06 22:41:14 +0000551 # Notify all threads: they are all in their second wait
552 cond.acquire()
553 cond.notify_all()
554 _wait()
555 phase_num = 3
556 cond.release()
557 while len(results2) < 5:
558 _wait()
Georg Brandlb9a43912010-10-28 09:03:20 +0000559 self.assertEqual(results1, [(True, 1)] * 3 + [(True,2)] * 2)
560 self.assertEqual(results2, [(True, 2)] * 3 + [(True, 3)] * 2)
Antoine Pitrou557934f2009-11-06 22:41:14 +0000561 b.wait_for_finished()
562
563 def test_notify(self):
564 cond = self.condtype()
565 self._check_notify(cond)
566 # A second time, to check internal state is still ok.
567 self._check_notify(cond)
568
569 def test_timeout(self):
570 cond = self.condtype()
571 results = []
572 N = 5
573 def f():
574 cond.acquire()
Victor Stinner2cf4c202018-12-17 09:36:36 +0100575 t1 = time.monotonic()
Georg Brandlb9a43912010-10-28 09:03:20 +0000576 result = cond.wait(0.5)
Victor Stinner2cf4c202018-12-17 09:36:36 +0100577 t2 = time.monotonic()
Antoine Pitrou557934f2009-11-06 22:41:14 +0000578 cond.release()
Georg Brandlb9a43912010-10-28 09:03:20 +0000579 results.append((t2 - t1, result))
Antoine Pitrou557934f2009-11-06 22:41:14 +0000580 Bunch(f, N).wait_for_finished()
Georg Brandlb9a43912010-10-28 09:03:20 +0000581 self.assertEqual(len(results), N)
582 for dt, result in results:
Antoine Pitrou7c3e5772010-04-14 15:44:10 +0000583 self.assertTimeout(dt, 0.5)
Georg Brandlb9a43912010-10-28 09:03:20 +0000584 # Note that conceptually (that"s the condition variable protocol)
585 # a wait() may succeed even if no one notifies us and before any
586 # timeout occurs. Spurious wakeups can occur.
587 # This makes it hard to verify the result value.
588 # In practice, this implementation has no spurious wakeups.
589 self.assertFalse(result)
Antoine Pitrou557934f2009-11-06 22:41:14 +0000590
Kristján Valur Jónsson63315202010-11-18 12:46:39 +0000591 def test_waitfor(self):
592 cond = self.condtype()
593 state = 0
594 def f():
595 with cond:
596 result = cond.wait_for(lambda : state==4)
597 self.assertTrue(result)
598 self.assertEqual(state, 4)
599 b = Bunch(f, 1)
600 b.wait_for_started()
Victor Stinner3349bca2011-05-18 00:16:14 +0200601 for i in range(4):
Kristján Valur Jónsson63315202010-11-18 12:46:39 +0000602 time.sleep(0.01)
603 with cond:
604 state += 1
605 cond.notify()
606 b.wait_for_finished()
607
608 def test_waitfor_timeout(self):
609 cond = self.condtype()
610 state = 0
611 success = []
612 def f():
613 with cond:
Victor Stinner2cf4c202018-12-17 09:36:36 +0100614 dt = time.monotonic()
Kristján Valur Jónsson63315202010-11-18 12:46:39 +0000615 result = cond.wait_for(lambda : state==4, timeout=0.1)
Victor Stinner2cf4c202018-12-17 09:36:36 +0100616 dt = time.monotonic() - dt
Kristján Valur Jónsson63315202010-11-18 12:46:39 +0000617 self.assertFalse(result)
618 self.assertTimeout(dt, 0.1)
619 success.append(None)
620 b = Bunch(f, 1)
621 b.wait_for_started()
622 # Only increment 3 times, so state == 4 is never reached.
623 for i in range(3):
624 time.sleep(0.01)
625 with cond:
626 state += 1
627 cond.notify()
628 b.wait_for_finished()
629 self.assertEqual(len(success), 1)
630
Antoine Pitrou557934f2009-11-06 22:41:14 +0000631
632class BaseSemaphoreTests(BaseTestCase):
633 """
634 Common tests for {bounded, unbounded} semaphore objects.
635 """
636
637 def test_constructor(self):
638 self.assertRaises(ValueError, self.semtype, value = -1)
639 self.assertRaises(ValueError, self.semtype, value = -sys.maxsize)
640
641 def test_acquire(self):
642 sem = self.semtype(1)
643 sem.acquire()
644 sem.release()
645 sem = self.semtype(2)
646 sem.acquire()
647 sem.acquire()
648 sem.release()
649 sem.release()
650
651 def test_acquire_destroy(self):
652 sem = self.semtype()
653 sem.acquire()
654 del sem
655
656 def test_acquire_contended(self):
657 sem = self.semtype(7)
658 sem.acquire()
659 N = 10
Garrett Berga0374dd2017-12-07 11:04:26 -0700660 sem_results = []
Antoine Pitrou557934f2009-11-06 22:41:14 +0000661 results1 = []
662 results2 = []
663 phase_num = 0
664 def f():
Garrett Berga0374dd2017-12-07 11:04:26 -0700665 sem_results.append(sem.acquire())
Antoine Pitrou557934f2009-11-06 22:41:14 +0000666 results1.append(phase_num)
Garrett Berga0374dd2017-12-07 11:04:26 -0700667 sem_results.append(sem.acquire())
Antoine Pitrou557934f2009-11-06 22:41:14 +0000668 results2.append(phase_num)
669 b = Bunch(f, 10)
670 b.wait_for_started()
671 while len(results1) + len(results2) < 6:
672 _wait()
673 self.assertEqual(results1 + results2, [0] * 6)
674 phase_num = 1
675 for i in range(7):
676 sem.release()
677 while len(results1) + len(results2) < 13:
678 _wait()
679 self.assertEqual(sorted(results1 + results2), [0] * 6 + [1] * 7)
680 phase_num = 2
681 for i in range(6):
682 sem.release()
683 while len(results1) + len(results2) < 19:
684 _wait()
685 self.assertEqual(sorted(results1 + results2), [0] * 6 + [1] * 7 + [2] * 6)
686 # The semaphore is still locked
687 self.assertFalse(sem.acquire(False))
688 # Final release, to let the last thread finish
689 sem.release()
690 b.wait_for_finished()
Garrett Berga0374dd2017-12-07 11:04:26 -0700691 self.assertEqual(sem_results, [True] * (6 + 7 + 6 + 1))
Antoine Pitrou557934f2009-11-06 22:41:14 +0000692
Raymond Hettinger35f63012019-08-29 01:45:19 -0700693 def test_multirelease(self):
694 sem = self.semtype(7)
695 sem.acquire()
696 results1 = []
697 results2 = []
698 phase_num = 0
699 def f():
700 sem.acquire()
701 results1.append(phase_num)
702 sem.acquire()
703 results2.append(phase_num)
704 b = Bunch(f, 10)
705 b.wait_for_started()
706 while len(results1) + len(results2) < 6:
707 _wait()
708 self.assertEqual(results1 + results2, [0] * 6)
709 phase_num = 1
710 sem.release(7)
711 while len(results1) + len(results2) < 13:
712 _wait()
713 self.assertEqual(sorted(results1 + results2), [0] * 6 + [1] * 7)
714 phase_num = 2
715 sem.release(6)
716 while len(results1) + len(results2) < 19:
717 _wait()
718 self.assertEqual(sorted(results1 + results2), [0] * 6 + [1] * 7 + [2] * 6)
719 # The semaphore is still locked
720 self.assertFalse(sem.acquire(False))
721 # Final release, to let the last thread finish
722 sem.release()
723 b.wait_for_finished()
724
Antoine Pitrou557934f2009-11-06 22:41:14 +0000725 def test_try_acquire(self):
726 sem = self.semtype(2)
727 self.assertTrue(sem.acquire(False))
728 self.assertTrue(sem.acquire(False))
729 self.assertFalse(sem.acquire(False))
730 sem.release()
731 self.assertTrue(sem.acquire(False))
732
733 def test_try_acquire_contended(self):
734 sem = self.semtype(4)
735 sem.acquire()
736 results = []
737 def f():
738 results.append(sem.acquire(False))
739 results.append(sem.acquire(False))
740 Bunch(f, 5).wait_for_finished()
741 # There can be a thread switch between acquiring the semaphore and
742 # appending the result, therefore results will not necessarily be
743 # ordered.
744 self.assertEqual(sorted(results), [False] * 7 + [True] * 3 )
745
Antoine Pitrou0454af92010-04-17 23:51:58 +0000746 def test_acquire_timeout(self):
747 sem = self.semtype(2)
748 self.assertRaises(ValueError, sem.acquire, False, timeout=1.0)
749 self.assertTrue(sem.acquire(timeout=0.005))
750 self.assertTrue(sem.acquire(timeout=0.005))
751 self.assertFalse(sem.acquire(timeout=0.005))
752 sem.release()
753 self.assertTrue(sem.acquire(timeout=0.005))
Victor Stinner2cf4c202018-12-17 09:36:36 +0100754 t = time.monotonic()
Antoine Pitrou0454af92010-04-17 23:51:58 +0000755 self.assertFalse(sem.acquire(timeout=0.5))
Victor Stinner2cf4c202018-12-17 09:36:36 +0100756 dt = time.monotonic() - t
Antoine Pitrou0454af92010-04-17 23:51:58 +0000757 self.assertTimeout(dt, 0.5)
758
Antoine Pitrou557934f2009-11-06 22:41:14 +0000759 def test_default_value(self):
760 # The default initial value is 1.
761 sem = self.semtype()
762 sem.acquire()
763 def f():
764 sem.acquire()
765 sem.release()
766 b = Bunch(f, 1)
767 b.wait_for_started()
768 _wait()
769 self.assertFalse(b.finished)
770 sem.release()
771 b.wait_for_finished()
772
773 def test_with(self):
774 sem = self.semtype(2)
775 def _with(err=None):
776 with sem:
777 self.assertTrue(sem.acquire(False))
778 sem.release()
779 with sem:
780 self.assertFalse(sem.acquire(False))
781 if err:
782 raise err
783 _with()
784 self.assertTrue(sem.acquire(False))
785 sem.release()
786 self.assertRaises(TypeError, _with, TypeError)
787 self.assertTrue(sem.acquire(False))
788 sem.release()
789
790class SemaphoreTests(BaseSemaphoreTests):
791 """
792 Tests for unbounded semaphores.
793 """
794
795 def test_release_unacquired(self):
796 # Unbounded releases are allowed and increment the semaphore's value
797 sem = self.semtype(1)
798 sem.release()
799 sem.acquire()
800 sem.acquire()
801 sem.release()
802
803
804class BoundedSemaphoreTests(BaseSemaphoreTests):
805 """
806 Tests for bounded semaphores.
807 """
808
809 def test_release_unacquired(self):
810 # Cannot go past the initial value
811 sem = self.semtype()
812 self.assertRaises(ValueError, sem.release)
813 sem.acquire()
814 sem.release()
815 self.assertRaises(ValueError, sem.release)
Kristján Valur Jónsson3be00032010-10-28 09:43:10 +0000816
817
818class BarrierTests(BaseTestCase):
819 """
820 Tests for Barrier objects.
821 """
822 N = 5
Antoine Pitrou12ae2902010-11-17 21:55:41 +0000823 defaultTimeout = 2.0
Kristján Valur Jónsson3be00032010-10-28 09:43:10 +0000824
825 def setUp(self):
Kristján Valur Jónssonf53a6262010-10-31 03:00:57 +0000826 self.barrier = self.barriertype(self.N, timeout=self.defaultTimeout)
Kristján Valur Jónsson3be00032010-10-28 09:43:10 +0000827 def tearDown(self):
828 self.barrier.abort()
829
830 def run_threads(self, f):
831 b = Bunch(f, self.N-1)
832 f()
833 b.wait_for_finished()
834
835 def multipass(self, results, n):
836 m = self.barrier.parties
837 self.assertEqual(m, self.N)
838 for i in range(n):
839 results[0].append(True)
840 self.assertEqual(len(results[1]), i * m)
841 self.barrier.wait()
842 results[1].append(True)
843 self.assertEqual(len(results[0]), (i + 1) * m)
844 self.barrier.wait()
845 self.assertEqual(self.barrier.n_waiting, 0)
846 self.assertFalse(self.barrier.broken)
847
848 def test_barrier(self, passes=1):
849 """
850 Test that a barrier is passed in lockstep
851 """
852 results = [[],[]]
853 def f():
854 self.multipass(results, passes)
855 self.run_threads(f)
856
857 def test_barrier_10(self):
858 """
859 Test that a barrier works for 10 consecutive runs
860 """
861 return self.test_barrier(10)
862
863 def test_wait_return(self):
864 """
865 test the return value from barrier.wait
866 """
867 results = []
868 def f():
869 r = self.barrier.wait()
870 results.append(r)
871
872 self.run_threads(f)
873 self.assertEqual(sum(results), sum(range(self.N)))
874
875 def test_action(self):
876 """
877 Test the 'action' callback
878 """
879 results = []
880 def action():
881 results.append(True)
882 barrier = self.barriertype(self.N, action)
883 def f():
884 barrier.wait()
885 self.assertEqual(len(results), 1)
886
887 self.run_threads(f)
888
889 def test_abort(self):
890 """
891 Test that an abort will put the barrier in a broken state
892 """
893 results1 = []
894 results2 = []
895 def f():
896 try:
897 i = self.barrier.wait()
898 if i == self.N//2:
899 raise RuntimeError
900 self.barrier.wait()
901 results1.append(True)
902 except threading.BrokenBarrierError:
903 results2.append(True)
904 except RuntimeError:
905 self.barrier.abort()
906 pass
907
908 self.run_threads(f)
909 self.assertEqual(len(results1), 0)
910 self.assertEqual(len(results2), self.N-1)
911 self.assertTrue(self.barrier.broken)
912
913 def test_reset(self):
914 """
915 Test that a 'reset' on a barrier frees the waiting threads
916 """
917 results1 = []
918 results2 = []
919 results3 = []
920 def f():
921 i = self.barrier.wait()
922 if i == self.N//2:
923 # Wait until the other threads are all in the barrier.
924 while self.barrier.n_waiting < self.N-1:
925 time.sleep(0.001)
926 self.barrier.reset()
927 else:
928 try:
929 self.barrier.wait()
930 results1.append(True)
931 except threading.BrokenBarrierError:
932 results2.append(True)
933 # Now, pass the barrier again
934 self.barrier.wait()
935 results3.append(True)
936
937 self.run_threads(f)
938 self.assertEqual(len(results1), 0)
939 self.assertEqual(len(results2), self.N-1)
940 self.assertEqual(len(results3), self.N)
941
942
943 def test_abort_and_reset(self):
944 """
945 Test that a barrier can be reset after being broken.
946 """
947 results1 = []
948 results2 = []
949 results3 = []
950 barrier2 = self.barriertype(self.N)
951 def f():
952 try:
953 i = self.barrier.wait()
954 if i == self.N//2:
955 raise RuntimeError
956 self.barrier.wait()
957 results1.append(True)
958 except threading.BrokenBarrierError:
959 results2.append(True)
960 except RuntimeError:
961 self.barrier.abort()
962 pass
963 # Synchronize and reset the barrier. Must synchronize first so
964 # that everyone has left it when we reset, and after so that no
965 # one enters it before the reset.
966 if barrier2.wait() == self.N//2:
967 self.barrier.reset()
968 barrier2.wait()
969 self.barrier.wait()
970 results3.append(True)
971
972 self.run_threads(f)
973 self.assertEqual(len(results1), 0)
974 self.assertEqual(len(results2), self.N-1)
975 self.assertEqual(len(results3), self.N)
976
977 def test_timeout(self):
978 """
979 Test wait(timeout)
980 """
981 def f():
982 i = self.barrier.wait()
983 if i == self.N // 2:
984 # One thread is late!
Antoine Pitrou12ae2902010-11-17 21:55:41 +0000985 time.sleep(1.0)
986 # Default timeout is 2.0, so this is shorter.
Kristján Valur Jónsson3be00032010-10-28 09:43:10 +0000987 self.assertRaises(threading.BrokenBarrierError,
Antoine Pitrou12ae2902010-11-17 21:55:41 +0000988 self.barrier.wait, 0.5)
Kristján Valur Jónsson3be00032010-10-28 09:43:10 +0000989 self.run_threads(f)
990
991 def test_default_timeout(self):
992 """
993 Test the barrier's default timeout
994 """
Charles-François Natalid4d1d062011-07-27 21:26:42 +0200995 # create a barrier with a low default timeout
996 barrier = self.barriertype(self.N, timeout=0.3)
Kristján Valur Jónsson3be00032010-10-28 09:43:10 +0000997 def f():
Kristján Valur Jónssonf53a6262010-10-31 03:00:57 +0000998 i = barrier.wait()
Kristján Valur Jónsson3be00032010-10-28 09:43:10 +0000999 if i == self.N // 2:
Charles-François Natalid4d1d062011-07-27 21:26:42 +02001000 # One thread is later than the default timeout of 0.3s.
Antoine Pitrou12ae2902010-11-17 21:55:41 +00001001 time.sleep(1.0)
Kristján Valur Jónssonf53a6262010-10-31 03:00:57 +00001002 self.assertRaises(threading.BrokenBarrierError, barrier.wait)
Kristján Valur Jónsson3be00032010-10-28 09:43:10 +00001003 self.run_threads(f)
1004
1005 def test_single_thread(self):
1006 b = self.barriertype(1)
1007 b.wait()
1008 b.wait()