blob: cd1155d34e996bb9d14c6ef63c71ab52f5b6199f [file] [log] [blame]
"""
Various tests for synchronization primitives.
"""
4
5import sys
6import time
Victor Stinner2a129742011-05-30 23:02:52 +02007from _thread import start_new_thread, TIMEOUT_MAX
Antoine Pitrou557934f2009-11-06 22:41:14 +00008import threading
9import unittest
Raymond Hettinger7836a272015-10-09 00:03:51 -040010import weakref
Antoine Pitrou557934f2009-11-06 22:41:14 +000011
12from test import support
13
14
def _wait():
    """Pause the calling thread briefly so that other threads can run.

    A crude wait/yield function not relying on synchronization primitives:
    it simply sleeps for 10 ms.  The tests in this file poll shared lists
    in ``while ...: _wait()`` loops instead of using the primitives that
    are themselves under test.
    """
    time.sleep(0.01)
18
class Bunch(object):
    """
    A bunch of threads.

    The threads are started with the low-level ``start_new_thread()`` as
    soon as the object is constructed.  Progress is tracked through two
    lists that the worker threads append their thread ident to:

    * ``started``  -- one entry per thread that has begun running;
    * ``finished`` -- one entry per thread whose call to `f` has ended
      (normally or with an exception).
    """
    def __init__(self, f, n, wait_before_exit=False):
        """
        Construct a bunch of `n` threads running the same function `f`.
        If `wait_before_exit` is True, the threads won't terminate until
        do_finish() is called.
        """
        self.f = f
        self.n = n
        self.started = []
        self.finished = []
        self._can_exit = not wait_before_exit
        # The context manager is entered here and only exited in
        # wait_for_finished(), hence the explicit __enter__/__exit__ calls
        # instead of a ``with`` statement.
        self.wait_thread = support.wait_threads_exit()
        self.wait_thread.__enter__()

        def task():
            tid = threading.get_ident()
            self.started.append(tid)
            try:
                f()
            finally:
                # Record completion even if f() raised, then keep the
                # thread alive until it is allowed to exit.
                self.finished.append(tid)
                while not self._can_exit:
                    _wait()

        try:
            for _ in range(n):
                start_new_thread(task, ())
        except BaseException:
            # Let the threads that did start run to completion instead of
            # spinning forever, then propagate the original error.
            self._can_exit = True
            raise

    def wait_for_started(self):
        """Poll until all `n` threads have recorded themselves as started."""
        while len(self.started) < self.n:
            _wait()

    def wait_for_finished(self):
        """Poll until all `n` threads have finished calling `f`, then leave
        the wait_threads_exit() context entered by the constructor."""
        while len(self.finished) < self.n:
            _wait()
        # Wait for threads exit
        self.wait_thread.__exit__(None, None, None)

    def do_finish(self):
        """Allow threads created with `wait_before_exit=True` to terminate."""
        self._can_exit = True
66
67
class BaseTestCase(unittest.TestCase):
    """
    Common base class for the test cases in this file.

    It wraps every test in the threading_setup()/threading_cleanup()
    helpers from test.support and provides a tolerant timeout assertion.
    """
    def setUp(self):
        # Keep the value returned by threading_setup() so that tearDown()
        # can hand it back to threading_cleanup().
        self._threads = support.threading_setup()

    def tearDown(self):
        support.threading_cleanup(*self._threads)
        support.reap_children()

    def assertTimeout(self, actual, expected):
        """Check that the measured duration `actual` is plausibly close to
        the requested timeout `expected`.

        The accepted window is ``expected * 0.6 <= actual < expected * 10``.
        """
        # The waiting and/or time.monotonic() can be imprecise, which
        # is why comparing to the expected value would sometimes fail
        # (especially under Windows).
        self.assertGreaterEqual(actual, expected * 0.6)
        # Test nothing insane happened
        self.assertLess(actual, expected * 10.0)
