Issue #14087: multiprocessing: add Condition.wait_for(). Patch by sbt.
diff --git a/Lib/test/test_multiprocessing.py b/Lib/test/test_multiprocessing.py
index 2bcdb4e..bbde366 100644
--- a/Lib/test/test_multiprocessing.py
+++ b/Lib/test/test_multiprocessing.py
@@ -887,6 +887,73 @@
         self.assertEqual(res, False)
         self.assertTimingAlmostEqual(wait.elapsed, TIMEOUT1)
 
+    @classmethod
+    def _test_waitfor_f(cls, cond, state):
+        with cond:
+            state.value = 0
+            cond.notify()
+            result = cond.wait_for(lambda : state.value==4)
+            if not result or state.value != 4:
+                sys.exit(1)
+
+    @unittest.skipUnless(HAS_SHAREDCTYPES, 'needs sharedctypes')
+    def test_waitfor(self):
+        # based on test in test/lock_tests.py
+        cond = self.Condition()
+        state = self.Value('i', -1)
+
+        p = self.Process(target=self._test_waitfor_f, args=(cond, state))
+        p.daemon = True
+        p.start()
+
+        with cond:
+            result = cond.wait_for(lambda : state.value==0)
+            self.assertTrue(result)
+            self.assertEqual(state.value, 0)
+
+        for i in range(4):
+            time.sleep(0.01)
+            with cond:
+                state.value += 1
+                cond.notify()
+
+        p.join(5)
+        self.assertFalse(p.is_alive())
+        self.assertEqual(p.exitcode, 0)
+
+    @classmethod
+    def _test_waitfor_timeout_f(cls, cond, state, success):
+        with cond:
+            expected = 0.1
+            dt = time.time()
+            result = cond.wait_for(lambda : state.value==4, timeout=expected)
+            dt = time.time() - dt
+            # borrow logic in assertTimeout() from test/lock_tests.py
+            if not result and expected * 0.6 < dt < expected * 10.0:
+                success.value = True
+
+    @unittest.skipUnless(HAS_SHAREDCTYPES, 'needs sharedctypes')
+    def test_waitfor_timeout(self):
+        # based on test in test/lock_tests.py
+        cond = self.Condition()
+        state = self.Value('i', 0)
+        success = self.Value('i', False)
+
+        p = self.Process(target=self._test_waitfor_timeout_f,
+                         args=(cond, state, success))
+        p.daemon = True
+        p.start()
+
+        # Only increment 3 times, so state == 4 is never reached.
+        for i in range(3):
+            time.sleep(0.01)
+            with cond:
+                state.value += 1
+                cond.notify()
+
+        p.join(5)
+        self.assertTrue(success.value)
+
 
 class _TestEvent(BaseTestCase):