Issue #9244: multiprocessing.pool: Worker crashes if result can't be encoded
diff --git a/Lib/test/test_multiprocessing.py b/Lib/test/test_multiprocessing.py
index 0b3f937..bb0f06a 100644
--- a/Lib/test/test_multiprocessing.py
+++ b/Lib/test/test_multiprocessing.py
@@ -1011,6 +1011,7 @@
 def sqr(x, wait=0.0):
     time.sleep(wait)
     return x*x
+
 class _TestPool(BaseTestCase):
 
     def test_apply(self):
@@ -1087,9 +1088,64 @@
         join()
         self.assertTrue(join.elapsed < 0.2)
 
-class _TestPoolWorkerLifetime(BaseTestCase):
+def raising():
+    """Raise KeyError; used as a task that fails inside a pool worker."""
+    raise KeyError("key")
 
+def unpickleable_result():
+    """Return a lambda, a value the pickle module cannot serialize."""
+    return lambda: 42
+
+# Checks that errors raised while running a task, or while encoding its
+# result, are reported back to the caller.
+class _TestPoolWorkerErrors(BaseTestCase):
     ALLOWED_TYPES = ('processes', )
+
+    def test_async_error_callback(self):
+        # An exception raised by the task must reach error_callback and
+        # be re-raised by res.get().
+        p = multiprocessing.Pool(2)
+        try:
+            scratchpad = [None]
+            def errback(exc):
+                scratchpad[0] = exc
+
+            res = p.apply_async(raising, error_callback=errback)
+            self.assertRaises(KeyError, res.get)
+            self.assertIsInstance(scratchpad[0], KeyError)
+        finally:
+            # Shut the workers down even if an assertion above failed.
+            p.close()
+            p.join()
+
+    def test_unpickleable_result(self):
+        from multiprocessing.pool import MaybeEncodingError
+        p = multiprocessing.Pool(2)
+
+        scratchpad = [None]
+        def errback(exc):
+            scratchpad[0] = exc
+
+        try:
+            # Make sure we don't lose pool processes because of encoding
+            # errors.
+            for iteration in range(20):
+                scratchpad[0] = None
+                res = p.apply_async(unpickleable_result,
+                                    error_callback=errback)
+                self.assertRaises(MaybeEncodingError, res.get)
+                wrapped = scratchpad[0]
+                self.assertIsInstance(wrapped, MaybeEncodingError)
+                self.assertIsNotNone(wrapped.exc)
+                self.assertIsNotNone(wrapped.value)
+        finally:
+            # Shut the workers down even if an assertion above failed.
+            p.close()
+            p.join()
+
+class _TestPoolWorkerLifetime(BaseTestCase):
+    ALLOWED_TYPES = ('processes', )
+
     def test_pool_worker_lifetime(self):
         p = multiprocessing.Pool(3, maxtasksperchild=10)
         self.assertEqual(3, len(p._pool))