83
Antoine Pitrou557934f2009-11-06 22:41:14 +000084
class BaseLockTests(BaseTestCase):
    """
    Tests for both recursive and non-recursive locks.

    Concrete subclasses are expected to provide a `locktype` attribute:
    a callable returning a new lock object.
    """

    def test_constructor(self):
        # A lock can be created and destroyed without ever being used.
        lock = self.locktype()
        del lock

    def test_repr(self):
        lock = self.locktype()
        self.assertRegex(repr(lock), "<unlocked .* object (.*)?at .*>")
        del lock

    def test_locked_repr(self):
        lock = self.locktype()
        lock.acquire()
        self.assertRegex(repr(lock), "<locked .* object (.*)?at .*>")
        del lock

    def test_acquire_destroy(self):
        # Destroying a lock that is still held must not fail.
        lock = self.locktype()
        lock.acquire()
        del lock

    def test_acquire_release(self):
        lock = self.locktype()
        lock.acquire()
        lock.release()
        del lock

    def test_try_acquire(self):
        # A non-blocking acquire of a free lock succeeds.
        lock = self.locktype()
        self.assertTrue(lock.acquire(False))
        lock.release()

    def test_try_acquire_contended(self):
        # A non-blocking acquire from another thread fails while the
        # main thread holds the lock.
        lock = self.locktype()
        lock.acquire()
        result = []
        def f():
            result.append(lock.acquire(False))
        Bunch(f, 1).wait_for_finished()
        self.assertFalse(result[0])
        lock.release()

    def test_acquire_contended(self):
        # Blocking acquires from other threads only complete once the
        # main thread releases the lock.
        lock = self.locktype()
        lock.acquire()
        N = 5
        def f():
            lock.acquire()
            lock.release()

        b = Bunch(f, N)
        b.wait_for_started()
        _wait()
        self.assertEqual(len(b.finished), 0)
        lock.release()
        b.wait_for_finished()
        self.assertEqual(len(b.finished), N)

    def test_with(self):
        # The lock works as a context manager and is released on exit,
        # whether or not the body raised.
        lock = self.locktype()
        def f():
            lock.acquire()
            lock.release()
        def _with(err=None):
            with lock:
                if err is not None:
                    raise err
        _with()
        # Check the lock is unacquired
        Bunch(f, 1).wait_for_finished()
        self.assertRaises(TypeError, _with, TypeError)
        # Check the lock is unacquired
        Bunch(f, 1).wait_for_finished()

    def test_thread_leak(self):
        # The lock shouldn't leak a Thread instance when used from a foreign
        # (non-threading) thread.
        lock = self.locktype()
        def f():
            lock.acquire()
            lock.release()
        n = len(threading.enumerate())
        # We run many threads in the hope that existing threads ids won't
        # be recycled.
        Bunch(f, 15).wait_for_finished()
        if len(threading.enumerate()) != n:
            # There is a small window during which a Thread instance's
            # target function has finished running, but the Thread is still
            # alive and registered.  Avoid spurious failures by waiting a
            # bit more (seen on a buildbot).
            time.sleep(0.4)
            self.assertEqual(n, len(threading.enumerate()))

    def test_timeout(self):
        lock = self.locktype()
        # Can't set timeout if not blocking
        self.assertRaises(ValueError, lock.acquire, False, 1)
        # Invalid timeout values
        self.assertRaises(ValueError, lock.acquire, timeout=-100)
        self.assertRaises(OverflowError, lock.acquire, timeout=1e100)
        self.assertRaises(OverflowError, lock.acquire, timeout=TIMEOUT_MAX + 1)
        # TIMEOUT_MAX is ok
        lock.acquire(timeout=TIMEOUT_MAX)
        lock.release()
        t1 = time.monotonic()
        self.assertTrue(lock.acquire(timeout=5))
        t2 = time.monotonic()
        # Just a sanity test that it didn't actually wait for the timeout.
        self.assertLess(t2 - t1, 5)
        results = []
        def f():
            # Runs in another thread while the main thread holds the lock:
            # records the acquire() result and how long the call took.
            t1 = time.monotonic()
            results.append(lock.acquire(timeout=0.5))
            t2 = time.monotonic()
            results.append(t2 - t1)
        Bunch(f, 1).wait_for_finished()
        self.assertFalse(results[0])
        self.assertTimeout(results[1], 0.5)

    def test_weakref_exists(self):
        # Lock objects can be weakly referenced.
        lock = self.locktype()
        ref = weakref.ref(lock)
        self.assertIsNotNone(ref())

    def test_weakref_deleted(self):
        # The weak reference is cleared once the lock is gone.
        lock = self.locktype()
        ref = weakref.ref(lock)
        del lock
        # Do not rely on reference counting alone to finalize the lock:
        # force a collection for implementations that free objects lazily.
        support.gc_collect()
        self.assertIsNone(ref())
