bpo-5001: More-informative multiprocessing error messages (#3079)
* Make error message more informative
Replace assertions in error-reporting code with more-informative version that doesn't cause confusion over where and what the error is.
* Additional clarification + get travis to check
* Change from SystemError to TypeError
As suggested in PR comment by @pitrou, changing from SystemError; TypeError appears appropriate.
* NEWS file installation; ACKS addition (will do my best to justify it by additional work)
* Making current AssertionErrors in multiprocessing more informative
* Blurb added re multiprocessing managers.py, queues.py cleanup
* Further multiprocessing cleanup - went through pool.py
* Fix two asserts in multiprocessing/util.py
* Most asserts in multiprocessing more informative
* Didn't save right version
* Further work on multiprocessing error messages
* Correct typo
* Correct typo v2
* Blasted colon... serves me right for trying to work on two things at once
* Simplify NEWS entry
* Update 2017-08-18-17-16-38.bpo-5001.gwnthq.rst
* Update 2017-08-18-17-16-38.bpo-5001.gwnthq.rst
OK, never mind.
* Corrected (thanks to pitrou) error messages for notify
* Remove extraneous backslash in docstring.
diff --git a/Lib/multiprocessing/pool.py b/Lib/multiprocessing/pool.py
index c2364ab..e457f0a 100644
--- a/Lib/multiprocessing/pool.py
+++ b/Lib/multiprocessing/pool.py
@@ -92,7 +92,9 @@
def worker(inqueue, outqueue, initializer=None, initargs=(), maxtasks=None,
wrap_exception=False):
- assert maxtasks is None or (type(maxtasks) == int and maxtasks > 0)
+ if (maxtasks is not None) and not (isinstance(maxtasks, int)
+ and maxtasks >= 1):
+ raise AssertionError("Maxtasks {!r} is not valid".format(maxtasks))
put = outqueue.put
get = inqueue.get
if hasattr(inqueue, '_writer'):
@@ -254,8 +256,8 @@
def apply(self, func, args=(), kwds={}):
'''
Equivalent of `func(*args, **kwds)`.
+ Pool must be running.
'''
- assert self._state == RUN
return self.apply_async(func, args, kwds).get()
def map(self, func, iterable, chunksize=None):
@@ -307,6 +309,10 @@
))
return result
else:
+ if chunksize < 1:
+ raise ValueError(
+ "Chunksize must be 1+, not {0:n}".format(
+ chunksize))
assert chunksize > 1
task_batches = Pool._get_tasks(func, iterable, chunksize)
result = IMapIterator(self._cache)
@@ -334,7 +340,9 @@
))
return result
else:
- assert chunksize > 1
+ if chunksize < 1:
+ raise ValueError(
+ "Chunksize must be 1+, not {0!r}".format(chunksize))
task_batches = Pool._get_tasks(func, iterable, chunksize)
result = IMapUnorderedIterator(self._cache)
self._taskqueue.put(
@@ -466,7 +474,7 @@
return
if thread._state:
- assert thread._state == TERMINATE
+ assert thread._state == TERMINATE, "Thread not in TERMINATE"
util.debug('result handler found thread._state=TERMINATE')
break
@@ -542,7 +550,10 @@
def join(self):
util.debug('joining pool')
- assert self._state in (CLOSE, TERMINATE)
+ if self._state == RUN:
+ raise ValueError("Pool is still running")
+ elif self._state not in (CLOSE, TERMINATE):
+ raise ValueError("In unknown state")
self._worker_handler.join()
self._task_handler.join()
self._result_handler.join()
@@ -570,7 +581,9 @@
util.debug('helping task handler/workers to finish')
cls._help_stuff_finish(inqueue, task_handler, len(pool))
- assert result_handler.is_alive() or len(cache) == 0
+ if (not result_handler.is_alive()) and (len(cache) != 0):
+ raise AssertionError(
+ "Cannot have cache with result_hander not alive")
result_handler._state = TERMINATE
outqueue.put(None) # sentinel
@@ -628,7 +641,8 @@
return self._event.is_set()
def successful(self):
- assert self.ready()
+ if not self.ready():
+ raise ValueError("{0!r} not ready".format(self))
return self._success
def wait(self, timeout=None):