blob: 22c690059297a8ceb3c5db90ff5cfc4be01de64f [file] [log] [blame]
Antoine Pitrou959f3e52009-11-09 16:52:46 +00001"""
2Various tests for synchronization primitives.
3"""
4
5import sys
6import time
7from _thread import start_new_thread, get_ident
8import threading
9import unittest
10
11from test import support
12
13
14def _wait():
15 # A crude wait/yield function not relying on synchronization primitives.
16 time.sleep(0.01)
17
class Bunch(object):
    """
    A group of threads that all run the same function.
    """
    def __init__(self, f, n, wait_before_exit=False):
        """
        Start `n` threads, each of which calls `f` exactly once.
        When `wait_before_exit` is True, every thread stays alive after
        `f` returns until do_finish() is called.
        """
        self.f = f
        self.n = n
        # Thread ids are appended to these lists as each thread enters and
        # leaves `f`; the wait_for_*() methods poll their lengths.
        self.started = []
        self.finished = []
        self._can_exit = not wait_before_exit

        def worker():
            ident = get_ident()
            self.started.append(ident)
            try:
                f()
            finally:
                # Recorded even when f() raised.
                self.finished.append(ident)
                # Keep the thread alive until do_finish() lets it go.
                while not self._can_exit:
                    _wait()

        for _ in range(n):
            start_new_thread(worker, ())

    def wait_for_started(self):
        """Poll until all `n` threads have registered as started."""
        while self.n > len(self.started):
            _wait()

    def wait_for_finished(self):
        """Poll until all `n` threads have returned from `f`."""
        while self.n > len(self.finished):
            _wait()

    def do_finish(self):
        """Allow threads created with `wait_before_exit=True` to exit."""
        self._can_exit = True
55
56
class BaseTestCase(unittest.TestCase):
    """
    Base class bracketing every test with the test.support threading
    setup/cleanup helpers.
    """

    def setUp(self):
        # Keep whatever threading_setup() returns; tearDown hands it back.
        self._threads = support.threading_setup()

    def tearDown(self):
        saved = self._threads
        support.threading_cleanup(*saved)
        support.reap_children()
64
65
class BaseLockTests(BaseTestCase):
    """
    Tests shared by recursive and non-recursive lock types.

    `self.locktype` is not defined here; presumably the concrete test
    classes provide the lock factory.
    """

    def test_constructor(self):
        # Creating and dropping a lock must not raise.
        lk = self.locktype()
        del lk

    def test_acquire_destroy(self):
        # A lock may be dropped while it is still held.
        lk = self.locktype()
        lk.acquire()
        del lk

    def test_acquire_release(self):
        lk = self.locktype()
        lk.acquire()
        lk.release()
        del lk

    def test_try_acquire(self):
        # A non-blocking acquire on a free lock succeeds.
        lk = self.locktype()
        acquired = lk.acquire(False)
        self.assertTrue(acquired)
        lk.release()

    def test_try_acquire_contended(self):
        # A non-blocking acquire from another thread fails while we hold
        # the lock.
        lk = self.locktype()
        lk.acquire()
        outcomes = []

        def try_once():
            outcomes.append(lk.acquire(False))

        Bunch(try_once, 1).wait_for_finished()
        self.assertFalse(outcomes[0])
        lk.release()

    def test_acquire_contended(self):
        lk = self.locktype()
        lk.acquire()
        N = 5

        def grab_and_drop():
            lk.acquire()
            lk.release()

        bunch = Bunch(grab_and_drop, N)
        bunch.wait_for_started()
        _wait()
        # Nobody can get past acquire() while we hold the lock.
        self.assertEqual(len(bunch.finished), 0)
        lk.release()
        bunch.wait_for_finished()
        self.assertEqual(len(bunch.finished), N)

    def test_with(self):
        lk = self.locktype()

        def grab_and_drop():
            lk.acquire()
            lk.release()

        def enter_block(err=None):
            with lk:
                if err is not None:
                    raise err

        enter_block()
        # The lock must be free again: another thread can take it.
        Bunch(grab_and_drop, 1).wait_for_finished()
        with self.assertRaises(TypeError):
            enter_block(TypeError)
        # Still free after the block was left through an exception.
        Bunch(grab_and_drop, 1).wait_for_finished()

    def test_thread_leak(self):
        # Using the lock from a foreign (non-threading) thread must not
        # leave a Thread instance behind.
        lk = self.locktype()

        def grab_and_drop():
            lk.acquire()
            lk.release()

        baseline = len(threading.enumerate())
        # Many threads are run in the hope that existing thread ids
        # won't be recycled.
        Bunch(grab_and_drop, 15).wait_for_finished()
        if baseline != len(threading.enumerate()):
            # For a short while after its target function returns, a
            # Thread is still alive and registered.  Wait a bit longer to
            # avoid spurious failures (seen on a buildbot).
            time.sleep(0.4)
        self.assertEqual(baseline, len(threading.enumerate()))
