Issue #14087: multiprocessing: add Condition.wait_for(). Patch by sbt.
diff --git a/Lib/multiprocessing/managers.py b/Lib/multiprocessing/managers.py
index eaf912c..d1c9d45 100644
--- a/Lib/multiprocessing/managers.py
+++ b/Lib/multiprocessing/managers.py
@@ -48,6 +48,7 @@
 from multiprocessing import Process, current_process, active_children, Pool, util, connection
 from multiprocessing.process import AuthenticationString
 from multiprocessing.forking import exit, Popen, ForkingPickler
+from time import time as _time
 
 #
 # Register some things for pickling
@@ -996,6 +997,24 @@
         return self._callmethod('notify')
     def notify_all(self):
         return self._callmethod('notify_all')
+    def wait_for(self, predicate, timeout=None):
+        """Wait until predicate() returns a true value or timeout expires.
+
+        Return the last value of predicate(); it is false after a timeout.
+        """
+        outcome = predicate()
+        if outcome:
+            return outcome
+        # No timeout means no deadline; wait() is then called with None.
+        deadline = None if timeout is None else _time() + timeout
+        while not outcome:
+            remaining = None if deadline is None else deadline - _time()
+            if remaining is not None and remaining <= 0:
+                break
+            self.wait(remaining)
+            outcome = predicate()
+        return outcome
+
 
 class EventProxy(BaseProxy):
     _exposed_ = ('is_set', 'set', 'clear', 'wait')
diff --git a/Lib/multiprocessing/synchronize.py b/Lib/multiprocessing/synchronize.py
index e35bbff..532ac5c 100644
--- a/Lib/multiprocessing/synchronize.py
+++ b/Lib/multiprocessing/synchronize.py
@@ -43,6 +43,7 @@
 from multiprocessing.process import current_process
 from multiprocessing.util import register_after_fork, debug
 from multiprocessing.forking import assert_spawning, Popen
+from time import time as _time
 
 # Try to import the mp.synchronize module cleanly, if it fails
 # raise ImportError for platforms lacking a working sem_open implementation.
@@ -290,6 +291,24 @@
             while self._wait_semaphore.acquire(False):
                 pass
 
+    def wait_for(self, predicate, timeout=None):
+        """Wait until predicate() returns a true value or timeout expires.
+
+        Return the last value of predicate(); it is false after a timeout.
+        """
+        outcome = predicate()
+        if outcome:
+            return outcome
+        # No timeout means no deadline; wait() is then called with None.
+        deadline = None if timeout is None else _time() + timeout
+        while not outcome:
+            remaining = None if deadline is None else deadline - _time()
+            if remaining is not None and remaining <= 0:
+                break
+            self.wait(remaining)
+            outcome = predicate()
+        return outcome
+
 #
 # Event
 #
diff --git a/Lib/test/test_multiprocessing.py b/Lib/test/test_multiprocessing.py
index 2bcdb4e..bbde366 100644
--- a/Lib/test/test_multiprocessing.py
+++ b/Lib/test/test_multiprocessing.py
@@ -887,6 +887,73 @@
         self.assertEqual(res, False)
         self.assertTimingAlmostEqual(wait.elapsed, TIMEOUT1)
 
+    @classmethod
+    def _test_waitfor_f(cls, cond, state):
+        # Child: publish state 0, then exit with status 1 unless state hits 4.
+        with cond:
+            state.value = 0
+            cond.notify()
+            if not cond.wait_for(lambda: state.value == 4) or state.value != 4:
+                sys.exit(1)
+
+    @unittest.skipUnless(HAS_SHAREDCTYPES, 'needs sharedctypes')
+    def test_waitfor(self):
+        # based on test in test/lock_tests.py
+        cond = self.Condition()
+        state = self.Value('i', -1)
+
+        child = self.Process(target=self._test_waitfor_f, args=(cond, state))
+        child.daemon = True
+        child.start()
+        # The child sets state to 0 and notifies; block until that is seen.
+        with cond:
+            self.assertTrue(cond.wait_for(lambda: state.value == 0))
+            self.assertEqual(state.value, 0)
+        # Step state up to 4, notifying the child after every increment.
+        for _ in range(4):
+            time.sleep(0.01)
+            with cond:
+                state.value += 1
+                cond.notify()
+
+        # The child exits with status 1 if its own wait_for() check failed.
+        child.join(5)
+        self.assertFalse(child.is_alive())
+        self.assertEqual(child.exitcode, 0)
+
+    @classmethod
+    def _test_waitfor_timeout_f(cls, cond, state, success):
+        with cond:
+            expected = 0.1  # timeout handed to wait_for()
+            started = time.time()
+            met = cond.wait_for(lambda: state.value == 4, timeout=expected)
+            elapsed = time.time() - started
+            # same bounds as assertTimeout() in test/lock_tests.py
+            if not met and expected * 0.6 < elapsed < expected * 10.0:
+                success.value = True
+
+    @unittest.skipUnless(HAS_SHAREDCTYPES, 'needs sharedctypes')
+    def test_waitfor_timeout(self):
+        # based on test in test/lock_tests.py
+        cond = self.Condition()
+        state = self.Value('i', 0)
+        success = self.Value('i', False)
+        # The child sets success only if its wait_for() times out on schedule.
+        child = self.Process(
+            target=self._test_waitfor_timeout_f, args=(cond, state, success))
+        child.daemon = True
+        child.start()
+
+        # Only increment 3 times, so state == 4 is never reached.
+        for _ in range(3):
+            time.sleep(0.01)
+            with cond:
+                state.value += 1
+                cond.notify()
+
+        child.join(5)
+        self.assertTrue(success.value)
+
 
 class _TestEvent(BaseTestCase):