Merged revisions 76138,76173 via svnmerge from
svn+ssh://pythondev@svn.python.org/python/branches/py3k

................
  r76138 | antoine.pitrou | 2009-11-06 23:41:14 +0100 (ven., 06 nov. 2009) | 10 lines

  Merged revisions 76137 via svnmerge from
  svn+ssh://pythondev@svn.python.org/python/trunk

  ........
    r76137 | antoine.pitrou | 2009-11-06 23:34:35 +0100 (ven., 06 nov. 2009) | 4 lines

    Issue #7270: Add some dedicated unit tests for multi-thread synchronization
    primitives such as Lock, RLock, Condition, Event and Semaphore.
  ........
................
  r76173 | antoine.pitrou | 2009-11-09 17:08:16 +0100 (lun., 09 nov. 2009) | 11 lines

  Merged revisions 76172 via svnmerge from
  svn+ssh://pythondev@svn.python.org/python/trunk

  ........
    r76172 | antoine.pitrou | 2009-11-09 17:00:11 +0100 (lun., 09 nov. 2009) | 5 lines

    Issue #7282: Fix a memory leak when an RLock was used in a thread other
    than those started through `threading.Thread` (for example, using
    `thread.start_new_thread()`.
  ........
................
diff --git a/Lib/test/lock_tests.py b/Lib/test/lock_tests.py
new file mode 100644
index 0000000..04f7422
--- /dev/null
+++ b/Lib/test/lock_tests.py
@@ -0,0 +1,546 @@
+"""
+Various tests for synchronization primitives.
+"""
+
+import sys
+import time
+from _thread import start_new_thread, get_ident
+import threading
+import unittest
+
+from test import support
+
+
+def _wait():
+    # A crude wait/yield function not relying on synchronization primitives.
+    time.sleep(0.01)
+
+class Bunch(object):
+    """
+    A bunch of threads.
+    """
+    def __init__(self, f, n, wait_before_exit=False):
+        """
+        Construct a bunch of `n` threads running the same function `f`.
+        If `wait_before_exit` is True, the threads won't terminate until
+        do_finish() is called.
+        """
+        self.f = f
+        self.n = n
+        self.started = []
+        self.finished = []
+        self._can_exit = not wait_before_exit
+        def task():
+            tid = get_ident()
+            self.started.append(tid)
+            try:
+                f()
+            finally:
+                self.finished.append(tid)
+                while not self._can_exit:
+                    _wait()
+        for i in range(n):
+            start_new_thread(task, ())
+
+    def wait_for_started(self):
+        while len(self.started) < self.n:
+            _wait()
+
+    def wait_for_finished(self):
+        while len(self.finished) < self.n:
+            _wait()
+
+    def do_finish(self):
+        self._can_exit = True
+
+
+class BaseTestCase(unittest.TestCase):
+    def setUp(self):
+        self._threads = support.threading_setup()
+
+    def tearDown(self):
+        support.threading_cleanup(*self._threads)
+        support.reap_children()
+
+
+class BaseLockTests(BaseTestCase):
+    """
+    Tests for both recursive and non-recursive locks.
+    """
+
+    def test_constructor(self):
+        lock = self.locktype()
+        del lock
+
+    def test_acquire_destroy(self):
+        lock = self.locktype()
+        lock.acquire()
+        del lock
+
+    def test_acquire_release(self):
+        lock = self.locktype()
+        lock.acquire()
+        lock.release()
+        del lock
+
+    def test_try_acquire(self):
+        lock = self.locktype()
+        self.assertTrue(lock.acquire(False))
+        lock.release()
+
+    def test_try_acquire_contended(self):
+        lock = self.locktype()
+        lock.acquire()
+        result = []
+        def f():
+            result.append(lock.acquire(False))
+        Bunch(f, 1).wait_for_finished()
+        self.assertFalse(result[0])
+        lock.release()
+
+    def test_acquire_contended(self):
+        lock = self.locktype()
+        lock.acquire()
+        N = 5
+        def f():
+            lock.acquire()
+            lock.release()
+
+        b = Bunch(f, N)
+        b.wait_for_started()
+        _wait()
+        self.assertEqual(len(b.finished), 0)
+        lock.release()
+        b.wait_for_finished()
+        self.assertEqual(len(b.finished), N)
+
+    def test_with(self):
+        lock = self.locktype()
+        def f():
+            lock.acquire()
+            lock.release()
+        def _with(err=None):
+            with lock:
+                if err is not None:
+                    raise err
+        _with()
+        # Check the lock is unacquired
+        Bunch(f, 1).wait_for_finished()
+        self.assertRaises(TypeError, _with, TypeError)
+        # Check the lock is unacquired
+        Bunch(f, 1).wait_for_finished()
+
+    def test_thread_leak(self):
+        # The lock shouldn't leak a Thread instance when used from a foreign
+        # (non-threading) thread.
+        lock = self.locktype()
+        def f():
+            lock.acquire()
+            lock.release()
+        n = len(threading.enumerate())
+        # We run many threads in the hope that existing threads ids won't
+        # be recycled.
+        Bunch(f, 15).wait_for_finished()
+        self.assertEqual(n, len(threading.enumerate()))
+
+
+class LockTests(BaseLockTests):
+    """
+    Tests for non-recursive, weak locks
+    (which can be acquired and released from different threads).
+    """
+    def test_reacquire(self):
+        # Lock needs to be released before re-acquiring.
+        lock = self.locktype()
+        phase = []
+        def f():
+            lock.acquire()
+            phase.append(None)
+            lock.acquire()
+            phase.append(None)
+        start_new_thread(f, ())
+        while len(phase) == 0:
+            _wait()
+        _wait()
+        self.assertEqual(len(phase), 1)
+        lock.release()
+        while len(phase) == 1:
+            _wait()
+        self.assertEqual(len(phase), 2)
+
+    def test_different_thread(self):
+        # Lock can be released from a different thread.
+        lock = self.locktype()
+        lock.acquire()
+        def f():
+            lock.release()
+        b = Bunch(f, 1)
+        b.wait_for_finished()
+        lock.acquire()
+        lock.release()
+
+
+class RLockTests(BaseLockTests):
+    """
+    Tests for recursive locks.
+    """
+    def test_reacquire(self):
+        lock = self.locktype()
+        lock.acquire()
+        lock.acquire()
+        lock.release()
+        lock.acquire()
+        lock.release()
+        lock.release()
+
+    def test_release_unacquired(self):
+        # Cannot release an unacquired lock
+        lock = self.locktype()
+        self.assertRaises(RuntimeError, lock.release)
+        lock.acquire()
+        lock.acquire()
+        lock.release()
+        lock.acquire()
+        lock.release()
+        lock.release()
+        self.assertRaises(RuntimeError, lock.release)
+
+    def test_different_thread(self):
+        # Cannot release from a different thread
+        lock = self.locktype()
+        def f():
+            lock.acquire()
+        b = Bunch(f, 1, True)
+        try:
+            self.assertRaises(RuntimeError, lock.release)
+        finally:
+            b.do_finish()
+
+    def test__is_owned(self):
+        lock = self.locktype()
+        self.assertFalse(lock._is_owned())
+        lock.acquire()
+        self.assertTrue(lock._is_owned())
+        lock.acquire()
+        self.assertTrue(lock._is_owned())
+        result = []
+        def f():
+            result.append(lock._is_owned())
+        Bunch(f, 1).wait_for_finished()
+        self.assertFalse(result[0])
+        lock.release()
+        self.assertTrue(lock._is_owned())
+        lock.release()
+        self.assertFalse(lock._is_owned())
+
+
+class EventTests(BaseTestCase):
+    """
+    Tests for Event objects.
+    """
+
+    def test_is_set(self):
+        evt = self.eventtype()
+        self.assertFalse(evt.is_set())
+        evt.set()
+        self.assertTrue(evt.is_set())
+        evt.set()
+        self.assertTrue(evt.is_set())
+        evt.clear()
+        self.assertFalse(evt.is_set())
+        evt.clear()
+        self.assertFalse(evt.is_set())
+
+    def _check_notify(self, evt):
+        # All threads get notified
+        N = 5
+        results1 = []
+        results2 = []
+        def f():
+            results1.append(evt.wait())
+            results2.append(evt.wait())
+        b = Bunch(f, N)
+        b.wait_for_started()
+        _wait()
+        self.assertEqual(len(results1), 0)
+        evt.set()
+        b.wait_for_finished()
+        self.assertEqual(results1, [True] * N)
+        self.assertEqual(results2, [True] * N)
+
+    def test_notify(self):
+        evt = self.eventtype()
+        self._check_notify(evt)
+        # Another time, after an explicit clear()
+        evt.set()
+        evt.clear()
+        self._check_notify(evt)
+
+    def test_timeout(self):
+        evt = self.eventtype()
+        results1 = []
+        results2 = []
+        N = 5
+        def f():
+            results1.append(evt.wait(0.0))
+            t1 = time.time()
+            r = evt.wait(0.2)
+            t2 = time.time()
+            results2.append((r, t2 - t1))
+        Bunch(f, N).wait_for_finished()
+        self.assertEqual(results1, [False] * N)
+        for r, dt in results2:
+            self.assertFalse(r)
+            self.assertTrue(dt >= 0.2, dt)
+        # The event is set
+        results1 = []
+        results2 = []
+        evt.set()
+        Bunch(f, N).wait_for_finished()
+        self.assertEqual(results1, [True] * N)
+        for r, dt in results2:
+            self.assertTrue(r)
+
+
+class ConditionTests(BaseTestCase):
+    """
+    Tests for condition variables.
+    """
+
+    def test_acquire(self):
+        cond = self.condtype()
+        # Be default we have an RLock: the condition can be acquired multiple
+        # times.
+        cond.acquire()
+        cond.acquire()
+        cond.release()
+        cond.release()
+        lock = threading.Lock()
+        cond = self.condtype(lock)
+        cond.acquire()
+        self.assertFalse(lock.acquire(False))
+        cond.release()
+        self.assertTrue(lock.acquire(False))
+        self.assertFalse(cond.acquire(False))
+        lock.release()
+        with cond:
+            self.assertFalse(lock.acquire(False))
+
+    def test_unacquired_wait(self):
+        cond = self.condtype()
+        self.assertRaises(RuntimeError, cond.wait)
+
+    def test_unacquired_notify(self):
+        cond = self.condtype()
+        self.assertRaises(RuntimeError, cond.notify)
+
+    def _check_notify(self, cond):
+        N = 5
+        results1 = []
+        results2 = []
+        phase_num = 0
+        def f():
+            cond.acquire()
+            cond.wait()
+            cond.release()
+            results1.append(phase_num)
+            cond.acquire()
+            cond.wait()
+            cond.release()
+            results2.append(phase_num)
+        b = Bunch(f, N)
+        b.wait_for_started()
+        _wait()
+        self.assertEqual(results1, [])
+        # Notify 3 threads at first
+        cond.acquire()
+        cond.notify(3)
+        _wait()
+        phase_num = 1
+        cond.release()
+        while len(results1) < 3:
+            _wait()
+        self.assertEqual(results1, [1] * 3)
+        self.assertEqual(results2, [])
+        # Notify 5 threads: they might be in their first or second wait
+        cond.acquire()
+        cond.notify(5)
+        _wait()
+        phase_num = 2
+        cond.release()
+        while len(results1) + len(results2) < 8:
+            _wait()
+        self.assertEqual(results1, [1] * 3 + [2] * 2)
+        self.assertEqual(results2, [2] * 3)
+        # Notify all threads: they are all in their second wait
+        cond.acquire()
+        cond.notify_all()
+        _wait()
+        phase_num = 3
+        cond.release()
+        while len(results2) < 5:
+            _wait()
+        self.assertEqual(results1, [1] * 3 + [2] * 2)
+        self.assertEqual(results2, [2] * 3 + [3] * 2)
+        b.wait_for_finished()
+
+    def test_notify(self):
+        cond = self.condtype()
+        self._check_notify(cond)
+        # A second time, to check internal state is still ok.
+        self._check_notify(cond)
+
+    def test_timeout(self):
+        cond = self.condtype()
+        results = []
+        N = 5
+        def f():
+            cond.acquire()
+            t1 = time.time()
+            cond.wait(0.2)
+            t2 = time.time()
+            cond.release()
+            results.append(t2 - t1)
+        Bunch(f, N).wait_for_finished()
+        self.assertEqual(len(results), 5)
+        for dt in results:
+            self.assertTrue(dt >= 0.2, dt)
+
+
+class BaseSemaphoreTests(BaseTestCase):
+    """
+    Common tests for {bounded, unbounded} semaphore objects.
+    """
+
+    def test_constructor(self):
+        self.assertRaises(ValueError, self.semtype, value = -1)
+        self.assertRaises(ValueError, self.semtype, value = -sys.maxsize)
+
+    def test_acquire(self):
+        sem = self.semtype(1)
+        sem.acquire()
+        sem.release()
+        sem = self.semtype(2)
+        sem.acquire()
+        sem.acquire()
+        sem.release()
+        sem.release()
+
+    def test_acquire_destroy(self):
+        sem = self.semtype()
+        sem.acquire()
+        del sem
+
+    def test_acquire_contended(self):
+        sem = self.semtype(7)
+        sem.acquire()
+        N = 10
+        results1 = []
+        results2 = []
+        phase_num = 0
+        def f():
+            sem.acquire()
+            results1.append(phase_num)
+            sem.acquire()
+            results2.append(phase_num)
+        b = Bunch(f, 10)
+        b.wait_for_started()
+        while len(results1) + len(results2) < 6:
+            _wait()
+        self.assertEqual(results1 + results2, [0] * 6)
+        phase_num = 1
+        for i in range(7):
+            sem.release()
+        while len(results1) + len(results2) < 13:
+            _wait()
+        self.assertEqual(sorted(results1 + results2), [0] * 6 + [1] * 7)
+        phase_num = 2
+        for i in range(6):
+            sem.release()
+        while len(results1) + len(results2) < 19:
+            _wait()
+        self.assertEqual(sorted(results1 + results2), [0] * 6 + [1] * 7 + [2] * 6)
+        # The semaphore is still locked
+        self.assertFalse(sem.acquire(False))
+        # Final release, to let the last thread finish
+        sem.release()
+        b.wait_for_finished()
+
+    def test_try_acquire(self):
+        sem = self.semtype(2)
+        self.assertTrue(sem.acquire(False))
+        self.assertTrue(sem.acquire(False))
+        self.assertFalse(sem.acquire(False))
+        sem.release()
+        self.assertTrue(sem.acquire(False))
+
+    def test_try_acquire_contended(self):
+        sem = self.semtype(4)
+        sem.acquire()
+        results = []
+        def f():
+            results.append(sem.acquire(False))
+            results.append(sem.acquire(False))
+        Bunch(f, 5).wait_for_finished()
+        # There can be a thread switch between acquiring the semaphore and
+        # appending the result, therefore results will not necessarily be
+        # ordered.
+        self.assertEqual(sorted(results), [False] * 7 + [True] *  3 )
+
+    def test_default_value(self):
+        # The default initial value is 1.
+        sem = self.semtype()
+        sem.acquire()
+        def f():
+            sem.acquire()
+            sem.release()
+        b = Bunch(f, 1)
+        b.wait_for_started()
+        _wait()
+        self.assertFalse(b.finished)
+        sem.release()
+        b.wait_for_finished()
+
+    def test_with(self):
+        sem = self.semtype(2)
+        def _with(err=None):
+            with sem:
+                self.assertTrue(sem.acquire(False))
+                sem.release()
+                with sem:
+                    self.assertFalse(sem.acquire(False))
+                    if err:
+                        raise err
+        _with()
+        self.assertTrue(sem.acquire(False))
+        sem.release()
+        self.assertRaises(TypeError, _with, TypeError)
+        self.assertTrue(sem.acquire(False))
+        sem.release()
+
+class SemaphoreTests(BaseSemaphoreTests):
+    """
+    Tests for unbounded semaphores.
+    """
+
+    def test_release_unacquired(self):
+        # Unbounded releases are allowed and increment the semaphore's value
+        sem = self.semtype(1)
+        sem.release()
+        sem.acquire()
+        sem.acquire()
+        sem.release()
+
+
+class BoundedSemaphoreTests(BaseSemaphoreTests):
+    """
+    Tests for bounded semaphores.
+    """
+
+    def test_release_unacquired(self):
+        # Cannot go past the initial value
+        sem = self.semtype()
+        self.assertRaises(ValueError, sem.release)
+        sem.acquire()
+        sem.release()
+        self.assertRaises(ValueError, sem.release)
diff --git a/Lib/test/test_thread.py b/Lib/test/test_thread.py
index 73d87b8..f6ce1ae 100644
--- a/Lib/test/test_thread.py
+++ b/Lib/test/test_thread.py
@@ -5,6 +5,7 @@
 import _thread as thread
 import time
 
+from test import lock_tests
 
 NUMTASKS = 10
 NUMTRIPS = 3
@@ -161,8 +162,12 @@
         if finished:
             self.done_mutex.release()
 
+class LockTests(lock_tests.LockTests):
+    locktype = thread.allocate_lock
+
+
 def test_main():
-    support.run_unittest(ThreadRunningTests, BarrierTest)
+    support.run_unittest(ThreadRunningTests, BarrierTest, LockTests)
 
 if __name__ == "__main__":
     test_main()
diff --git a/Lib/test/test_threading.py b/Lib/test/test_threading.py
index 7b6d82b..86f5773 100644
--- a/Lib/test/test_threading.py
+++ b/Lib/test/test_threading.py
@@ -11,6 +11,8 @@
 import unittest
 import weakref
 
+from test import lock_tests
+
 # A trivial mutable counter.
 class Counter(object):
     def __init__(self):
@@ -133,11 +135,9 @@
     def test_foreign_thread(self):
         # Check that a "foreign" thread can use the threading module.
         def f(mutex):
-            # Acquiring an RLock forces an entry for the foreign
+            # Calling current_thread() forces an entry for the foreign
             # thread to get made in the threading._active map.
-            r = threading.RLock()
-            r.acquire()
-            r.release()
+            threading.current_thread()
             mutex.release()
 
         mutex = threading.Lock()
@@ -471,22 +471,6 @@
         thread.start()
         self.assertRaises(RuntimeError, thread.start)
 
-    def test_releasing_unacquired_rlock(self):
-        rlock = threading.RLock()
-        self.assertRaises(RuntimeError, rlock.release)
-
-    def test_waiting_on_unacquired_condition(self):
-        cond = threading.Condition()
-        self.assertRaises(RuntimeError, cond.wait)
-
-    def test_notify_on_unacquired_condition(self):
-        cond = threading.Condition()
-        self.assertRaises(RuntimeError, cond.notify)
-
-    def test_semaphore_with_negative_value(self):
-        self.assertRaises(ValueError, threading.Semaphore, value = -1)
-        self.assertRaises(ValueError, threading.Semaphore, value = -sys.maxsize)
-
     def test_joining_current_thread(self):
         current_thread = threading.current_thread()
         self.assertRaises(RuntimeError, current_thread.join);
@@ -501,11 +485,37 @@
         self.assertRaises(RuntimeError, setattr, thread, "daemon", True)
 
 
+class LockTests(lock_tests.LockTests):
+    locktype = staticmethod(threading.Lock)
+
+class RLockTests(lock_tests.RLockTests):
+    locktype = staticmethod(threading.RLock)
+
+class EventTests(lock_tests.EventTests):
+    eventtype = staticmethod(threading.Event)
+
+class ConditionAsRLockTests(lock_tests.RLockTests):
+    # An Condition uses an RLock by default and exports its API.
+    locktype = staticmethod(threading.Condition)
+
+class ConditionTests(lock_tests.ConditionTests):
+    condtype = staticmethod(threading.Condition)
+
+class SemaphoreTests(lock_tests.SemaphoreTests):
+    semtype = staticmethod(threading.Semaphore)
+
+class BoundedSemaphoreTests(lock_tests.BoundedSemaphoreTests):
+    semtype = staticmethod(threading.BoundedSemaphore)
+
+
 def test_main():
-    test.support.run_unittest(ThreadTests,
-                                   ThreadJoinOnShutdown,
-                                   ThreadingExceptionTests,
-                                   )
+    test.support.run_unittest(LockTests, RLockTests, EventTests,
+                              ConditionAsRLockTests, ConditionTests,
+                              SemaphoreTests, BoundedSemaphoreTests,
+                              ThreadTests,
+                              ThreadJoinOnShutdown,
+                              ThreadingExceptionTests,
+                              )
 
 if __name__ == "__main__":
     test_main()
diff --git a/Lib/threading.py b/Lib/threading.py
index d5412e9..4bb0182 100644
--- a/Lib/threading.py
+++ b/Lib/threading.py
@@ -92,14 +92,16 @@
 
     def __repr__(self):
         owner = self._owner
-        return "<%s(%s, %d)>" % (
-                self.__class__.__name__,
-                owner and owner.name,
-                self._count)
+        try:
+            owner = _active[owner].name
+        except KeyError:
+            pass
+        return "<%s owner=%r count=%d>" % (
+                self.__class__.__name__, owner, self._count)
 
     def acquire(self, blocking=True):
-        me = current_thread()
-        if self._owner is me:
+        me = _get_ident()
+        if self._owner == me:
             self._count = self._count + 1
             if __debug__:
                 self._note("%s.acquire(%s): recursive success", self, blocking)
@@ -118,7 +120,7 @@
     __enter__ = acquire
 
     def release(self):
-        if self._owner is not current_thread():
+        if self._owner != _get_ident():
             raise RuntimeError("cannot release un-acquired lock")
         self._count = count = self._count - 1
         if not count:
@@ -152,7 +154,7 @@
         return (count, owner)
 
     def _is_owned(self):
-        return self._owner is current_thread()
+        return self._owner == _get_ident()
 
 
 def Condition(*args, **kwargs):