Issue #12328: Fix multiprocessing's use of overlapped I/O on Windows.
Also, add a multiprocessing.connection.wait(rlist, timeout=None) function
for polling multiple objects at once.  Patch by sbt.

Complete changelist from sbt's patch:

* Adds a wait(rlist, timeout=None) function for polling multiple
  objects at once.  On Unix this is just a wrapper for
  select(rlist, [], [], timeout=None).

* Removes use of the SentinelReady exception and the sentinels argument
  to certain methods.  concurrent.futures.process has been changed to
  use wait() instead of SentinelReady.

* Fixes bugs concerning PipeConnection.poll() and messages of zero
  length.

* Fixes PipeListener.accept() to call ConnectNamedPipe() with
  overlapped=True.

* Fixes Queue.empty() and SimpleQueue.empty() so that they are
  threadsafe on Windows.

* Now PipeConnection.poll() and wait() will not modify the pipe except
  possibly by consuming a zero length message.  (Previously poll()
  could consume a partial message.)

* All of multiprocesing's pipe related blocking functions/methods are
  now interruptible by SIGINT on Windows.
diff --git a/Lib/test/test_multiprocessing.py b/Lib/test/test_multiprocessing.py
index f141bd4..35d85d7 100644
--- a/Lib/test/test_multiprocessing.py
+++ b/Lib/test/test_multiprocessing.py
@@ -1811,6 +1811,84 @@
             p.join()
             l.close()
 
+class _TestPoll(unittest.TestCase):
+
+    ALLOWED_TYPES = ('processes', 'threads')
+
+    def test_empty_string(self):
+        a, b = self.Pipe()
+        self.assertEqual(a.poll(), False)
+        b.send_bytes(b'')
+        self.assertEqual(a.poll(), True)
+        self.assertEqual(a.poll(), True)
+
+    @classmethod
+    def _child_strings(cls, conn, strings):
+        for s in strings:
+            time.sleep(0.1)
+            conn.send_bytes(s)
+        conn.close()
+
+    def test_strings(self):
+        strings = (b'hello', b'', b'a', b'b', b'', b'bye', b'', b'lop')
+        a, b = self.Pipe()
+        p = self.Process(target=self._child_strings, args=(b, strings))
+        p.start()
+
+        for s in strings:
+            for i in range(200):
+                if a.poll(0.01):
+                    break
+            x = a.recv_bytes()
+            self.assertEqual(s, x)
+
+        p.join()
+
+    @classmethod
+    def _child_boundaries(cls, r):
+        # Polling may "pull" a message in to the child process, but we
+        # don't want it to pull only part of a message, as that would
+        # corrupt the pipe for any other processes which might later
+        # read from it.
+        r.poll(5)
+
+    def test_boundaries(self):
+        r, w = self.Pipe(False)
+        p = self.Process(target=self._child_boundaries, args=(r,))
+        p.start()
+        time.sleep(2)
+        L = [b"first", b"second"]
+        for obj in L:
+            w.send_bytes(obj)
+        w.close()
+        p.join()
+        self.assertIn(r.recv_bytes(), L)
+
+    @classmethod
+    def _child_dont_merge(cls, b):
+        b.send_bytes(b'a')
+        b.send_bytes(b'b')
+        b.send_bytes(b'cd')
+
+    def test_dont_merge(self):
+        a, b = self.Pipe()
+        self.assertEqual(a.poll(0.0), False)
+        self.assertEqual(a.poll(0.1), False)
+
+        p = self.Process(target=self._child_dont_merge, args=(b,))
+        p.start()
+
+        self.assertEqual(a.recv_bytes(), b'a')
+        self.assertEqual(a.poll(1.0), True)
+        self.assertEqual(a.poll(1.0), True)
+        self.assertEqual(a.recv_bytes(), b'b')
+        self.assertEqual(a.poll(1.0), True)
+        self.assertEqual(a.poll(1.0), True)
+        self.assertEqual(a.poll(0.0), True)
+        self.assertEqual(a.recv_bytes(), b'cd')
+
+        p.join()
+
 #
 # Test of sending connection and socket objects between processes
 #
@@ -2404,8 +2482,163 @@
         flike.flush()
         assert sio.getvalue() == 'foo'
 
