bpo-5001: More-informative multiprocessing error messages (#3079)

* Make error message more informative

Replace assertions in error-reporting code with more informative versions that don't cause confusion over where and what the error is.
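
As an illustration, the two helper names below are invented for this sketch, but their bodies restate the worker() change from the pool.py diff further down: the bare assert reports nothing about the bad value, while the explicit check names it in the message.

    def _check_maxtasks_old(maxtasks):
        # Before: the failure reports nothing about what maxtasks actually was
        assert maxtasks is None or (type(maxtasks) == int and maxtasks > 0)

    def _check_maxtasks_new(maxtasks):
        # After: the exception message carries the offending value
        if (maxtasks is not None) and not (isinstance(maxtasks, int)
                                           and maxtasks >= 1):
            raise AssertionError("Maxtasks {!r} is not valid".format(maxtasks))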

* Additional clarification + get travis to check

* Change from SystemError to TypeError

As suggested in a PR comment by @pitrou, change from SystemError to TypeError, which appears more appropriate.
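
A purely hypothetical sketch of the kind of check this refers to; the function name and message are illustrative only and are not taken from the actual multiprocessing sources:

    # Hypothetical example only -- not the actual multiprocessing code.
    def _check_method_name(methodname):
        if not isinstance(methodname, str):
            # Previously surfaced as SystemError; TypeError better matches
            # a caller supplying the wrong kind of value.
            raise TypeError(
                "Method name {!r} is not a string".format(methodname))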

* NEWS entry added; ACKS addition (will do my best to justify it with additional work)

* Making current AssertionErrors in multiprocessing more informative

* Blurb added re multiprocessing managers.py, queues.py cleanup

* Further multiprocessing cleanup - went through pool.py

* Fix two asserts in multiprocessing/util.py

* Most asserts in multiprocessing more informative

* Didn't save right version

* Further work on multiprocessing error messages

* Correct typo

* Correct typo v2

* Blasted colon... serves me right for trying to work on two things at once

* Simplify NEWS entry

* Update 2017-08-18-17-16-38.bpo-5001.gwnthq.rst

* Update 2017-08-18-17-16-38.bpo-5001.gwnthq.rst

OK, never mind.

* Corrected (thanks to pitrou) error messages for notify

* Remove extraneous backslash in docstring.
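
For reference, a small usage sketch of the user-visible effect of the Pool.join() hunk in the diff below (behaviour inferred from that hunk, not taken from the test suite):

    import multiprocessing

    if __name__ == '__main__':
        with multiprocessing.Pool(2) as pool:
            try:
                pool.join()        # pool is still in the RUN state
            except ValueError as exc:
                print(exc)         # now reports "Pool is still running"
            pool.close()
            pool.join()            # fine once the pool has been closed
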
diff --git a/Lib/multiprocessing/pool.py b/Lib/multiprocessing/pool.py
index c2364ab..e457f0a 100644
--- a/Lib/multiprocessing/pool.py
+++ b/Lib/multiprocessing/pool.py
@@ -92,7 +92,9 @@
 
 def worker(inqueue, outqueue, initializer=None, initargs=(), maxtasks=None,
            wrap_exception=False):
-    assert maxtasks is None or (type(maxtasks) == int and maxtasks > 0)
+    if (maxtasks is not None) and not (isinstance(maxtasks, int)
+                                       and maxtasks >= 1):
+        raise AssertionError("Maxtasks {!r} is not valid".format(maxtasks))
     put = outqueue.put
     get = inqueue.get
     if hasattr(inqueue, '_writer'):
@@ -254,8 +256,8 @@
     def apply(self, func, args=(), kwds={}):
         '''
         Equivalent of `func(*args, **kwds)`.
+        Pool must be running.
         '''
-        assert self._state == RUN
         return self.apply_async(func, args, kwds).get()
 
     def map(self, func, iterable, chunksize=None):
@@ -307,7 +309,10 @@
                 ))
             return result
         else:
-            assert chunksize > 1
+            if chunksize < 1:
+                raise ValueError(
+                    "Chunksize must be 1+, not {0:n}".format(
+                        chunksize))
             task_batches = Pool._get_tasks(func, iterable, chunksize)
             result = IMapIterator(self._cache)
             self._taskqueue.put(
@@ -334,7 +340,9 @@
                 ))
             return result
         else:
-            assert chunksize > 1
+            if chunksize < 1:
+                raise ValueError(
+                    "Chunksize must be 1+, not {0!r}".format(chunksize))
             task_batches = Pool._get_tasks(func, iterable, chunksize)
             result = IMapUnorderedIterator(self._cache)
             self._taskqueue.put(
@@ -466,7 +474,7 @@
                 return
 
             if thread._state:
-                assert thread._state == TERMINATE
+                assert thread._state == TERMINATE, "Thread not in TERMINATE"
                 util.debug('result handler found thread._state=TERMINATE')
                 break
 
@@ -542,7 +550,10 @@
 
     def join(self):
         util.debug('joining pool')
-        assert self._state in (CLOSE, TERMINATE)
+        if self._state == RUN:
+            raise ValueError("Pool is still running")
+        elif self._state not in (CLOSE, TERMINATE):
+            raise ValueError("In unknown state")
         self._worker_handler.join()
         self._task_handler.join()
         self._result_handler.join()
@@ -570,7 +581,9 @@
         util.debug('helping task handler/workers to finish')
         cls._help_stuff_finish(inqueue, task_handler, len(pool))
 
-        assert result_handler.is_alive() or len(cache) == 0
+        if (not result_handler.is_alive()) and (len(cache) != 0):
+            raise AssertionError(
+                "Cannot have cache with result_hander not alive")
 
         result_handler._state = TERMINATE
         outqueue.put(None)                  # sentinel
@@ -628,7 +641,8 @@
         return self._event.is_set()
 
     def successful(self):
-        assert self.ready()
+        if not self.ready():
+            raise ValueError("{0!r} not ready".format(self))
         return self._success
 
     def wait(self, timeout=None):