Issue 10260
Adding the wait_for() method to threading.Condition
diff --git a/Doc/library/threading.rst b/Doc/library/threading.rst
index d3d7d9e..11aa4c4 100644
--- a/Doc/library/threading.rst
+++ b/Doc/library/threading.rst
@@ -539,6 +539,13 @@
 in a typical producer-consumer situation, adding one item to the buffer only
 needs to wake up one consumer thread.
 
+Note:  Condition variables can be, depending on the implementation, subject
+to both spurious wakeups (when :meth:`wait` returns without a :meth:`notify`
+call) and stolen wakeups (when another thread acquires the lock before the
+awoken thread.)  For this reason, it is always necessary to verify the state
+the thread is waiting for when :meth:`wait` returns and optionally repeat
+the call as often as necessary.
+
 
 .. class:: Condition(lock=None)
 
@@ -585,6 +592,35 @@
       .. versionchanged:: 3.2
          Previously, the method always returned ``None``.
 
+   .. method:: wait_for(predicate, timeout=None)
+
+      Wait until a condition evaluates to True.  *predicate* should be a
+      callable which result will be interpreted as a boolean value.
+      A *timeout* may be provided giving the maximum time to wait.
+
+      This utility method may call :meth:`wait` repeatedly until the predicate
+      is satisfied, or until a timeout occurs. The return value is
+      the last return value of the predicate and will evaluate to
+      ``False`` if the method timed out.
+
+      Ignoring the timeout feature, calling this method is roughly equivalent to
+      writing::
+
+        while not predicate():
+            cv.wait()
+
+      Therefore, the same rules apply as with :meth:`wait`: The lock must be
+      held when called and is re-aquired on return.  The predicate is evaluated
+      with the lock held.
+
+      Using this method, the consumer example above can be written thus::
+
+         with cv:
+             cv.wait_for(an_item_is_available)
+             get_an_available_item()
+
+      .. versionadded:: 3.2
+
    .. method:: notify()
 
       Wake up a thread waiting on this condition, if any.  If the calling thread
diff --git a/Lib/test/lock_tests.py b/Lib/test/lock_tests.py
index d956bb6..b6d818e 100644
--- a/Lib/test/lock_tests.py
+++ b/Lib/test/lock_tests.py
@@ -446,6 +446,46 @@
             # In practice, this implementation has no spurious wakeups.
             self.assertFalse(result)
 
+    def test_waitfor(self):
+        cond = self.condtype()
+        state = 0
+        def f():
+            with cond:
+                result = cond.wait_for(lambda : state==4)
+                self.assertTrue(result)
+                self.assertEqual(state, 4)
+        b = Bunch(f, 1)
+        b.wait_for_started()
+        for i in range(5):
+            time.sleep(0.01)
+            with cond:
+                state += 1
+                cond.notify()
+        b.wait_for_finished()
+
+    def test_waitfor_timeout(self):
+        cond = self.condtype()
+        state = 0
+        success = []
+        def f():
+            with cond:
+                dt = time.time()
+                result = cond.wait_for(lambda : state==4, timeout=0.1)
+                dt = time.time() - dt
+                self.assertFalse(result)
+                self.assertTimeout(dt, 0.1)
+                success.append(None)
+        b = Bunch(f, 1)
+        b.wait_for_started()
+        # Only increment 3 times, so state == 4 is never reached.
+        for i in range(3):
+            time.sleep(0.01)
+            with cond:
+                state += 1
+                cond.notify()
+        b.wait_for_finished()
+        self.assertEqual(len(success), 1)
+
 
 class BaseSemaphoreTests(BaseTestCase):
     """
diff --git a/Lib/threading.py b/Lib/threading.py
index 41956ed..b6c1e5d 100644
--- a/Lib/threading.py
+++ b/Lib/threading.py
@@ -254,6 +254,32 @@
         finally:
             self._acquire_restore(saved_state)
 
+    def wait_for(self, predicate, timeout=None):
+        endtime = None
+        waittime = timeout
+        result = predicate()
+        while not result:
+            if waittime is not None:
+                if endtime is None:
+                    endtime = _time() + waittime
+                else:
+                    waittime = endtime - _time()
+                    if waittime <= 0:
+                        if __debug__:
+                            self._note("%s.wait_for(%r, %r): Timed out.",
+                                       self, predicate, timeout)
+                        break
+            if __debug__:
+                self._note("%s.wait_for(%r, %r): Waiting with timeout=%s.",
+                           self, predicate, timeout, waittime)
+            self.wait(waittime)
+            result = predicate()
+        else:
+            if __debug__:
+                self._note("%s.wait_for(%r, %r): Success.",
+                           self, predicate, timeout)
+        return result
+
     def notify(self, n=1):
         if not self._is_owned():
             raise RuntimeError("cannot notify on un-acquired lock")
@@ -482,13 +508,12 @@
     # Wait in the barrier until we are relased.  Raise an exception
     # if the barrier is reset or broken.
     def _wait(self, timeout):
-        while self._state == 0:
-            if self._cond.wait(timeout) is False:
-                #timed out.  Break the barrier
-                self._break()
-                raise BrokenBarrierError
-            if self._state < 0:
-                raise BrokenBarrierError
+        if not self._cond.wait_for(lambda : self._state != 0, timeout):
+            #timed out.  Break the barrier
+            self._break()
+            raise BrokenBarrierError
+        if self._state < 0:
+            raise BrokenBarrierError
         assert self._state == 1
 
     # If we are the last thread to exit the barrier, signal any threads