bpo-31819: Add AbstractEventLoop.sock_recv_into() (#4051)

* bpo-31819: Add AbstractEventLoop.sock_recv_into()

* Add NEWS

* Add doc
diff --git a/Lib/asyncio/events.py b/Lib/asyncio/events.py
index 03af699..0dbd92c 100644
--- a/Lib/asyncio/events.py
+++ b/Lib/asyncio/events.py
@@ -461,6 +461,9 @@
     def sock_recv(self, sock, nbytes):
         raise NotImplementedError
 
+    def sock_recv_into(self, sock, buf):
+        raise NotImplementedError
+
     def sock_sendall(self, sock, data):
         raise NotImplementedError
 
diff --git a/Lib/asyncio/proactor_events.py b/Lib/asyncio/proactor_events.py
index 642f61e..5e7a397 100644
--- a/Lib/asyncio/proactor_events.py
+++ b/Lib/asyncio/proactor_events.py
@@ -439,6 +439,9 @@
     def sock_recv(self, sock, n):
         return self._proactor.recv(sock, n)
 
+    def sock_recv_into(self, sock, buf):
+        return self._proactor.recv_into(sock, buf)
+
     def sock_sendall(self, sock, data):
         return self._proactor.send(sock, data)
 
diff --git a/Lib/asyncio/selector_events.py b/Lib/asyncio/selector_events.py
index 4b40356..7143ca2 100644
--- a/Lib/asyncio/selector_events.py
+++ b/Lib/asyncio/selector_events.py
@@ -386,6 +386,41 @@
         else:
             fut.set_result(data)
 
+    def sock_recv_into(self, sock, buf):
+        """Receive data from the socket.
+
+        The received data is written into *buf* (a writable buffer).
+        The return value is the number of bytes written.
+
+        This method is a coroutine.
+        """
+        if self._debug and sock.gettimeout() != 0:
+            raise ValueError("the socket must be non-blocking")
+        fut = self.create_future()
+        self._sock_recv_into(fut, False, sock, buf)
+        return fut
+
+    def _sock_recv_into(self, fut, registered, sock, buf):
+        # _sock_recv_into() can add itself as an I/O callback if the operation
+        # can't be done immediately. Don't use it directly, call sock_recv_into().
+        fd = sock.fileno()
+        if registered:
+            # Remove the callback early.  It should be rare that the
+            # selector says the fd is ready but the call still returns
+            # EAGAIN, and I am willing to take a hit in that case in
+            # order to simplify the common case.
+            self.remove_reader(fd)
+        if fut.cancelled():
+            return
+        try:
+            nbytes = sock.recv_into(buf)
+        except (BlockingIOError, InterruptedError):
+            self.add_reader(fd, self._sock_recv_into, fut, True, sock, buf)
+        except Exception as exc:
+            fut.set_exception(exc)
+        else:
+            fut.set_result(nbytes)
+
     def sock_sendall(self, sock, data):
         """Send data to the socket.
 
diff --git a/Lib/asyncio/windows_events.py b/Lib/asyncio/windows_events.py
index b777dd0..6045ba0 100644
--- a/Lib/asyncio/windows_events.py
+++ b/Lib/asyncio/windows_events.py
@@ -448,6 +448,28 @@
 
         return self._register(ov, conn, finish_recv)
 
+    def recv_into(self, conn, buf, flags=0):
+        self._register_with_iocp(conn)
+        ov = _overlapped.Overlapped(NULL)
+        try:
+            if isinstance(conn, socket.socket):
+                ov.WSARecvInto(conn.fileno(), buf, flags)
+            else:
+                ov.ReadFileInto(conn.fileno(), buf)
+        except BrokenPipeError:
+            return self._result(b'')
+
+        def finish_recv(trans, key, ov):
+            try:
+                return ov.getresult()
+            except OSError as exc:
+                if exc.winerror == _overlapped.ERROR_NETNAME_DELETED:
+                    raise ConnectionResetError(*exc.args)
+                else:
+                    raise
+
+        return self._register(ov, conn, finish_recv)
+
     def send(self, conn, buf, flags=0):
         self._register_with_iocp(conn)
         ov = _overlapped.Overlapped(NULL)