Antoine Pitrou959f3e52009-11-09 16:52:46 +0000151
152
class LockTests(BaseLockTests):
    """
    Tests specific to non-recursive, weak locks, i.e. locks that one
    thread may acquire and a different thread may release.
    """
    def test_reacquire(self):
        # A second acquire() blocks until the lock has been released.
        lk = self.locktype()
        steps = []

        def acquire_twice():
            lk.acquire()
            steps.append(None)
            lk.acquire()
            steps.append(None)

        start_new_thread(acquire_twice, ())
        while not steps:
            _wait()
        _wait()
        # The worker is stuck on its second acquire().
        self.assertEqual(len(steps), 1)
        lk.release()
        while len(steps) == 1:
            _wait()
        self.assertEqual(len(steps), 2)

    def test_different_thread(self):
        # A lock acquired here can be released by another thread.
        lk = self.locktype()
        lk.acquire()

        def release_it():
            lk.release()

        bunch = Bunch(release_it, 1)
        bunch.wait_for_finished()
        # If the release took effect, this acquire() does not block.
        lk.acquire()
        lk.release()
187
188
class RLockTests(BaseLockTests):
    """
    Tests for recursive locks.
    """
    def test_reacquire(self):
        # The owning thread may acquire repeatedly; each acquire() is
        # paired with one release().
        lock = self.locktype()
        lock.acquire()
        lock.acquire()
        lock.release()
        lock.acquire()
        lock.release()
        lock.release()

    def test_release_unacquired(self):
        # Cannot release an unacquired lock
        lock = self.locktype()
        self.assertRaises(RuntimeError, lock.release)
        lock.acquire()
        lock.acquire()
        lock.release()
        lock.acquire()
        lock.release()
        lock.release()
        # All acquisitions are balanced, so one more release must fail.
        self.assertRaises(RuntimeError, lock.release)

    def test_different_thread(self):
        # Cannot release from a different thread
        lock = self.locktype()
        def f():
            lock.acquire()
        # wait_before_exit=True keeps the owning thread alive while we
        # try to release its lock.
        b = Bunch(f, 1, True)
        try:
            # `finished` is only filled in after f() has returned, i.e.
            # once the worker really holds the lock.  Without this wait,
            # release() could run against a still-unowned lock and raise
            # RuntimeError without exercising the cross-thread case.
            b.wait_for_finished()
            self.assertRaises(RuntimeError, lock.release)
        finally:
            b.do_finish()

    def test__is_owned(self):
        lock = self.locktype()
        self.assertFalse(lock._is_owned())
        lock.acquire()
        self.assertTrue(lock._is_owned())
        lock.acquire()
        self.assertTrue(lock._is_owned())
        result = []
        def f():
            result.append(lock._is_owned())
        # Ownership is per thread: another thread does not own the lock.
        Bunch(f, 1).wait_for_finished()
        self.assertFalse(result[0])
        lock.release()
        # Acquired twice, released once: still owned.
        self.assertTrue(lock._is_owned())
        lock.release()
        self.assertFalse(lock._is_owned())
241
242
class EventTests(BaseTestCase):
    """
    Tests for Event objects.

    `self.eventtype` is not defined here; presumably the concrete test
    classes provide the event factory.
    """

    def test_is_set(self):
        # set() and clear() are both idempotent.
        evt = self.eventtype()
        self.assertFalse(evt.is_set())
        evt.set()
        self.assertTrue(evt.is_set())
        evt.set()
        self.assertTrue(evt.is_set())
        evt.clear()
        self.assertFalse(evt.is_set())
        evt.clear()
        self.assertFalse(evt.is_set())

    def _check_notify(self, evt):
        # All threads get notified
        N = 5
        results1 = []
        results2 = []
        def f():
            results1.append(evt.wait())
            results2.append(evt.wait())
        b = Bunch(f, N)
        b.wait_for_started()
        _wait()
        # Nobody gets past the first wait() before the event is set.
        self.assertEqual(len(results1), 0)
        evt.set()
        b.wait_for_finished()
        self.assertEqual(results1, [True] * N)
        self.assertEqual(results2, [True] * N)

    def test_notify(self):
        evt = self.eventtype()
        self._check_notify(evt)
        # Another time, after an explicit clear()
        evt.set()
        evt.clear()
        self._check_notify(evt)

    def test_timeout(self):
        evt = self.eventtype()
        # Measure intervals with a monotonic clock when the runtime has
        # one: the wall clock returned by time.time() can be adjusted
        # while a thread is waiting, which would distort `dt`.
        clock = getattr(time, "monotonic", time.time)
        results1 = []
        results2 = []
        N = 5
        def f():
            results1.append(evt.wait(0.0))
            t1 = clock()
            r = evt.wait(0.2)
            t2 = clock()
            results2.append((r, t2 - t1))
        Bunch(f, N).wait_for_finished()
        self.assertEqual(results1, [False] * N)
        for r, dt in results2:
            self.assertFalse(r)
            # The wait must have lasted for the whole timeout.
            self.assertGreaterEqual(dt, 0.2)
        # The event is set
        results1 = []
        results2 = []
        evt.set()
        Bunch(f, N).wait_for_finished()
        self.assertEqual(results1, [True] * N)
        for r, _ in results2:
            self.assertTrue(r)
