Issue #12328: Fix multiprocessing's use of overlapped I/O on Windows.
Also, add a multiprocessing.connection.wait(rlist, timeout=None) function
for polling multiple objects at once. Patch by sbt.
Complete changelist from sbt's patch:
* Adds a wait(rlist, timeout=None) function for polling multiple
objects at once. On Unix this is just a wrapper for
select(rlist, [], [], timeout).
* Removes use of the SentinelReady exception and the sentinels argument
to certain methods. concurrent.futures.process has been changed to
use wait() instead of SentinelReady.
* Fixes bugs concerning PipeConnection.poll() and messages of zero
length.
* Fixes PipeListener.accept() to call ConnectNamedPipe() with
overlapped=True.
* Fixes Queue.empty() and SimpleQueue.empty() so that they are
threadsafe on Windows.
* Now PipeConnection.poll() and wait() will not modify the pipe except
possibly by consuming a zero length message. (Previously poll()
could consume a partial message.)
* All of multiprocessing's pipe-related blocking functions/methods are
now interruptible by SIGINT on Windows.
diff --git a/Lib/multiprocessing/queues.py b/Lib/multiprocessing/queues.py
index c4f9cda..262fd85 100644
--- a/Lib/multiprocessing/queues.py
+++ b/Lib/multiprocessing/queues.py
@@ -44,7 +44,7 @@
from queue import Empty, Full
import _multiprocessing
-from multiprocessing.connection import Pipe, SentinelReady
+from multiprocessing.connection import Pipe
from multiprocessing.synchronize import Lock, BoundedSemaphore, Semaphore, Condition
from multiprocessing.util import debug, info, Finalize, register_after_fork
from multiprocessing.forking import assert_spawning
@@ -360,6 +360,7 @@
def __init__(self):
self._reader, self._writer = Pipe(duplex=False)
self._rlock = Lock()
+ self._poll = self._reader.poll
if sys.platform == 'win32':
self._wlock = None
else:
@@ -367,7 +368,7 @@
self._make_methods()
def empty(self):
- return not self._reader.poll()
+ return not self._poll()
def __getstate__(self):
assert_spawning(self)
@@ -380,10 +381,10 @@
def _make_methods(self):
recv = self._reader.recv
racquire, rrelease = self._rlock.acquire, self._rlock.release
- def get(*, sentinels=None):
+ def get():
racquire()
try:
- return recv(sentinels)
+ return recv()
finally:
rrelease()
self.get = get