218
Antoine Pitrou557934f2009-11-06 22:41:14 +0000219
class LockTests(BaseLockTests):
    """
    Tests for non-recursive, weak locks
    (which can be acquired and released from different threads).
    """
    def test_reacquire(self):
        # Lock needs to be released before re-acquiring.
        lock = self.locktype()
        phase = []

        def f():
            # The second acquire() blocks until another thread releases
            # the lock; `phase` records how far this thread got.
            lock.acquire()
            phase.append(None)
            lock.acquire()
            phase.append(None)

        with support.wait_threads_exit():
            start_new_thread(f, ())
            while len(phase) == 0:
                _wait()
            _wait()
            self.assertEqual(len(phase), 1)
            lock.release()
            while len(phase) == 1:
                _wait()
            self.assertEqual(len(phase), 2)

    def test_different_thread(self):
        # Lock can be released from a different thread.
        lock = self.locktype()
        lock.acquire()
        def f():
            lock.release()
        b = Bunch(f, 1)
        b.wait_for_finished()
        lock.acquire()
        lock.release()

    def test_state_after_timeout(self):
        # Issue #11618: check that lock is in a proper state after a
        # (non-zero) timeout.
        lock = self.locktype()
        lock.acquire()
        self.assertFalse(lock.acquire(timeout=0.01))
        lock.release()
        self.assertFalse(lock.locked())
        self.assertTrue(lock.acquire(blocking=False))
267
Antoine Pitrou557934f2009-11-06 22:41:14 +0000268
class RLockTests(BaseLockTests):
    """
    Tests for recursive locks.
    """
    def test_reacquire(self):
        # The owning thread may acquire the lock again; every acquire()
        # is matched by a release().
        lock = self.locktype()
        lock.acquire()
        lock.acquire()
        lock.release()
        lock.acquire()
        lock.release()
        lock.release()

    def test_release_unacquired(self):
        # Cannot release an unacquired lock
        lock = self.locktype()
        self.assertRaises(RuntimeError, lock.release)
        lock.acquire()
        lock.acquire()
        lock.release()
        lock.acquire()
        lock.release()
        lock.release()
        self.assertRaises(RuntimeError, lock.release)

    def test_release_save_unacquired(self):
        # Cannot _release_save an unacquired lock
        lock = self.locktype()
        self.assertRaises(RuntimeError, lock._release_save)
        lock.acquire()
        lock.acquire()
        lock.release()
        lock.acquire()
        lock.release()
        lock.release()
        self.assertRaises(RuntimeError, lock._release_save)

    def test_different_thread(self):
        # Cannot release from a different thread
        lock = self.locktype()
        def f():
            lock.acquire()
        # wait_before_exit=True keeps the worker thread alive until
        # do_finish() is called.
        b = Bunch(f, 1, True)
        try:
            self.assertRaises(RuntimeError, lock.release)
        finally:
            b.do_finish()
        b.wait_for_finished()

    def test__is_owned(self):
        # _is_owned() is true only in the thread holding the lock, and
        # stays true until the outermost release().
        lock = self.locktype()
        self.assertFalse(lock._is_owned())
        lock.acquire()
        self.assertTrue(lock._is_owned())
        lock.acquire()
        self.assertTrue(lock._is_owned())
        result = []
        def f():
            result.append(lock._is_owned())
        Bunch(f, 1).wait_for_finished()
        self.assertFalse(result[0])
        lock.release()
        self.assertTrue(lock._is_owned())
        lock.release()
        self.assertFalse(lock._is_owned())
