Issue #12328: Fix multiprocessing's use of overlapped I/O on Windows.
Also, add a multiprocessing.connection.wait(rlist, timeout=None) function
for polling multiple objects at once. Patch by sbt.
Complete changelist from sbt's patch:
* Adds a wait(rlist, timeout=None) function for polling multiple
objects at once. On Unix this is just a wrapper for
select(rlist, [], [], timeout).
* Removes use of the SentinelReady exception and the sentinels argument
to certain methods. concurrent.futures.process has been changed to
use wait() instead of SentinelReady.
* Fixes bugs concerning PipeConnection.poll() and messages of zero
length.
* Fixes PipeListener.accept() to call ConnectNamedPipe() with
overlapped=True.
* Fixes Queue.empty() and SimpleQueue.empty() so that they are
threadsafe on Windows.
* Now PipeConnection.poll() and wait() will not modify the pipe except
possibly by consuming a zero-length message. (Previously poll()
could consume a partial message.)
* All of multiprocessing's pipe-related blocking functions/methods are
now interruptible by SIGINT on Windows.
diff --git a/Lib/concurrent/futures/process.py b/Lib/concurrent/futures/process.py
index 7f31ec2..04238a7 100644
--- a/Lib/concurrent/futures/process.py
+++ b/Lib/concurrent/futures/process.py
@@ -50,7 +50,8 @@
from concurrent.futures import _base
import queue
import multiprocessing
-from multiprocessing.queues import SimpleQueue, SentinelReady, Full
+from multiprocessing.queues import SimpleQueue, Full
+from multiprocessing.connection import wait
import threading
import weakref
@@ -212,6 +213,8 @@
for p in processes.values():
p.join()
+ reader = result_queue._reader
+
while True:
_add_call_item_to_queue(pending_work_items,
work_ids_queue,
@@ -219,9 +222,10 @@
sentinels = [p.sentinel for p in processes.values()]
assert sentinels
- try:
- result_item = result_queue.get(sentinels=sentinels)
- except SentinelReady:
+ ready = wait([reader] + sentinels)
+ if reader in ready:
+ result_item = reader.recv()
+ else:
# Mark the process pool broken so that submits fail right now.
executor = executor_reference()
if executor is not None: