Issue #23051: multiprocessing.Pool methods imap() and imap_unordered() now
handle exceptions raised by an iterator.  Patch by Alon Diamant and Davin
Potts.
diff --git a/Lib/test/test_multiprocessing.py b/Lib/test/test_multiprocessing.py
index b1e75b5..681529f 100644
--- a/Lib/test/test_multiprocessing.py
+++ b/Lib/test/test_multiprocessing.py
@@ -1122,6 +1122,15 @@
 def sqr(x, wait=0.0):
     time.sleep(wait)
     return x*x
+
+class SayWhenError(ValueError): pass  # raised by exception_throwing_generator
+
+def exception_throwing_generator(total, when):
+    for i in range(total):  # yields 0..total-1 unless interrupted below
+        if i == when:  # fails mid-iteration, once 0..when-1 were yielded
+            raise SayWhenError("Somebody said when")
+        yield i
+
 class _TestPool(BaseTestCase):
 
     def test_apply(self):
@@ -1177,6 +1186,25 @@
             self.assertEqual(it.next(), i*i)
         self.assertRaises(StopIteration, it.next)
 
+    def test_imap_handle_iterable_exception(self):
+        if self.TYPE == 'manager':
+            self.skipTest('test not appropriate for {}'.format(self.TYPE))
+
+        it = self.pool.imap(sqr, exception_throwing_generator(10, 3), 1)
+        for i in range(3):
+            self.assertEqual(next(it), i*i)
+        self.assertRaises(SayWhenError, it.next)
+
+        # SayWhenError seen at start of problematic chunk's results
+        it = self.pool.imap(sqr, exception_throwing_generator(20, 7), 2)
+        for i in range(6):
+            self.assertEqual(next(it), i*i)
+        self.assertRaises(SayWhenError, it.next)
+        it = self.pool.imap(sqr, exception_throwing_generator(20, 7), 4)
+        for i in range(4):
+            self.assertEqual(next(it), i*i)
+        self.assertRaises(SayWhenError, it.next)
+
     def test_imap_unordered(self):
         it = self.pool.imap_unordered(sqr, range(1000))
         self.assertEqual(sorted(it), map(sqr, range(1000)))
@@ -1184,6 +1212,25 @@
         it = self.pool.imap_unordered(sqr, range(1000), chunksize=53)
         self.assertEqual(sorted(it), map(sqr, range(1000)))
 
+    def test_imap_unordered_handle_iterable_exception(self):
+        if self.TYPE == 'manager':
+            self.skipTest('test not appropriate for {}'.format(self.TYPE))
+
+        it = self.pool.imap_unordered(sqr,
+                                      exception_throwing_generator(10, 3),
+                                      1)
+        with self.assertRaises(SayWhenError):
+            # imap_unordered makes it difficult to anticipate the SayWhenError
+            for i in range(10):
+                self.assertEqual(next(it), i*i)
+
+        it = self.pool.imap_unordered(sqr,
+                                      exception_throwing_generator(20, 7),
+                                      2)
+        with self.assertRaises(SayWhenError):
+            for i in range(20):
+                self.assertEqual(next(it), i*i)
+
     def test_make_pool(self):
         self.assertRaises(ValueError, multiprocessing.Pool, -1)
         self.assertRaises(ValueError, multiprocessing.Pool, 0)