334
335
class EventTests(BaseTestCase):
    """
    Tests for Event objects.

    Concrete subclasses are expected to provide an `eventtype` attribute:
    a callable returning a new event object.
    """

    def test_is_set(self):
        # set() and clear() are reflected by is_set(), and calling either
        # of them twice in a row is harmless.
        evt = self.eventtype()
        self.assertFalse(evt.is_set())
        evt.set()
        self.assertTrue(evt.is_set())
        evt.set()
        self.assertTrue(evt.is_set())
        evt.clear()
        self.assertFalse(evt.is_set())
        evt.clear()
        self.assertFalse(evt.is_set())

    def _check_notify(self, evt):
        # All threads get notified
        N = 5
        results1 = []
        results2 = []
        def f():
            results1.append(evt.wait())
            results2.append(evt.wait())
        b = Bunch(f, N)
        b.wait_for_started()
        _wait()
        self.assertEqual(len(results1), 0)
        evt.set()
        b.wait_for_finished()
        self.assertEqual(results1, [True] * N)
        self.assertEqual(results2, [True] * N)

    def test_notify(self):
        evt = self.eventtype()
        self._check_notify(evt)
        # Another time, after an explicit clear()
        evt.set()
        evt.clear()
        self._check_notify(evt)

    def test_timeout(self):
        evt = self.eventtype()
        results1 = []
        results2 = []
        N = 5
        def f():
            # Records the result of a zero-timeout wait, then the result
            # and duration of a 0.5 s wait.
            results1.append(evt.wait(0.0))
            t1 = time.monotonic()
            r = evt.wait(0.5)
            t2 = time.monotonic()
            results2.append((r, t2 - t1))
        Bunch(f, N).wait_for_finished()
        self.assertEqual(results1, [False] * N)
        for r, dt in results2:
            self.assertFalse(r)
            self.assertTimeout(dt, 0.5)
        # The event is set
        results1 = []
        results2 = []
        evt.set()
        Bunch(f, N).wait_for_finished()
        self.assertEqual(results1, [True] * N)
        for r, _ in results2:
            self.assertTrue(r)

    def test_set_and_clear(self):
        # Issue #13502: check that wait() returns true even when the event is
        # cleared before the waiting thread is woken up.
        evt = self.eventtype()
        results = []
        timeout = 0.250
        N = 5
        def f():
            results.append(evt.wait(timeout * 4))
        b = Bunch(f, N)
        b.wait_for_started()
        time.sleep(timeout)
        evt.set()
        evt.clear()
        b.wait_for_finished()
        self.assertEqual(results, [True] * N)

    def test_reset_internal_locks(self):
        # ensure that condition is still using a Lock after reset
        evt = self.eventtype()
        with evt._cond:
            self.assertFalse(evt._cond.acquire(False))
        evt._reset_internal_locks()
        with evt._cond:
            self.assertFalse(evt._cond.acquire(False))
