bpo-31819: Add AbstractEventLoop.sock_recv_into() (#4051)

* bpo-31819: Add AbstractEventLoop.sock_recv_into()

* Add NEWS

* Add doc
diff --git a/Lib/asyncio/events.py b/Lib/asyncio/events.py
index 03af699..0dbd92c 100644
--- a/Lib/asyncio/events.py
+++ b/Lib/asyncio/events.py
@@ -461,6 +461,9 @@
     def sock_recv(self, sock, nbytes):
         raise NotImplementedError
 
+    def sock_recv_into(self, sock, buf):
+        raise NotImplementedError
+
     def sock_sendall(self, sock, data):
         raise NotImplementedError
 
diff --git a/Lib/asyncio/proactor_events.py b/Lib/asyncio/proactor_events.py
index 642f61e..5e7a397 100644
--- a/Lib/asyncio/proactor_events.py
+++ b/Lib/asyncio/proactor_events.py
@@ -439,6 +439,9 @@
     def sock_recv(self, sock, n):
         return self._proactor.recv(sock, n)
 
+    def sock_recv_into(self, sock, buf):
+        return self._proactor.recv_into(sock, buf)
+
     def sock_sendall(self, sock, data):
         return self._proactor.send(sock, data)
 
diff --git a/Lib/asyncio/selector_events.py b/Lib/asyncio/selector_events.py
index 4b40356..7143ca2 100644
--- a/Lib/asyncio/selector_events.py
+++ b/Lib/asyncio/selector_events.py
@@ -386,6 +386,41 @@
         else:
             fut.set_result(data)
 
+    def sock_recv_into(self, sock, buf):
+        """Receive data from the socket.
+
+        The received data is written into *buf* (a writable buffer).
+        The return value is the number of bytes written.
+
+        This method is a coroutine.
+        """
+        if self._debug and sock.gettimeout() != 0:
+            raise ValueError("the socket must be non-blocking")
+        fut = self.create_future()
+        self._sock_recv_into(fut, False, sock, buf)
+        return fut
+
+    def _sock_recv_into(self, fut, registered, sock, buf):
+        # _sock_recv_into() can add itself as an I/O callback if the operation
+        # can't be done immediately. Don't use it directly, call sock_recv_into().
+        fd = sock.fileno()
+        if registered:
+            # Remove the callback early.  It should be rare that the
+            # selector says the fd is ready but the call still returns
+            # EAGAIN, and I am willing to take a hit in that case in
+            # order to simplify the common case.
+            self.remove_reader(fd)
+        if fut.cancelled():
+            return
+        try:
+            nbytes = sock.recv_into(buf)
+        except (BlockingIOError, InterruptedError):
+            self.add_reader(fd, self._sock_recv_into, fut, True, sock, buf)
+        except Exception as exc:
+            fut.set_exception(exc)
+        else:
+            fut.set_result(nbytes)
+
     def sock_sendall(self, sock, data):
         """Send data to the socket.
 
diff --git a/Lib/asyncio/windows_events.py b/Lib/asyncio/windows_events.py
index b777dd0..6045ba0 100644
--- a/Lib/asyncio/windows_events.py
+++ b/Lib/asyncio/windows_events.py
@@ -448,6 +448,28 @@
 
         return self._register(ov, conn, finish_recv)
 
+    def recv_into(self, conn, buf, flags=0):
+        self._register_with_iocp(conn)
+        ov = _overlapped.Overlapped(NULL)
+        try:
+            if isinstance(conn, socket.socket):
+                ov.WSARecvInto(conn.fileno(), buf, flags)
+            else:
+                ov.ReadFileInto(conn.fileno(), buf)
+        except BrokenPipeError:
+            return self._result(b'')
+
+        def finish_recv(trans, key, ov):
+            try:
+                return ov.getresult()
+            except OSError as exc:
+                if exc.winerror == _overlapped.ERROR_NETNAME_DELETED:
+                    raise ConnectionResetError(*exc.args)
+                else:
+                    raise
+
+        return self._register(ov, conn, finish_recv)
+
     def send(self, conn, buf, flags=0):
         self._register_with_iocp(conn)
         ov = _overlapped.Overlapped(NULL)
diff --git a/Lib/test/test_asyncio/test_events.py b/Lib/test/test_asyncio/test_events.py
index 736f703..0ea9c08 100644
--- a/Lib/test/test_asyncio/test_events.py
+++ b/Lib/test/test_asyncio/test_events.py
@@ -427,6 +427,9 @@
                     self.loop.sock_recv(sock, 1024))
             with self.assertRaises(ValueError):
                 self.loop.run_until_complete(
+                    self.loop.sock_recv_into(sock, bytearray()))
+            with self.assertRaises(ValueError):
+                self.loop.run_until_complete(
                     self.loop.sock_accept(sock))
 
         # test in non-blocking mode
@@ -443,16 +446,37 @@
         sock.close()
         self.assertTrue(data.startswith(b'HTTP/1.0 200 OK'))
 
+    def _basetest_sock_recv_into(self, httpd, sock):
+        # same as _basetest_sock_client_ops, but using sock_recv_into
+        sock.setblocking(False)
+        self.loop.run_until_complete(
+            self.loop.sock_connect(sock, httpd.address))
+        self.loop.run_until_complete(
+            self.loop.sock_sendall(sock, b'GET / HTTP/1.0\r\n\r\n'))
+        data = bytearray(1024)
+        with memoryview(data) as buf:
+            nbytes = self.loop.run_until_complete(
+                self.loop.sock_recv_into(sock, buf[:1024]))
+            # consume data
+            self.loop.run_until_complete(
+                self.loop.sock_recv_into(sock, buf[nbytes:]))
+        sock.close()
+        self.assertTrue(data.startswith(b'HTTP/1.0 200 OK'))
+
     def test_sock_client_ops(self):
         with test_utils.run_test_server() as httpd:
             sock = socket.socket()
             self._basetest_sock_client_ops(httpd, sock)
+            sock = socket.socket()
+            self._basetest_sock_recv_into(httpd, sock)
 
     @unittest.skipUnless(hasattr(socket, 'AF_UNIX'), 'No UNIX Sockets')
     def test_unix_sock_client_ops(self):
         with test_utils.run_test_unix_server() as httpd:
             sock = socket.socket(socket.AF_UNIX)
             self._basetest_sock_client_ops(httpd, sock)
+            sock = socket.socket(socket.AF_UNIX)
+            self._basetest_sock_recv_into(httpd, sock)
 
     def test_sock_client_fail(self):
         # Make sure that we will get an unused port
@@ -2613,6 +2637,8 @@
         self.assertRaises(
             NotImplementedError, loop.sock_recv, f, 10)
         self.assertRaises(
+            NotImplementedError, loop.sock_recv_into, f, 10)
+        self.assertRaises(
             NotImplementedError, loop.sock_sendall, f, 10)
         self.assertRaises(
             NotImplementedError, loop.sock_connect, f, f)
diff --git a/Lib/test/test_asyncio/test_proactor_events.py b/Lib/test/test_asyncio/test_proactor_events.py
index d76da66..7a8b523 100644
--- a/Lib/test/test_asyncio/test_proactor_events.py
+++ b/Lib/test/test_asyncio/test_proactor_events.py
@@ -489,6 +489,11 @@
         self.loop.sock_recv(self.sock, 1024)
         self.proactor.recv.assert_called_with(self.sock, 1024)
 
+    def test_sock_recv_into(self):
+        buf = bytearray(10)
+        self.loop.sock_recv_into(self.sock, buf)
+        self.proactor.recv_into.assert_called_with(self.sock, buf)
+
     def test_sock_sendall(self):
         self.loop.sock_sendall(self.sock, b'data')
         self.proactor.send.assert_called_with(self.sock, b'data')