Issue #19425 -- a pickling error should not cause pool to hang.
diff --git a/Lib/multiprocessing/pool.py b/Lib/multiprocessing/pool.py
index 4d18600..04531b9 100644
--- a/Lib/multiprocessing/pool.py
+++ b/Lib/multiprocessing/pool.py
@@ -169,7 +169,8 @@
 
         self._task_handler = threading.Thread(
             target=Pool._handle_tasks,
-            args=(self._taskqueue, self._quick_put, self._outqueue, self._pool)
+            args=(self._taskqueue, self._quick_put, self._outqueue,
+                  self._pool, self._cache)
             )
         self._task_handler.daemon = True
         self._task_handler._state = RUN
@@ -329,7 +330,7 @@
         debug('worker handler exiting')
 
     @staticmethod
-    def _handle_tasks(taskqueue, put, outqueue, pool):
+    def _handle_tasks(taskqueue, put, outqueue, pool, cache):
         thread = threading.current_thread()
 
         for taskseq, set_length in iter(taskqueue.get, None):
@@ -340,9 +341,12 @@
                     break
                 try:
                     put(task)
-                except IOError:
-                    debug('could not put task on queue')
-                    break
+                except Exception as e:
+                    job, ind = task[:2]
+                    try:
+                        cache[job]._set(ind, (False, e))
+                    except KeyError:
+                        pass
             else:
                 if set_length:
                     debug('doing set_length()')
diff --git a/Lib/test/test_multiprocessing.py b/Lib/test/test_multiprocessing.py
index 579229a..a8928ee 100644
--- a/Lib/test/test_multiprocessing.py
+++ b/Lib/test/test_multiprocessing.py
@@ -1117,6 +1117,16 @@
         self.assertEqual(pmap(sqr, range(100), chunksize=20),
                          map(sqr, range(100)))
 
+    def test_map_unplicklable(self):
+        # Issue #19425 -- failure to pickle should not cause a hang
+        if self.TYPE == 'threads':
+            return
+        class A(object):
+            def __reduce__(self):
+                raise RuntimeError('cannot pickle')
+        with self.assertRaises(RuntimeError):
+            self.pool.map(sqr, [A()]*10)
+
     def test_map_chunksize(self):
         try:
             self.pool.map_async(sqr, [], chunksize=1).get(timeout=TIMEOUT1)