Benjamin Peterson15982aa2015-10-05 21:56:22 -0700428
Antoine Pitrou557934f2009-11-06 22:41:14 +0000429
class ConditionTests(BaseTestCase):
    """
    Tests for condition variables.

    Concrete subclasses are expected to provide a `condtype` attribute:
    a callable returning a new condition object, optionally built around
    a lock passed as first argument.
    """

    def test_acquire(self):
        cond = self.condtype()
        # By default we have an RLock: the condition can be acquired multiple
        # times.
        cond.acquire()
        cond.acquire()
        cond.release()
        cond.release()
        # With an explicit non-recursive lock, the condition and the lock
        # share the same locked/unlocked state.
        lock = threading.Lock()
        cond = self.condtype(lock)
        cond.acquire()
        self.assertFalse(lock.acquire(False))
        cond.release()
        self.assertTrue(lock.acquire(False))
        self.assertFalse(cond.acquire(False))
        lock.release()
        with cond:
            self.assertFalse(lock.acquire(False))

    def test_unacquired_wait(self):
        # wait() requires the underlying lock to be held.
        cond = self.condtype()
        self.assertRaises(RuntimeError, cond.wait)

    def test_unacquired_notify(self):
        # notify() requires the underlying lock to be held.
        cond = self.condtype()
        self.assertRaises(RuntimeError, cond.notify)

    def _check_notify(self, cond):
        # Note that this test is sensitive to timing.  If the worker threads
        # don't execute in a timely fashion, the main thread may think they
        # are further along than they are.  The main thread therefore issues
        # _wait() statements to try to make sure that it doesn't race ahead
        # of the workers.
        # Secondly, this test assumes that condition variables are not subject
        # to spurious wakeups.  The absence of spurious wakeups is an implementation
        # detail of Condition Variables in current CPython, but in general, not
        # a guaranteed property of condition variables as a programming
        # construct.  In particular, it is possible that this can no longer
        # be conveniently guaranteed should their implementation ever change.
        N = 5
        ready = []
        results1 = []
        results2 = []
        phase_num = 0
        def f():
            # Each worker waits on the condition twice.  `ready` gets an
            # entry just before each wait; results1/results2 record the
            # wait() result together with the phase in which it returned.
            cond.acquire()
            ready.append(phase_num)
            result = cond.wait()
            cond.release()
            results1.append((result, phase_num))
            cond.acquire()
            ready.append(phase_num)
            result = cond.wait()
            cond.release()
            results2.append((result, phase_num))
        b = Bunch(f, N)
        b.wait_for_started()
        # first wait, to ensure all workers settle into cond.wait() before
        # we continue. See issues #8799 and #30727.
        while len(ready) < N:
            _wait()
        ready.clear()
        self.assertEqual(results1, [])
        # Notify 3 threads at first
        cond.acquire()
        cond.notify(3)
        _wait()
        phase_num = 1
        cond.release()
        while len(results1) < 3:
            _wait()
        self.assertEqual(results1, [(True, 1)] * 3)
        self.assertEqual(results2, [])
        # make sure all awaken workers settle into cond.wait()
        while len(ready) < 3:
            _wait()
        # Notify 5 threads: they might be in their first or second wait
        cond.acquire()
        cond.notify(5)
        _wait()
        phase_num = 2
        cond.release()
        while len(results1) + len(results2) < 8:
            _wait()
        self.assertEqual(results1, [(True, 1)] * 3 + [(True, 2)] * 2)
        self.assertEqual(results2, [(True, 2)] * 3)
        # make sure all workers settle into cond.wait()
        while len(ready) < N:
            _wait()
        # Notify all threads: they are all in their second wait
        cond.acquire()
        cond.notify_all()
        _wait()
        phase_num = 3
        cond.release()
        while len(results2) < N:
            _wait()
        self.assertEqual(results1, [(True, 1)] * 3 + [(True, 2)] * 2)
        self.assertEqual(results2, [(True, 2)] * 3 + [(True, 3)] * 2)
        b.wait_for_finished()

    def test_notify(self):
        cond = self.condtype()
        self._check_notify(cond)
        # A second time, to check internal state is still ok.
        self._check_notify(cond)

    def test_timeout(self):
        cond = self.condtype()
        results = []
        N = 5
        def f():
            # Records how long a 0.5 s wait() took and what it returned.
            cond.acquire()
            t1 = time.monotonic()
            result = cond.wait(0.5)
            t2 = time.monotonic()
            cond.release()
            results.append((t2 - t1, result))
        Bunch(f, N).wait_for_finished()
        self.assertEqual(len(results), N)
        for dt, result in results:
            self.assertTimeout(dt, 0.5)
            # Note that conceptually (that's the condition variable protocol)
            # a wait() may succeed even if no one notifies us and before any
            # timeout occurs.  Spurious wakeups can occur.
            # This makes it hard to verify the result value.
            # In practice, this implementation has no spurious wakeups.
            self.assertFalse(result)

    def test_waitfor(self):
        # wait_for() returns true once the predicate holds.
        cond = self.condtype()
        state = 0
        def f():
            with cond:
                result = cond.wait_for(lambda : state==4)
                self.assertTrue(result)
                self.assertEqual(state, 4)
        b = Bunch(f, 1)
        b.wait_for_started()
        for i in range(4):
            time.sleep(0.01)
            with cond:
                state += 1
                cond.notify()
        b.wait_for_finished()

    def test_waitfor_timeout(self):
        # wait_for() returns false when the timeout expires before the
        # predicate holds.
        cond = self.condtype()
        state = 0
        success = []
        def f():
            with cond:
                dt = time.monotonic()
                result = cond.wait_for(lambda : state==4, timeout=0.1)
                dt = time.monotonic() - dt
                self.assertFalse(result)
                self.assertTimeout(dt, 0.1)
                # Only reached if both assertions above passed.
                success.append(None)
        b = Bunch(f, 1)
        b.wait_for_started()
        # Only increment 3 times, so state == 4 is never reached.
        for i in range(3):
            time.sleep(0.01)
            with cond:
                state += 1
                cond.notify()
        b.wait_for_finished()
        self.assertEqual(len(success), 1)
