Issue #7316: the acquire() method of lock objects in the :mod:`threading`
module now takes an optional timeout argument in seconds. Timeout support
relies on the system threading library, so as to avoid a semi-busy wait
loop.
diff --git a/Lib/_dummy_thread.py b/Lib/_dummy_thread.py
index 7aa6579..e10bae8 100644
--- a/Lib/_dummy_thread.py
+++ b/Lib/_dummy_thread.py
@@ -17,6 +17,10 @@
'interrupt_main', 'LockType']
import traceback as _traceback
+import time
+
+# A dummy value
+TIMEOUT_MAX = 2**31
class error(Exception):
"""Dummy implementation of _thread.error."""
@@ -92,7 +96,7 @@
def __init__(self):
self.locked_status = False
- def acquire(self, waitflag=None):
+ def acquire(self, waitflag=None, timeout=-1):
"""Dummy implementation of acquire().
For blocking calls, self.locked_status is automatically set to
@@ -111,6 +115,8 @@
self.locked_status = True
return True
else:
+ if timeout > 0:
+ time.sleep(timeout)
return False
__enter__ = acquire
diff --git a/Lib/multiprocessing/pool.py b/Lib/multiprocessing/pool.py
index d2b8dd1..7154d3c 100644
--- a/Lib/multiprocessing/pool.py
+++ b/Lib/multiprocessing/pool.py
@@ -440,10 +440,10 @@
p.terminate()
debug('joining task handler')
- task_handler.join(1e100)
+ task_handler.join()
debug('joining result handler')
- result_handler.join(1e100)
+ result_handler.join()
if pool and hasattr(pool[0], 'terminate'):
debug('joining pool workers')
diff --git a/Lib/test/lock_tests.py b/Lib/test/lock_tests.py
index 04f7422..74db3e4 100644
--- a/Lib/test/lock_tests.py
+++ b/Lib/test/lock_tests.py
@@ -4,7 +4,7 @@
import sys
import time
-from _thread import start_new_thread, get_ident
+from _thread import start_new_thread, get_ident, TIMEOUT_MAX
import threading
import unittest
@@ -62,6 +62,14 @@
support.threading_cleanup(*self._threads)
support.reap_children()
+ def assertTimeout(self, actual, expected):
+ # The waiting and/or time.time() can be imprecise, which
+ # is why comparing to the expected value would sometimes fail
+ # (especially under Windows).
+ self.assertGreaterEqual(actual, expected * 0.6)
+ # Test nothing insane happened
+ self.assertLess(actual, expected * 10.0)
+
class BaseLockTests(BaseTestCase):
"""
@@ -143,6 +151,32 @@
Bunch(f, 15).wait_for_finished()
self.assertEqual(n, len(threading.enumerate()))
+ def test_timeout(self):
+ lock = self.locktype()
+ # Can't set timeout if not blocking
+ self.assertRaises(ValueError, lock.acquire, 0, 1)
+ # Invalid timeout values
+ self.assertRaises(ValueError, lock.acquire, timeout=-100)
+ self.assertRaises(OverflowError, lock.acquire, timeout=1e100)
+ self.assertRaises(OverflowError, lock.acquire, timeout=TIMEOUT_MAX + 1)
+ # TIMEOUT_MAX is ok
+ lock.acquire(timeout=TIMEOUT_MAX)
+ lock.release()
+ t1 = time.time()
+ self.assertTrue(lock.acquire(timeout=5))
+ t2 = time.time()
+ # Just a sanity test that it didn't actually wait for the timeout.
+ self.assertLess(t2 - t1, 5)
+ results = []
+ def f():
+ t1 = time.time()
+ results.append(lock.acquire(timeout=0.5))
+ t2 = time.time()
+ results.append(t2 - t1)
+ Bunch(f, 1).wait_for_finished()
+ self.assertFalse(results[0])
+ self.assertTimeout(results[1], 0.5)
+
class LockTests(BaseLockTests):
"""
@@ -284,14 +318,14 @@
def f():
results1.append(evt.wait(0.0))
t1 = time.time()
- r = evt.wait(0.2)
+ r = evt.wait(0.5)
t2 = time.time()
results2.append((r, t2 - t1))
Bunch(f, N).wait_for_finished()
self.assertEqual(results1, [False] * N)
for r, dt in results2:
self.assertFalse(r)
- self.assertTrue(dt >= 0.2, dt)
+ self.assertTimeout(dt, 0.5)
# The event is set
results1 = []
results2 = []
@@ -397,14 +431,14 @@
def f():
cond.acquire()
t1 = time.time()
- cond.wait(0.2)
+ cond.wait(0.5)
t2 = time.time()
cond.release()
results.append(t2 - t1)
Bunch(f, N).wait_for_finished()
self.assertEqual(len(results), 5)
for dt in results:
- self.assertTrue(dt >= 0.2, dt)
+ self.assertTimeout(dt, 0.5)
class BaseSemaphoreTests(BaseTestCase):
diff --git a/Lib/threading.py b/Lib/threading.py
index 9f1525d..b4d07fe 100644
--- a/Lib/threading.py
+++ b/Lib/threading.py
@@ -31,6 +31,7 @@
_CRLock = _thread.RLock
except AttributeError:
_CRLock = None
+TIMEOUT_MAX = _thread.TIMEOUT_MAX
del _thread
@@ -107,14 +108,14 @@
return "<%s owner=%r count=%d>" % (
self.__class__.__name__, owner, self._count)
- def acquire(self, blocking=True):
+ def acquire(self, blocking=True, timeout=-1):
me = _get_ident()
if self._owner == me:
self._count = self._count + 1
if __debug__:
self._note("%s.acquire(%s): recursive success", self, blocking)
return 1
- rc = self._block.acquire(blocking)
+ rc = self._block.acquire(blocking, timeout)
if rc:
self._owner = me
self._count = 1
@@ -234,22 +235,10 @@
if __debug__:
self._note("%s.wait(): got it", self)
else:
- # Balancing act: We can't afford a pure busy loop, so we
- # have to sleep; but if we sleep the whole timeout time,
- # we'll be unresponsive. The scheme here sleeps very
- # little at first, longer as time goes on, but never longer
- # than 20 times per second (or the timeout time remaining).
- endtime = _time() + timeout
- delay = 0.0005 # 500 us -> initial delay of 1 ms
- while True:
- gotit = waiter.acquire(0)
- if gotit:
- break
- remaining = endtime - _time()
- if remaining <= 0:
- break
- delay = min(delay * 2, remaining, .05)
- _sleep(delay)
+ if timeout > 0:
+ gotit = waiter.acquire(True, timeout)
+ else:
+ gotit = waiter.acquire(False)
if not gotit:
if __debug__:
self._note("%s.wait(%s): timed out", self, timeout)