Merged revisions 76138,76173 via svnmerge from
svn+ssh://pythondev@svn.python.org/python/branches/py3k
................
r76138 | antoine.pitrou | 2009-11-06 23:41:14 +0100 (ven., 06 nov. 2009) | 10 lines
Merged revisions 76137 via svnmerge from
svn+ssh://pythondev@svn.python.org/python/trunk
........
r76137 | antoine.pitrou | 2009-11-06 23:34:35 +0100 (ven., 06 nov. 2009) | 4 lines
Issue #7270: Add some dedicated unit tests for multi-thread synchronization
primitives such as Lock, RLock, Condition, Event and Semaphore.
........
................
r76173 | antoine.pitrou | 2009-11-09 17:08:16 +0100 (lun., 09 nov. 2009) | 11 lines
Merged revisions 76172 via svnmerge from
svn+ssh://pythondev@svn.python.org/python/trunk
........
r76172 | antoine.pitrou | 2009-11-09 17:00:11 +0100 (lun., 09 nov. 2009) | 5 lines
Issue #7282: Fix a memory leak when an RLock was used in a thread other
than those started through `threading.Thread` (for example, using
`thread.start_new_thread()`.
........
................
diff --git a/Lib/test/test_threading.py b/Lib/test/test_threading.py
index 7b6d82b..86f5773 100644
--- a/Lib/test/test_threading.py
+++ b/Lib/test/test_threading.py
@@ -11,6 +11,8 @@
import unittest
import weakref
+from test import lock_tests
+
# A trivial mutable counter.
class Counter(object):
def __init__(self):
@@ -133,11 +135,9 @@
def test_foreign_thread(self):
# Check that a "foreign" thread can use the threading module.
def f(mutex):
- # Acquiring an RLock forces an entry for the foreign
+ # Calling current_thread() forces an entry for the foreign
# thread to get made in the threading._active map.
- r = threading.RLock()
- r.acquire()
- r.release()
+ threading.current_thread()
mutex.release()
mutex = threading.Lock()
@@ -471,22 +471,6 @@
thread.start()
self.assertRaises(RuntimeError, thread.start)
- def test_releasing_unacquired_rlock(self):
- rlock = threading.RLock()
- self.assertRaises(RuntimeError, rlock.release)
-
- def test_waiting_on_unacquired_condition(self):
- cond = threading.Condition()
- self.assertRaises(RuntimeError, cond.wait)
-
- def test_notify_on_unacquired_condition(self):
- cond = threading.Condition()
- self.assertRaises(RuntimeError, cond.notify)
-
- def test_semaphore_with_negative_value(self):
- self.assertRaises(ValueError, threading.Semaphore, value = -1)
- self.assertRaises(ValueError, threading.Semaphore, value = -sys.maxsize)
-
def test_joining_current_thread(self):
current_thread = threading.current_thread()
self.assertRaises(RuntimeError, current_thread.join);
@@ -501,11 +485,37 @@
self.assertRaises(RuntimeError, setattr, thread, "daemon", True)
+class LockTests(lock_tests.LockTests):
+ locktype = staticmethod(threading.Lock)
+
+class RLockTests(lock_tests.RLockTests):
+ locktype = staticmethod(threading.RLock)
+
+class EventTests(lock_tests.EventTests):
+ eventtype = staticmethod(threading.Event)
+
+class ConditionAsRLockTests(lock_tests.RLockTests):
+ # An Condition uses an RLock by default and exports its API.
+ locktype = staticmethod(threading.Condition)
+
+class ConditionTests(lock_tests.ConditionTests):
+ condtype = staticmethod(threading.Condition)
+
+class SemaphoreTests(lock_tests.SemaphoreTests):
+ semtype = staticmethod(threading.Semaphore)
+
+class BoundedSemaphoreTests(lock_tests.BoundedSemaphoreTests):
+ semtype = staticmethod(threading.BoundedSemaphore)
+
+
def test_main():
- test.support.run_unittest(ThreadTests,
- ThreadJoinOnShutdown,
- ThreadingExceptionTests,
- )
+ test.support.run_unittest(LockTests, RLockTests, EventTests,
+ ConditionAsRLockTests, ConditionTests,
+ SemaphoreTests, BoundedSemaphoreTests,
+ ThreadTests,
+ ThreadJoinOnShutdown,
+ ThreadingExceptionTests,
+ )
if __name__ == "__main__":
test_main()