309
310
class ConditionTests(BaseTestCase):
    """
    Tests for condition variables.

    `self.condtype` is not defined here; presumably the concrete test
    classes provide the condition factory.
    """

    def test_acquire(self):
        cond = self.condtype()
        # By default we have an RLock: the condition can be acquired
        # multiple times.
        cond.acquire()
        cond.acquire()
        cond.release()
        cond.release()
        # With an explicit plain Lock, the condition and the lock share
        # the same state.
        lock = threading.Lock()
        cond = self.condtype(lock)
        cond.acquire()
        self.assertFalse(lock.acquire(False))
        cond.release()
        self.assertTrue(lock.acquire(False))
        self.assertFalse(cond.acquire(False))
        lock.release()
        with cond:
            self.assertFalse(lock.acquire(False))

    def test_unacquired_wait(self):
        # wait() requires the condition to be acquired first.
        cond = self.condtype()
        self.assertRaises(RuntimeError, cond.wait)

    def test_unacquired_notify(self):
        # notify() requires the condition to be acquired first.
        cond = self.condtype()
        self.assertRaises(RuntimeError, cond.notify)

    def _check_notify(self, cond):
        # NOTE(review): this check looks timing-sensitive -- it relies on
        # the _wait() calls to let the workers reach cond.wait() before
        # each notify; confirm on slow machines.
        N = 5
        results1 = []
        results2 = []
        # Reassigned below; f() reads the current value through its
        # closure when it records a result.
        phase_num = 0
        def f():
            cond.acquire()
            cond.wait()
            cond.release()
            results1.append(phase_num)
            cond.acquire()
            cond.wait()
            cond.release()
            results2.append(phase_num)
        b = Bunch(f, N)
        b.wait_for_started()
        _wait()
        self.assertEqual(results1, [])
        # Notify 3 threads at first
        cond.acquire()
        cond.notify(3)
        _wait()
        phase_num = 1
        cond.release()
        while len(results1) < 3:
            _wait()
        self.assertEqual(results1, [1] * 3)
        self.assertEqual(results2, [])
        # Notify 5 threads: they might be in their first or second wait
        cond.acquire()
        cond.notify(5)
        _wait()
        phase_num = 2
        cond.release()
        while len(results1) + len(results2) < 8:
            _wait()
        self.assertEqual(results1, [1] * 3 + [2] * 2)
        self.assertEqual(results2, [2] * 3)
        # Notify all threads: they are all in their second wait
        cond.acquire()
        cond.notify_all()
        _wait()
        phase_num = 3
        cond.release()
        while len(results2) < 5:
            _wait()
        self.assertEqual(results1, [1] * 3 + [2] * 2)
        self.assertEqual(results2, [2] * 3 + [3] * 2)
        b.wait_for_finished()

    def test_notify(self):
        cond = self.condtype()
        self._check_notify(cond)
        # A second time, to check internal state is still ok.
        self._check_notify(cond)

    def test_timeout(self):
        cond = self.condtype()
        # Measure intervals with a monotonic clock when the runtime has
        # one: the wall clock returned by time.time() can be adjusted
        # while a thread is waiting, which would distort `dt`.
        clock = getattr(time, "monotonic", time.time)
        results = []
        N = 5
        def f():
            cond.acquire()
            t1 = clock()
            cond.wait(0.2)
            t2 = clock()
            cond.release()
            results.append(t2 - t1)
        Bunch(f, N).wait_for_finished()
        # One measurement per worker thread.
        self.assertEqual(len(results), N)
        for dt in results:
            # Nobody notifies, so every wait lasts the whole timeout.
            self.assertGreaterEqual(dt, 0.2)
