bpo-32622: Native sendfile on windows (GH-5565)


* Support sendfile on Windows Proactor event loop naively.
(cherry picked from commit a19fb3c6aaa7632410d1d9dcb395d7101d124da4)

Co-authored-by: Andrew Svetlov <andrew.svetlov@gmail.com>
diff --git a/Lib/asyncio/proactor_events.py b/Lib/asyncio/proactor_events.py
index 10ca6f8..b675c82 100644
--- a/Lib/asyncio/proactor_events.py
+++ b/Lib/asyncio/proactor_events.py
@@ -6,11 +6,14 @@
 
 __all__ = 'BaseProactorEventLoop',
 
+import io
+import os
 import socket
 import warnings
 
 from . import base_events
 from . import constants
+from . import events
 from . import futures
 from . import protocols
 from . import sslproto
@@ -107,6 +110,11 @@
             self._force_close(exc)
 
     def _force_close(self, exc):
+        if self._empty_waiter is not None:
+            if exc is None:
+                self._empty_waiter.set_result(None)
+            else:
+                self._empty_waiter.set_exception(exc)
         if self._closing:
             return
         self._closing = True
@@ -327,6 +335,10 @@
 
     _start_tls_compatible = True
 
+    def __init__(self, *args, **kw):
+        super().__init__(*args, **kw)
+        self._empty_waiter = None
+
     def write(self, data):
         if not isinstance(data, (bytes, bytearray, memoryview)):
             raise TypeError(
@@ -334,6 +346,8 @@
                 f"not {type(data).__name__}")
         if self._eof_written:
             raise RuntimeError('write_eof() already called')
+        if self._empty_waiter is not None:
+            raise RuntimeError('unable to write; sendfile is in progress')
 
         if not data:
             return
@@ -393,6 +407,8 @@
                     self._maybe_pause_protocol()
                 else:
                     self._write_fut.add_done_callback(self._loop_writing)
+            if self._empty_waiter is not None and self._write_fut is None:
+                self._empty_waiter.set_result(None)
         except ConnectionResetError as exc:
             self._force_close(exc)
         except OSError as exc:
@@ -407,6 +423,17 @@
     def abort(self):
         self._force_close(None)
 
+    def _make_empty_waiter(self):
+        if self._empty_waiter is not None:
+            raise RuntimeError("Empty waiter is already set")
+        self._empty_waiter = self._loop.create_future()
+        if self._write_fut is None:
+            self._empty_waiter.set_result(None)
+        return self._empty_waiter
+
+    def _reset_empty_waiter(self):
+        self._empty_waiter = None
+
 
 class _ProactorWritePipeTransport(_ProactorBaseWritePipeTransport):
     def __init__(self, *args, **kw):
@@ -447,7 +474,7 @@
                                transports.Transport):
     """Transport for connected sockets."""
 
-    _sendfile_compatible = constants._SendfileMode.FALLBACK
+    _sendfile_compatible = constants._SendfileMode.TRY_NATIVE
 
     def _set_extra(self, sock):
         self._extra['socket'] = sock
@@ -556,6 +583,47 @@
     async def sock_accept(self, sock):
         return await self._proactor.accept(sock)
 
+    async def _sock_sendfile_native(self, sock, file, offset, count):
+        try:
+            fileno = file.fileno()
+        except (AttributeError, io.UnsupportedOperation) as err:
+            raise events.SendfileNotAvailableError("not a regular file")
+        try:
+            fsize = os.fstat(fileno).st_size
+        except OSError as err:
+            raise events.SendfileNotAvailableError("not a regular file")
+        blocksize = count if count else fsize
+        if not blocksize:
+            return 0  # empty file
+
+        blocksize = min(blocksize, 0xffff_ffff)
+        end_pos = min(offset + count, fsize) if count else fsize
+        offset = min(offset, fsize)
+        total_sent = 0
+        try:
+            while True:
+                blocksize = min(end_pos - offset, blocksize)
+                if blocksize <= 0:
+                    return total_sent
+                await self._proactor.sendfile(sock, file, offset, blocksize)
+                offset += blocksize
+                total_sent += blocksize
+        finally:
+            if total_sent > 0:
+                file.seek(offset)
+
+    async def _sendfile_native(self, transp, file, offset, count):
+        resume_reading = transp.is_reading()
+        transp.pause_reading()
+        await transp._make_empty_waiter()
+        try:
+            return await self.sock_sendfile(transp._sock, file, offset, count,
+                                            fallback=False)
+        finally:
+            transp._reset_empty_waiter()
+            if resume_reading:
+                transp.resume_reading()
+
     def _close_self_pipe(self):
         if self._self_reading_future is not None:
             self._self_reading_future.cancel()
diff --git a/Lib/asyncio/windows_events.py b/Lib/asyncio/windows_events.py
index f91fcdd..d22edec 100644
--- a/Lib/asyncio/windows_events.py
+++ b/Lib/asyncio/windows_events.py
@@ -4,6 +4,7 @@
 import _winapi
 import errno
 import math
+import msvcrt
 import socket
 import struct
 import weakref
@@ -527,6 +528,27 @@
 
         return self._register(ov, conn, finish_connect)
 
+    def sendfile(self, sock, file, offset, count):
+        self._register_with_iocp(sock)
+        ov = _overlapped.Overlapped(NULL)
+        offset_low = offset & 0xffff_ffff
+        offset_high = (offset >> 32) & 0xffff_ffff
+        ov.TransmitFile(sock.fileno(),
+                        msvcrt.get_osfhandle(file.fileno()),
+                        offset_low, offset_high,
+                        count, 0, 0)
+
+        def finish_sendfile(trans, key, ov):
+            try:
+                return ov.getresult()
+            except OSError as exc:
+                if exc.winerror in (_overlapped.ERROR_NETNAME_DELETED,
+                                    _overlapped.ERROR_OPERATION_ABORTED):
+                    raise ConnectionResetError(*exc.args)
+                else:
+                    raise
+        return self._register(ov, sock, finish_sendfile)
+
     def accept_pipe(self, pipe):
         self._register_with_iocp(pipe)
         ov = _overlapped.Overlapped(NULL)