Issue #9205: concurrent.futures.ProcessPoolExecutor now detects when a child
process has been killed and raises BrokenProcessPool in that situation.
Previously it would reliably freeze/deadlock.
diff --git a/Lib/multiprocessing/connection.py b/Lib/multiprocessing/connection.py
index 415e210..ede2908 100644
--- a/Lib/multiprocessing/connection.py
+++ b/Lib/multiprocessing/connection.py
@@ -48,14 +48,18 @@
 
 import _multiprocessing
 from multiprocessing import current_process, AuthenticationError, BufferTooShort
-from multiprocessing.util import get_temp_dir, Finalize, sub_debug, debug
+from multiprocessing.util import (
+    get_temp_dir, Finalize, sub_debug, debug, _eintr_retry)
 try:
     from _multiprocessing import win32
+    from _subprocess import WAIT_OBJECT_0, WAIT_TIMEOUT, INFINITE
 except ImportError:
     if sys.platform == 'win32':
         raise
     win32 = None
 
+_select = _eintr_retry(select.select)
+
 #
 #
 #
@@ -118,6 +122,15 @@
     else:
         raise ValueError('address type of %r unrecognized' % address)
 
+
+class SentinelReady(Exception):
+    """
+    Raised when a sentinel becomes ready while polling.
+    """
+    def __init__(self, *args):
+        Exception.__init__(self, *args)
+        self.sentinels = args[0]
+
 #
 # Connection classes
 #
@@ -253,19 +266,17 @@
                               (offset + size) // itemsize])
             return size
 
-    def recv(self):
+    def recv(self, sentinels=None):
         """Receive a (picklable) object"""
         self._check_closed()
         self._check_readable()
-        buf = self._recv_bytes()
+        buf = self._recv_bytes(sentinels=sentinels)
         return pickle.loads(buf.getbuffer())
 
     def poll(self, timeout=0.0):
         """Whether there is any input available to be read"""
         self._check_closed()
         self._check_readable()
-        if timeout < 0.0:
-            timeout = None
         return self._poll(timeout)
 
 
@@ -274,61 +285,88 @@
     class PipeConnection(_ConnectionBase):
         """
         Connection class based on a Windows named pipe.
+        Overlapped I/O is used, so the handles must have been created
+        with FILE_FLAG_OVERLAPPED.
         """
+        _buffered = b''
 
         def _close(self):
             win32.CloseHandle(self._handle)
 
         def _send_bytes(self, buf):
-            nwritten = win32.WriteFile(self._handle, buf)
+            overlapped = win32.WriteFile(self._handle, buf, overlapped=True)
+            nwritten, complete = overlapped.GetOverlappedResult(True)
+            assert complete
             assert nwritten == len(buf)
 
-        def _recv_bytes(self, maxsize=None):
+        def _recv_bytes(self, maxsize=None, sentinels=()):
+            if sentinels:
+                self._poll(-1.0, sentinels)
             buf = io.BytesIO()
-            bufsize = 512
-            if maxsize is not None:
-                bufsize = min(bufsize, maxsize)
-            try:
-                firstchunk, complete = win32.ReadFile(self._handle, bufsize)
-            except IOError as e:
-                if e.errno == win32.ERROR_BROKEN_PIPE:
-                    raise EOFError
-                raise
-            lenfirstchunk = len(firstchunk)
-            buf.write(firstchunk)
-            if complete:
-                return buf
+            firstchunk = self._buffered
+            if firstchunk:
+                lenfirstchunk = len(firstchunk)
+                buf.write(firstchunk)
+                self._buffered = b''
+            else:
+                # A reasonable size for the first chunk transfer
+                bufsize = 128
+                if maxsize is not None and maxsize < bufsize:
+                    bufsize = maxsize
+                try:
+                    overlapped = win32.ReadFile(self._handle, bufsize, overlapped=True)
+                    lenfirstchunk, complete = overlapped.GetOverlappedResult(True)
+                    firstchunk = overlapped.getbuffer()
+                    assert lenfirstchunk == len(firstchunk)
+                except IOError as e:
+                    if e.errno == win32.ERROR_BROKEN_PIPE:
+                        raise EOFError
+                    raise
+                buf.write(firstchunk)
+                if complete:
+                    return buf
             navail, nleft = win32.PeekNamedPipe(self._handle)
             if maxsize is not None and lenfirstchunk + nleft > maxsize:
                 return None
-            lastchunk, complete = win32.ReadFile(self._handle, nleft)
-            assert complete
-            buf.write(lastchunk)
+            if nleft > 0:
+                overlapped = win32.ReadFile(self._handle, nleft, overlapped=True)
+                res, complete = overlapped.GetOverlappedResult(True)
+                assert res == nleft
+                assert complete
+                buf.write(overlapped.getbuffer())
             return buf
 