+
+class TestWait(unittest.TestCase):
+
+    @classmethod
+    def _child_test_wait(cls, w, slow):
+        for i in range(10):
+            if slow:
+                time.sleep(random.random()*0.1)
+            w.send((i, os.getpid()))
+        w.close()
+
+    def test_wait(self, slow=False):
+        from multiprocessing import Pipe, Process
+        from multiprocessing.connection import wait
+        readers = []
+        procs = []
+        messages = []
+
+        for i in range(4):
+            r, w = Pipe(duplex=False)
+            p = Process(target=self._child_test_wait, args=(w, slow))
+            p.daemon = True
+            p.start()
+            w.close()
+            readers.append(r)
+            procs.append(p)
+
+        while readers:
+            for r in wait(readers):
+                try:
+                    msg = r.recv()
+                except EOFError:
+                    readers.remove(r)
+                    r.close()
+                else:
+                    messages.append(msg)
+
+        messages.sort()
+        expected = sorted((i, p.pid) for i in range(10) for p in procs)
+        self.assertEqual(messages, expected)
+
+    @classmethod
+    def _child_test_wait_socket(cls, address, slow):
+        s = socket.socket()
+        s.connect(address)
+        for i in range(10):
+            if slow:
+                time.sleep(random.random()*0.1)
+            s.sendall(('%s\n' % i).encode('ascii'))
+        s.close()
+
+    def test_wait_socket(self, slow=False):
+        from multiprocessing import Process
+        from multiprocessing.connection import wait
+        l = socket.socket()
+        l.bind(('', 0))
+        l.listen(4)
+        addr = ('localhost', l.getsockname()[1])
+        readers = []
+        procs = []
+        dic = {}
+
+        for i in range(4):
+            p = Process(target=self._child_test_wait_socket, args=(addr, slow))
+            p.daemon = True
+            p.start()
+            procs.append(p)
+
+        for i in range(4):
+            r, _ = l.accept()
+            readers.append(r)
+            dic[r] = []
+        l.close()
+
+        while readers:
+            for r in wait(readers):
+                msg = r.recv(32)
+                if not msg:
+                    readers.remove(r)
+                    r.close()
+                else:
+                    dic[r].append(msg)
+
+        expected = ''.join('%s\n' % i for i in range(10)).encode('ascii')
+        for v in dic.values():
+            self.assertEqual(b''.join(v), expected)
+
+    def test_wait_slow(self):
+        self.test_wait(True)
+
+    def test_wait_socket_slow(self):
+        self.test_wait(True)
+
+    def test_wait_timeout(self):
+        from multiprocessing.connection import wait
+
+        expected = 1
+        a, b = multiprocessing.Pipe()
+
+        start = time.time()
+        res = wait([a, b], 1)
+        delta = time.time() - start
+
+        self.assertEqual(res, [])
+        self.assertLess(delta, expected + 0.2)
+        self.assertGreater(delta, expected - 0.2)
+
+        b.send(None)
+
+        start = time.time()
+        res = wait([a, b], 1)
+        delta = time.time() - start
+
+        self.assertEqual(res, [a])
+        self.assertLess(delta, 0.2)
+
+    def test_wait_integer(self):
+        from multiprocessing.connection import wait
+
+        expected = 5
+        a, b = multiprocessing.Pipe()
+        p = multiprocessing.Process(target=time.sleep, args=(expected,))
+
+        p.start()
+        self.assertIsInstance(p.sentinel, int)
+
+        start = time.time()
+        res = wait([a, p.sentinel, b], expected + 20)
+        delta = time.time() - start
+
+        self.assertEqual(res, [p.sentinel])
+        self.assertLess(delta, expected + 1)
+        self.assertGreater(delta, expected - 1)
+
+        a.send(None)
+
+        start = time.time()
+        res = wait([a, p.sentinel, b], 20)
+        delta = time.time() - start
+
+        self.assertEqual(res, [p.sentinel, b])
+        self.assertLess(delta, 0.2)
+
+        b.send(None)
+
+        start = time.time()
+        res = wait([a, p.sentinel, b], 20)
+        delta = time.time() - start
+
+        self.assertEqual(res, [a, p.sentinel, b])
+        self.assertLess(delta, 0.2)
+
+        p.join()
+
+
 testcases_other = [OtherTest, TestInvalidHandle, TestInitializers,
-                   TestStdinBadfiledescriptor]
+                   TestStdinBadfiledescriptor, TestWait]
 
 #
 #