414
415
class BaseSemaphoreTests(BaseTestCase):
    """
    Common tests for {bounded, unbounded} semaphore objects.

    `self.semtype` is not defined here; presumably the concrete test
    classes provide the semaphore factory.
    """

    def test_constructor(self):
        # Negative initial values are rejected.
        self.assertRaises(ValueError, self.semtype, value=-1)
        self.assertRaises(ValueError, self.semtype, value=-sys.maxsize)

    def test_acquire(self):
        sem = self.semtype(1)
        sem.acquire()
        sem.release()
        sem = self.semtype(2)
        sem.acquire()
        sem.acquire()
        sem.release()
        sem.release()

    def test_acquire_destroy(self):
        # A semaphore may be dropped while it is still acquired.
        sem = self.semtype()
        sem.acquire()
        del sem

    def test_acquire_contended(self):
        sem = self.semtype(7)
        sem.acquire()
        N = 10
        results1 = []
        results2 = []
        # Reassigned below; f() reads the current value through its
        # closure when it records a result.
        phase_num = 0
        def f():
            sem.acquire()
            results1.append(phase_num)
            sem.acquire()
            results2.append(phase_num)
        # N threads acquiring twice each need 2 * N = 20 units in total.
        b = Bunch(f, N)
        b.wait_for_started()
        # 7 initial units minus the one taken above leaves 6 for phase 0.
        while len(results1) + len(results2) < 6:
            _wait()
        self.assertEqual(results1 + results2, [0] * 6)
        phase_num = 1
        for i in range(7):
            sem.release()
        while len(results1) + len(results2) < 13:
            _wait()
        self.assertEqual(sorted(results1 + results2), [0] * 6 + [1] * 7)
        phase_num = 2
        for i in range(6):
            sem.release()
        while len(results1) + len(results2) < 19:
            _wait()
        self.assertEqual(sorted(results1 + results2),
                         [0] * 6 + [1] * 7 + [2] * 6)
        # The semaphore is still locked
        self.assertFalse(sem.acquire(False))
        # Final release, to let the last thread finish
        sem.release()
        b.wait_for_finished()

    def test_try_acquire(self):
        sem = self.semtype(2)
        self.assertTrue(sem.acquire(False))
        self.assertTrue(sem.acquire(False))
        self.assertFalse(sem.acquire(False))
        sem.release()
        self.assertTrue(sem.acquire(False))

    def test_try_acquire_contended(self):
        sem = self.semtype(4)
        sem.acquire()
        results = []
        def f():
            results.append(sem.acquire(False))
            results.append(sem.acquire(False))
        Bunch(f, 5).wait_for_finished()
        # There can be a thread switch between acquiring the semaphore and
        # appending the result, therefore results will not necessarily be
        # ordered.
        self.assertEqual(sorted(results), [False] * 7 + [True] * 3)

    def test_default_value(self):
        # The default initial value is 1.
        sem = self.semtype()
        sem.acquire()
        def f():
            sem.acquire()
            sem.release()
        b = Bunch(f, 1)
        b.wait_for_started()
        _wait()
        # The single unit is taken, so the worker cannot have finished.
        self.assertFalse(b.finished)
        sem.release()
        b.wait_for_finished()

    def test_with(self):
        sem = self.semtype(2)
        def _with(err=None):
            with sem:
                # One of the two units is taken: one more is available.
                self.assertTrue(sem.acquire(False))
                sem.release()
                with sem:
                    # Both units are taken now.
                    self.assertFalse(sem.acquire(False))
                    if err is not None:
                        raise err
        _with()
        # Both with-blocks gave their unit back on normal exit...
        self.assertTrue(sem.acquire(False))
        sem.release()
        self.assertRaises(TypeError, _with, TypeError)
        # ...and also when left through an exception.
        self.assertTrue(sem.acquire(False))
        sem.release()
526
class SemaphoreTests(BaseSemaphoreTests):
    """
    Tests specific to unbounded semaphores.
    """

    def test_release_unacquired(self):
        # Releasing without a prior acquire is allowed and increments the
        # semaphore's value: starting from 1, the extra release() makes
        # room for the two acquire() calls below.
        unbounded = self.semtype(1)
        unbounded.release()
        for _ in range(2):
            unbounded.acquire()
        unbounded.release()
539
540
class BoundedSemaphoreTests(BaseSemaphoreTests):
    """
    Tests specific to bounded semaphores.
    """

    def test_release_unacquired(self):
        # The value can never be pushed past its initial value.
        bounded = self.semtype()
        with self.assertRaises(ValueError):
            bounded.release()
        bounded.acquire()
        bounded.release()
        # Back at the initial value: one more release must fail again.
        with self.assertRaises(ValueError):
            bounded.release()