Brett Cannon's dummy_thread and dummy_threading modules (SF patch
622537), with some nitpicking editorial changes.
diff --git a/Lib/test/test_dummy_thread.py b/Lib/test/test_dummy_thread.py
new file mode 100644
index 0000000..3be3931
--- /dev/null
+++ b/Lib/test/test_dummy_thread.py
@@ -0,0 +1,167 @@
+"""Generic thread tests.
+
+Meant to be used by dummy_thread and thread.  To allow for different modules
+to be used, test_main() can be called with the module to use as the thread
+implementation as its sole argument.
+
+"""
+import dummy_thread as _thread
+import time
+import Queue
+import random
+import unittest
+from test import test_support
+
+
+class LockTests(unittest.TestCase):
+    """Test the lock objects handed out by _thread.allocate_lock()."""
+
+    def setUp(self):
+        # Allocate a fresh lock for every test.
+        self.lock = _thread.allocate_lock()
+
+    def test_initlock(self):
+        # Make sure locks start out unlocked.
+        self.failUnless(not self.lock.locked(),
+                        "Lock object is not initialized unlocked.")
+
+    def test_release(self):
+        # An acquired lock must report unlocked again after release().
+        self.lock.acquire()
+        self.lock.release()
+        self.failUnless(not self.lock.locked(),
+                        "Lock object did not release properly.")
+
+    def test_improper_release(self):
+        # Releasing a lock that is not held must raise _thread.error.
+        self.failUnlessRaises(_thread.error, self.lock.release)
+
+    def test_cond_acquire_success(self):
+        # A conditional acquire(0) of a free lock must succeed.
+        self.failUnless(self.lock.acquire(0),
+                        "Conditional acquiring of the lock failed.")
+
+    def test_cond_acquire_fail(self):
+        # A conditional acquire(0) of an already-held lock must fail.
+        self.lock.acquire(0)
+        self.failUnless(not self.lock.acquire(0),
+                        "Conditional acquiring of a locked lock incorrectly "
+                        "succeeded.")
+
+    def test_uncond_acquire_success(self):
+        # An unconditional acquire() of a free lock must leave it locked.
+        self.lock.acquire()
+        self.failUnless(self.lock.locked(),
+                        "Unconditional locking failed.")
+
+    def test_uncond_acquire_return_val(self):
+        # An unconditional acquire(1) must return exactly True.
+        self.failUnless(self.lock.acquire(1) is True,
+                        "Unconditional locking did not return True.")
+
+    def test_uncond_acquire_blocking(self):
+        # An unconditional acquire() of a held lock must wait for its release.
+        def delay_unlock(to_unlock, delay):
+            """Hold on to lock for a set amount of time before unlocking."""
+            time.sleep(delay)
+            to_unlock.release()
+
+        self.lock.acquire()
+        delay = 1  # In seconds
+        start_time = int(time.time())  # Truncated to whole seconds
+        _thread.start_new_thread(delay_unlock, (self.lock, delay))
+        if test_support.verbose:
+            print
+            print "*** Waiting for thread to release the lock "\
+            "(approx. %s sec.) ***" % delay
+        self.lock.acquire()
+        end_time = int(time.time())
+        if test_support.verbose:
+            print "done"
+        self.failUnless((end_time - start_time) >= delay,
+                        "Blocking by unconditional acquiring failed.")
+
+class MiscTests(unittest.TestCase):
+    """Test the module-level exit(), get_ident() and LockType."""
+
+    def test_exit(self):
+        # _thread.exit() must raise SystemExit.
+        self.failUnlessRaises(SystemExit, _thread.exit)
+
+    def test_ident(self):
+        # _thread.get_ident() must return a non-zero integer.
+        self.failUnless(isinstance(_thread.get_ident(), int),
+                        "_thread.get_ident() returned a non-integer")
+        self.failUnless(_thread.get_ident() != 0,
+                        "_thread.get_ident() returned 0")
+
+    def test_LockType(self):
+        # _thread.allocate_lock() must return an instance of _thread.LockType.
+        self.failUnless(isinstance(_thread.allocate_lock(), _thread.LockType),
+                        "_thread.allocate_lock() did not return an instance "
+                        "of _thread.LockType")
+
+class ThreadTests(unittest.TestCase):
+    """Check that _thread.start_new_thread() runs the function it is given."""
+
+    def test_arg_passing(self):
+        # Positional and keyword arguments must both reach the target.
+        def arg_tester(queue, arg1=False, arg2=False):
+            """Report the two optional arguments received through ``queue``."""
+            queue.put((arg1, arg2))
+
+        results = Queue.Queue(1)
+        _thread.start_new_thread(arg_tester, (results, True, True))
+        first, second = results.get()
+        self.failUnless(first and second,
+                        "Argument passing for thread creation using tuple failed")
+        keywords = {'queue':results, 'arg1':True, 'arg2':True}
+        _thread.start_new_thread(arg_tester, tuple(), keywords)
+        first, second = results.get()
+        self.failUnless(first and second,
+                        "Argument passing for thread creation using kwargs failed")
+        _thread.start_new_thread(arg_tester, (results, True), {'arg2':True})
+        first, second = results.get()
+        self.failUnless(first and second,
+                        "Argument passing for thread creation using both tuple"
+                        " and kwargs failed")
+
+    def test_multi_creation(self):
+        # Several threads started back to back must all get to run.
+        def queue_mark(queue, delay):
+            """Sleep for ``delay`` seconds, then put this thread's id in ``queue``"""
+            time.sleep(delay)
+            queue.put(_thread.get_ident())
+
+        thread_count = 5
+        delay = 1.5
+        marks = Queue.Queue(thread_count)
+        if test_support.verbose:
+            print
+            print "*** Testing multiple thread creation "\
+            "(will take approx. %s to %s sec.) ***" % (delay, thread_count)
+        for _ in xrange(thread_count):
+            nap = round(random.random(), 1)
+            _thread.start_new_thread(queue_mark, (marks, nap))
+        time.sleep(delay)
+        if test_support.verbose:
+            print 'done'
+        self.failUnless(marks.qsize() == thread_count,
+                        "Not all %s threads executed properly after %s sec." %
+                        (thread_count, delay))
+
+def test_main(imported_module=None):
+    """Run all test cases, against ``imported_module`` when one is given."""
+    global _thread
+    if imported_module:
+        _thread = imported_module
+    if test_support.verbose:
+        print
+        print "*** Using %s as _thread module ***" % _thread
+    suite = unittest.TestSuite()
+    for case in (LockTests, MiscTests, ThreadTests):
+        suite.addTest(unittest.makeSuite(case))
+    test_support.run_suite(suite)
+
+if __name__ == '__main__':
+    test_main()