-        def _poll(self, timeout):
+        def _poll(self, timeout, sentinels=()):
+            # Fast non-blocking path
             navail, nleft = win32.PeekNamedPipe(self._handle)
             if navail > 0:
                 return True
             elif timeout == 0.0:
                 return False
-            # Setup a polling loop (translated straight from old
-            # pipe_connection.c)
+            # Blocking: use overlapped I/O
             if timeout < 0.0:
-                deadline = None
+                timeout = INFINITE
             else:
-                deadline = time.time() + timeout
-            delay = 0.001
-            max_delay = 0.02
-            while True:
-                time.sleep(delay)
-                navail, nleft = win32.PeekNamedPipe(self._handle)
-                if navail > 0:
-                    return True
-                if deadline and time.time() > deadline:
-                    return False
-                if delay < max_delay:
-                    delay += 0.001
+                timeout = int(timeout * 1000 + 0.5)
+            overlapped = win32.ReadFile(self._handle, 1, overlapped=True)
+            try:
+                handles = [overlapped.event]
+                handles += sentinels
+                res = win32.WaitForMultipleObjects(handles, False, timeout)
+            finally:
+                # Always cancel overlapped I/O in the same thread
+                # (because CancelIoEx() is only available from Vista onwards)
+                overlapped.cancel()
+            if res == WAIT_TIMEOUT:
+                return False
+            idx = res - WAIT_OBJECT_0
+            if idx == 0:
+                # I/O was successful, store received data
+                overlapped.GetOverlappedResult(True)
+                self._buffered += overlapped.getbuffer()
+                return True
+            assert 0 < idx < len(handles)
+            raise SentinelReady([handles[idx]])
 
 
 class Connection(_ConnectionBase):
@@ -357,11 +395,18 @@
                 break
             buf = buf[n:]
 
-    def _recv(self, size, read=_read):
+    def _recv(self, size, sentinels=(), read=_read):
         buf = io.BytesIO()
+        handle = self._handle
+        if sentinels:
+            handles = [handle] + sentinels
         remaining = size
         while remaining > 0:
-            chunk = read(self._handle, remaining)
+            if sentinels:
+                r = _select(handles, [], [])[0]
+                if handle not in r:
+                    raise SentinelReady(r)
+            chunk = read(handle, remaining)
             n = len(chunk)
             if n == 0:
                 if remaining == size:
@@ -381,15 +426,17 @@
         if n > 0:
             self._send(buf)
 
-    def _recv_bytes(self, maxsize=None):
-        buf = self._recv(4)
+    def _recv_bytes(self, maxsize=None, sentinels=()):
+        buf = self._recv(4, sentinels)
         size, = struct.unpack("=i", buf.getvalue())
         if maxsize is not None and size > maxsize:
             return None
-        return self._recv(size)
+        return self._recv(size, sentinels)
 
     def _poll(self, timeout):
-        r = select.select([self._handle], [], [], timeout)[0]
+        if timeout < 0.0:
+            timeout = None
+        r = _select([self._handle], [], [], timeout)[0]
         return bool(r)
 
 
@@ -495,23 +542,21 @@
             obsize, ibsize = 0, BUFSIZE
 
         h1 = win32.CreateNamedPipe(
-            address, openmode,
+            address, openmode | win32.FILE_FLAG_OVERLAPPED,
             win32.PIPE_TYPE_MESSAGE | win32.PIPE_READMODE_MESSAGE |
             win32.PIPE_WAIT,
             1, obsize, ibsize, win32.NMPWAIT_WAIT_FOREVER, win32.NULL
             )
         h2 = win32.CreateFile(
-            address, access, 0, win32.NULL, win32.OPEN_EXISTING, 0, win32.NULL
+            address, access, 0, win32.NULL, win32.OPEN_EXISTING,
+            win32.FILE_FLAG_OVERLAPPED, win32.NULL
             )
         win32.SetNamedPipeHandleState(
             h2, win32.PIPE_READMODE_MESSAGE, None, None
             )
 
-        try:
-            win32.ConnectNamedPipe(h1, win32.NULL)
-        except WindowsError as e:
-            if e.args[0] != win32.ERROR_PIPE_CONNECTED:
-                raise
+        overlapped = win32.ConnectNamedPipe(h1, overlapped=True)
+        overlapped.GetOverlappedResult(True)
 
         c1 = PipeConnection(h1, writable=duplex)
         c2 = PipeConnection(h2, readable=duplex)