603
Antoine Pitrou557934f2009-11-06 22:41:14 +0000604
class BaseSemaphoreTests(BaseTestCase):
    """
    Common tests for {bounded, unbounded} semaphore objects.

    Concrete subclasses are expected to provide a `semtype` attribute:
    a callable returning a new semaphore, optionally taking the initial
    value as `value`.
    """

    def test_constructor(self):
        # Negative initial values are rejected.
        self.assertRaises(ValueError, self.semtype, value=-1)
        self.assertRaises(ValueError, self.semtype, value=-sys.maxsize)

    def test_acquire(self):
        sem = self.semtype(1)
        sem.acquire()
        sem.release()
        sem = self.semtype(2)
        sem.acquire()
        sem.acquire()
        sem.release()
        sem.release()

    def test_acquire_destroy(self):
        # Destroying a semaphore that is still acquired must not fail.
        sem = self.semtype()
        sem.acquire()
        del sem

    def test_acquire_contended(self):
        # The semaphore starts at 7 and the main thread takes one unit,
        # leaving 6 for the N workers, each of which acquires twice.
        sem = self.semtype(7)
        sem.acquire()
        N = 10
        sem_results = []
        results1 = []
        results2 = []
        phase_num = 0
        def f():
            sem_results.append(sem.acquire())
            results1.append(phase_num)
            sem_results.append(sem.acquire())
            results2.append(phase_num)
        b = Bunch(f, N)
        b.wait_for_started()
        while len(results1) + len(results2) < 6:
            _wait()
        self.assertEqual(results1 + results2, [0] * 6)
        phase_num = 1
        for i in range(7):
            sem.release()
        while len(results1) + len(results2) < 13:
            _wait()
        self.assertEqual(sorted(results1 + results2), [0] * 6 + [1] * 7)
        phase_num = 2
        for i in range(6):
            sem.release()
        while len(results1) + len(results2) < 19:
            _wait()
        self.assertEqual(sorted(results1 + results2), [0] * 6 + [1] * 7 + [2] * 6)
        # The semaphore is still locked
        self.assertFalse(sem.acquire(False))
        # Final release, to let the last thread finish
        sem.release()
        b.wait_for_finished()
        self.assertEqual(sem_results, [True] * (6 + 7 + 6 + 1))

    def test_multirelease(self):
        # Same scenario as test_acquire_contended, but the units are
        # handed back with single release(n) calls.
        sem = self.semtype(7)
        sem.acquire()
        results1 = []
        results2 = []
        phase_num = 0
        def f():
            sem.acquire()
            results1.append(phase_num)
            sem.acquire()
            results2.append(phase_num)
        b = Bunch(f, 10)
        b.wait_for_started()
        while len(results1) + len(results2) < 6:
            _wait()
        self.assertEqual(results1 + results2, [0] * 6)
        phase_num = 1
        sem.release(7)
        while len(results1) + len(results2) < 13:
            _wait()
        self.assertEqual(sorted(results1 + results2), [0] * 6 + [1] * 7)
        phase_num = 2
        sem.release(6)
        while len(results1) + len(results2) < 19:
            _wait()
        self.assertEqual(sorted(results1 + results2), [0] * 6 + [1] * 7 + [2] * 6)
        # The semaphore is still locked
        self.assertFalse(sem.acquire(False))
        # Final release, to let the last thread finish
        sem.release()
        b.wait_for_finished()

    def test_try_acquire(self):
        # Non-blocking acquires succeed until the counter reaches zero.
        sem = self.semtype(2)
        self.assertTrue(sem.acquire(False))
        self.assertTrue(sem.acquire(False))
        self.assertFalse(sem.acquire(False))
        sem.release()
        self.assertTrue(sem.acquire(False))

    def test_try_acquire_contended(self):
        # 3 units remain after the main thread's acquire; 5 threads make
        # 2 non-blocking attempts each, so exactly 3 of the 10 succeed.
        sem = self.semtype(4)
        sem.acquire()
        results = []
        def f():
            results.append(sem.acquire(False))
            results.append(sem.acquire(False))
        Bunch(f, 5).wait_for_finished()
        # There can be a thread switch between acquiring the semaphore and
        # appending the result, therefore results will not necessarily be
        # ordered.
        self.assertEqual(sorted(results), [False] * 7 + [True] * 3)

    def test_acquire_timeout(self):
        sem = self.semtype(2)
        # A timeout cannot be combined with a non-blocking acquire.
        self.assertRaises(ValueError, sem.acquire, False, timeout=1.0)
        self.assertTrue(sem.acquire(timeout=0.005))
        self.assertTrue(sem.acquire(timeout=0.005))
        self.assertFalse(sem.acquire(timeout=0.005))
        sem.release()
        self.assertTrue(sem.acquire(timeout=0.005))
        t = time.monotonic()
        self.assertFalse(sem.acquire(timeout=0.5))
        dt = time.monotonic() - t
        self.assertTimeout(dt, 0.5)

    def test_default_value(self):
        # The default initial value is 1.
        sem = self.semtype()
        sem.acquire()
        def f():
            sem.acquire()
            sem.release()
        b = Bunch(f, 1)
        b.wait_for_started()
        _wait()
        self.assertFalse(b.finished)
        sem.release()
        b.wait_for_finished()

    def test_with(self):
        # The semaphore works as a (nestable) context manager and is
        # released on exit, whether or not the body raised.
        sem = self.semtype(2)
        def _with(err=None):
            with sem:
                self.assertTrue(sem.acquire(False))
                sem.release()
                with sem:
                    self.assertFalse(sem.acquire(False))
                    if err:
                        raise err
        _with()
        self.assertTrue(sem.acquire(False))
        sem.release()
        self.assertRaises(TypeError, _with, TypeError)
        self.assertTrue(sem.acquire(False))
        sem.release()
