bpo-31819: Add AbstractEventLoop.sock_recv_into() (#4051)
* bpo-31819: Add AbstractEventLoop.sock_recv_into()
* Add NEWS
* Add doc
diff --git a/Lib/asyncio/events.py b/Lib/asyncio/events.py
index 03af699..0dbd92c 100644
--- a/Lib/asyncio/events.py
+++ b/Lib/asyncio/events.py
@@ -461,6 +461,9 @@
def sock_recv(self, sock, nbytes):
raise NotImplementedError
+ def sock_recv_into(self, sock, buf):
+ raise NotImplementedError
+
def sock_sendall(self, sock, data):
raise NotImplementedError
diff --git a/Lib/asyncio/proactor_events.py b/Lib/asyncio/proactor_events.py
index 642f61e..5e7a397 100644
--- a/Lib/asyncio/proactor_events.py
+++ b/Lib/asyncio/proactor_events.py
@@ -439,6 +439,9 @@
def sock_recv(self, sock, n):
return self._proactor.recv(sock, n)
+ def sock_recv_into(self, sock, buf):
+ return self._proactor.recv_into(sock, buf)
+
def sock_sendall(self, sock, data):
return self._proactor.send(sock, data)
diff --git a/Lib/asyncio/selector_events.py b/Lib/asyncio/selector_events.py
index 4b40356..7143ca2 100644
--- a/Lib/asyncio/selector_events.py
+++ b/Lib/asyncio/selector_events.py
@@ -386,6 +386,41 @@
else:
fut.set_result(data)
+ def sock_recv_into(self, sock, buf):
+ """Receive data from the socket.
+
+ The received data is written into *buf* (a writable buffer).
+ The return value is the number of bytes written.
+
+ This method is a coroutine.
+ """
+ if self._debug and sock.gettimeout() != 0:
+ raise ValueError("the socket must be non-blocking")
+ fut = self.create_future()
+ self._sock_recv_into(fut, False, sock, buf)
+ return fut
+
+ def _sock_recv_into(self, fut, registered, sock, buf):
+ # _sock_recv_into() can add itself as an I/O callback if the operation
+ # can't be done immediately. Don't use it directly, call sock_recv_into().
+ fd = sock.fileno()
+ if registered:
+ # Remove the callback early. It should be rare that the
+ # selector says the fd is ready but the call still returns
+ # EAGAIN, and I am willing to take a hit in that case in
+ # order to simplify the common case.
+ self.remove_reader(fd)
+ if fut.cancelled():
+ return
+ try:
+ nbytes = sock.recv_into(buf)
+ except (BlockingIOError, InterruptedError):
+ self.add_reader(fd, self._sock_recv_into, fut, True, sock, buf)
+ except Exception as exc:
+ fut.set_exception(exc)
+ else:
+ fut.set_result(nbytes)
+
def sock_sendall(self, sock, data):
"""Send data to the socket.
diff --git a/Lib/asyncio/windows_events.py b/Lib/asyncio/windows_events.py
index b777dd0..6045ba0 100644
--- a/Lib/asyncio/windows_events.py
+++ b/Lib/asyncio/windows_events.py
@@ -448,6 +448,28 @@
return self._register(ov, conn, finish_recv)
+ def recv_into(self, conn, buf, flags=0):
+ self._register_with_iocp(conn)
+ ov = _overlapped.Overlapped(NULL)
+ try:
+ if isinstance(conn, socket.socket):
+ ov.WSARecvInto(conn.fileno(), buf, flags)
+ else:
+ ov.ReadFileInto(conn.fileno(), buf)
+ except BrokenPipeError:
+ return self._result(b'')
+
+ def finish_recv(trans, key, ov):
+ try:
+ return ov.getresult()
+ except OSError as exc:
+ if exc.winerror == _overlapped.ERROR_NETNAME_DELETED:
+ raise ConnectionResetError(*exc.args)
+ else:
+ raise
+
+ return self._register(ov, conn, finish_recv)
+
def send(self, conn, buf, flags=0):
self._register_with_iocp(conn)
ov = _overlapped.Overlapped(NULL)
diff --git a/Lib/test/test_asyncio/test_events.py b/Lib/test/test_asyncio/test_events.py
index 736f703..0ea9c08 100644
--- a/Lib/test/test_asyncio/test_events.py
+++ b/Lib/test/test_asyncio/test_events.py
@@ -427,6 +427,9 @@
self.loop.sock_recv(sock, 1024))
with self.assertRaises(ValueError):
self.loop.run_until_complete(
+ self.loop.sock_recv_into(sock, bytearray()))
+ with self.assertRaises(ValueError):
+ self.loop.run_until_complete(
self.loop.sock_accept(sock))
# test in non-blocking mode
@@ -443,16 +446,37 @@
sock.close()
self.assertTrue(data.startswith(b'HTTP/1.0 200 OK'))
+ def _basetest_sock_recv_into(self, httpd, sock):
+ # same as _basetest_sock_client_ops, but using sock_recv_into
+ sock.setblocking(False)
+ self.loop.run_until_complete(
+ self.loop.sock_connect(sock, httpd.address))
+ self.loop.run_until_complete(
+ self.loop.sock_sendall(sock, b'GET / HTTP/1.0\r\n\r\n'))
+ data = bytearray(1024)
+ with memoryview(data) as buf:
+ nbytes = self.loop.run_until_complete(
+ self.loop.sock_recv_into(sock, buf[:1024]))
+ # consume data
+ self.loop.run_until_complete(
+ self.loop.sock_recv_into(sock, buf[nbytes:]))
+ sock.close()
+ self.assertTrue(data.startswith(b'HTTP/1.0 200 OK'))
+
def test_sock_client_ops(self):
with test_utils.run_test_server() as httpd:
sock = socket.socket()
self._basetest_sock_client_ops(httpd, sock)
+ sock = socket.socket()
+ self._basetest_sock_recv_into(httpd, sock)
@unittest.skipUnless(hasattr(socket, 'AF_UNIX'), 'No UNIX Sockets')
def test_unix_sock_client_ops(self):
with test_utils.run_test_unix_server() as httpd:
sock = socket.socket(socket.AF_UNIX)
self._basetest_sock_client_ops(httpd, sock)
+ sock = socket.socket(socket.AF_UNIX)
+ self._basetest_sock_recv_into(httpd, sock)
def test_sock_client_fail(self):
# Make sure that we will get an unused port
@@ -2613,6 +2637,8 @@
self.assertRaises(
NotImplementedError, loop.sock_recv, f, 10)
self.assertRaises(
+ NotImplementedError, loop.sock_recv_into, f, 10)
+ self.assertRaises(
NotImplementedError, loop.sock_sendall, f, 10)
self.assertRaises(
NotImplementedError, loop.sock_connect, f, f)
diff --git a/Lib/test/test_asyncio/test_proactor_events.py b/Lib/test/test_asyncio/test_proactor_events.py
index d76da66..7a8b523 100644
--- a/Lib/test/test_asyncio/test_proactor_events.py
+++ b/Lib/test/test_asyncio/test_proactor_events.py
@@ -489,6 +489,11 @@
self.loop.sock_recv(self.sock, 1024)
self.proactor.recv.assert_called_with(self.sock, 1024)
+ def test_sock_recv_into(self):
+ buf = bytearray(10)
+ self.loop.sock_recv_into(self.sock, buf)
+ self.proactor.recv_into.assert_called_with(self.sock, buf)
+
def test_sock_sendall(self):
self.loop.sock_sendall(self.sock, b'data')
self.proactor.send.assert_called_with(self.sock, b'data')