762
class SemaphoreTests(BaseSemaphoreTests):
    """
    Tests for unbounded semaphores.
    """

    def test_release_unacquired(self):
        # Unbounded releases are allowed and increment the semaphore's value
        sem = self.semtype(1)
        sem.release()
        # The extra release above is what makes two acquires in a row
        # possible on a semaphore created with value 1.
        sem.acquire()
        sem.acquire()
        sem.release()
775
776
class BoundedSemaphoreTests(BaseSemaphoreTests):
    """
    Tests for bounded semaphores.
    """

    def test_release_unacquired(self):
        # Cannot go past the initial value
        sem = self.semtype()
        self.assertRaises(ValueError, sem.release)
        sem.acquire()
        sem.release()
        # Back at the initial value: a further release is rejected again.
        self.assertRaises(ValueError, sem.release)
Kristján Valur Jónsson3be00032010-10-28 09:43:10 +0000789
790
class BarrierTests(BaseTestCase):
    """
    Tests for Barrier objects.

    Concrete subclasses are expected to provide a `barriertype` attribute:
    a callable returning a new barrier for a given number of parties.
    """
    # Number of parties of the shared barrier (N-1 helper threads plus the
    # thread running the test).
    N = 5
    # Default timeout, in seconds, of the barrier created by setUp().
    defaultTimeout = 2.0

    def setUp(self):
        # Run the base-class fixture too, so that its threading
        # setup/cleanup bookkeeping also covers the barrier tests.
        super().setUp()
        self.barrier = self.barriertype(self.N, timeout=self.defaultTimeout)

    def tearDown(self):
        self.barrier.abort()
        super().tearDown()

    def run_threads(self, f):
        """Run `f` in N-1 new threads and in the calling thread, then wait
        for the new threads to finish."""
        b = Bunch(f, self.N-1)
        f()
        b.wait_for_finished()

    def multipass(self, results, n):
        """Pass the shared barrier 2*n times, checking in `results` (a pair
        of lists) that no thread runs ahead of the others."""
        m = self.barrier.parties
        self.assertEqual(m, self.N)
        for i in range(n):
            results[0].append(True)
            self.assertEqual(len(results[1]), i * m)
            self.barrier.wait()
            results[1].append(True)
            self.assertEqual(len(results[0]), (i + 1) * m)
            self.barrier.wait()
        self.assertEqual(self.barrier.n_waiting, 0)
        self.assertFalse(self.barrier.broken)

    def test_barrier(self, passes=1):
        """
        Test that a barrier is passed in lockstep
        """
        results = [[],[]]
        def f():
            self.multipass(results, passes)
        self.run_threads(f)

    def test_barrier_10(self):
        """
        Test that a barrier works for 10 consecutive runs
        """
        return self.test_barrier(10)

    def test_wait_return(self):
        """
        test the return value from barrier.wait
        """
        results = []
        def f():
            r = self.barrier.wait()
            results.append(r)

        self.run_threads(f)
        self.assertEqual(sum(results), sum(range(self.N)))

    def test_action(self):
        """
        Test the 'action' callback
        """
        results = []
        def action():
            results.append(True)
        barrier = self.barriertype(self.N, action)
        def f():
            barrier.wait()
            self.assertEqual(len(results), 1)

        self.run_threads(f)

    def test_abort(self):
        """
        Test that an abort will put the barrier in a broken state
        """
        results1 = []
        results2 = []
        def f():
            try:
                i = self.barrier.wait()
                if i == self.N//2:
                    # Exactly one thread bails out and aborts the barrier.
                    raise RuntimeError
                self.barrier.wait()
                results1.append(True)
            except threading.BrokenBarrierError:
                results2.append(True)
            except RuntimeError:
                self.barrier.abort()

        self.run_threads(f)
        self.assertEqual(len(results1), 0)
        self.assertEqual(len(results2), self.N-1)
        self.assertTrue(self.barrier.broken)

    def test_reset(self):
        """
        Test that a 'reset' on a barrier frees the waiting threads
        """
        results1 = []
        results2 = []
        results3 = []
        def f():
            i = self.barrier.wait()
            if i == self.N//2:
                # Wait until the other threads are all in the barrier.
                while self.barrier.n_waiting < self.N-1:
                    time.sleep(0.001)
                self.barrier.reset()
            else:
                try:
                    self.barrier.wait()
                    results1.append(True)
                except threading.BrokenBarrierError:
                    results2.append(True)
            # Now, pass the barrier again
            self.barrier.wait()
            results3.append(True)

        self.run_threads(f)
        self.assertEqual(len(results1), 0)
        self.assertEqual(len(results2), self.N-1)
        self.assertEqual(len(results3), self.N)

    def test_abort_and_reset(self):
        """
        Test that a barrier can be reset after being broken.
        """
        results1 = []
        results2 = []
        results3 = []
        barrier2 = self.barriertype(self.N)
        def f():
            try:
                i = self.barrier.wait()
                if i == self.N//2:
                    raise RuntimeError
                self.barrier.wait()
                results1.append(True)
            except threading.BrokenBarrierError:
                results2.append(True)
            except RuntimeError:
                self.barrier.abort()
            # Synchronize and reset the barrier.  Must synchronize first so
            # that everyone has left it when we reset, and after so that no
            # one enters it before the reset.
            if barrier2.wait() == self.N//2:
                self.barrier.reset()
            barrier2.wait()
            self.barrier.wait()
            results3.append(True)

        self.run_threads(f)
        self.assertEqual(len(results1), 0)
        self.assertEqual(len(results2), self.N-1)
        self.assertEqual(len(results3), self.N)

    def test_timeout(self):
        """
        Test wait(timeout)
        """
        def f():
            i = self.barrier.wait()
            if i == self.N // 2:
                # One thread is late!
                time.sleep(1.0)
            # Default timeout is 2.0, so this is shorter.
            self.assertRaises(threading.BrokenBarrierError,
                              self.barrier.wait, 0.5)
        self.run_threads(f)

    def test_default_timeout(self):
        """
        Test the barrier's default timeout
        """
        # create a barrier with a low default timeout
        barrier = self.barriertype(self.N, timeout=0.3)
        def f():
            i = barrier.wait()
            if i == self.N // 2:
                # One thread is later than the default timeout of 0.3s.
                time.sleep(1.0)
            self.assertRaises(threading.BrokenBarrierError, barrier.wait)
        self.run_threads(f)

    def test_single_thread(self):
        # A barrier with a single party never blocks.
        b = self.barriertype(1)
        b.wait()
        b.wait()