blob: 772ddf4dfebd518826c7f2006bb4cc8880f0ad46 [file] [log] [blame]
Yury Selivanovb0b0e622014-02-18 22:27:48 -05001"""Selector and proactor event loops for Windows."""
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07002
Victor Stinner4271dfd2017-11-28 15:19:56 +01003import _overlapped
Victor Stinnerf2e17682014-01-31 16:25:24 +01004import _winapi
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07005import errno
Victor Stinnerf2e17682014-01-31 16:25:24 +01006import math
Andrew Svetlova19fb3c2018-02-25 19:32:14 +03007import msvcrt
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07008import socket
Victor Stinnerf2e17682014-01-31 16:25:24 +01009import struct
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070010import weakref
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070011
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -080012from . import events
Guido van Rossum59691282013-10-30 14:52:03 -070013from . import base_subprocess
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070014from . import futures
Andrew Svetlov0baa72f2018-09-11 10:13:04 -070015from . import exceptions
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070016from . import proactor_events
17from . import selector_events
18from . import tasks
19from . import windows_utils
Victor Stinnerf951d282014-06-29 00:46:45 +020020from .log import logger
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070021
22
# Names exported by ``from <this module> import *``.
__all__ = (
    'SelectorEventLoop',
    'ProactorEventLoop',
    'IocpProactor',
    'DefaultEventLoopPolicy',
    'WindowsSelectorEventLoopPolicy',
    'WindowsProactorEventLoopPolicy',
)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070028
29
# Values handed to the Win32 wrappers used in this module: NULL is passed
# where no handle/event is supplied, INFINITE means "no timeout".
NULL = 0
INFINITE = 0xffffffff

# Windows error codes.
ERROR_CONNECTION_REFUSED = 1225
ERROR_CONNECTION_ABORTED = 1236

# connect_pipe() retry back-off, in seconds: the delay it starts from ...
CONNECT_PIPE_INIT_DELAY = 0.001

# ... and the upper bound the delay is clamped to between retries.
CONNECT_PIPE_MAX_DELAY = 0.100
40
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070041
class _OverlappedFuture(futures.Future):
    """Future tied to an overlapped operation.

    Cancelling the future immediately cancels the overlapped operation it
    wraps.
    """

    def __init__(self, ov, *, loop=None):
        super().__init__(loop=loop)
        # Drop the most recent entry of the recorded creation traceback
        # (presumably this constructor's own frame).
        creation_tb = self._source_traceback
        if creation_tb:
            del creation_tb[-1]
        self._ov = ov

    def _repr_info(self):
        """Extend the base repr info with the overlapped state and address."""
        parts = super()._repr_info()
        ov = self._ov
        if ov is None:
            return parts
        status = 'pending' if ov.pending else 'completed'
        parts.insert(1, f'overlapped=<{status}, {ov.address:#x}>')
        return parts

    def _cancel_overlapped(self):
        """Cancel the overlapped operation, if any, and forget it.

        An OSError raised by the cancellation is reported through the loop's
        exception handler instead of being propagated.
        """
        ov = self._ov
        if ov is None:
            return
        try:
            ov.cancel()
        except OSError as exc:
            context = {
                'message': 'Cancelling an overlapped future failed',
                'exception': exc,
                'future': self,
            }
            if self._source_traceback:
                context['source_traceback'] = self._source_traceback
            self._loop.call_exception_handler(context)
        self._ov = None

    def cancel(self):
        # Cancel the operation first, then the future itself.
        self._cancel_overlapped()
        return super().cancel()

    def set_exception(self, exception):
        # Record the failure, then make sure the operation is cancelled.
        super().set_exception(exception)
        self._cancel_overlapped()

    def set_result(self, result):
        # The operation completed: only the reference needs to be released.
        super().set_result(result)
        self._ov = None
88
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070089
class _BaseWaitHandleFuture(futures.Future):
    """Subclass of Future which represents a wait handle."""

    def __init__(self, ov, handle, wait_handle, *, loop=None):
        super().__init__(loop=loop)
        if self._source_traceback:
            del self._source_traceback[-1]
        # Keep a reference to the Overlapped object to keep it alive until the
        # wait is unregistered
        self._ov = ov
        self._handle = handle
        self._wait_handle = wait_handle

        # Should we call UnregisterWaitEx() if the wait completes
        # or is cancelled?
        self._registered = True

    def _poll(self):
        """Return True if the waited handle is currently signaled."""
        # non-blocking wait: use a timeout of 0 millisecond
        return (_winapi.WaitForSingleObject(self._handle, 0) ==
                _winapi.WAIT_OBJECT_0)

    def _repr_info(self):
        """Extend the base repr info with the handle, its state and the
        wait handle.

        Handle details are only added when the handle is not None, so that
        building the repr never fails on a missing handle.
        """
        info = super()._repr_info()
        if self._handle is not None:
            # Format and poll the handle only when there is one: applying
            # ':#x' to None would raise TypeError.
            info.append(f'handle={self._handle:#x}')
            state = 'signaled' if self._poll() else 'waiting'
            info.append(state)
        if self._wait_handle is not None:
            info.append(f'wait_handle={self._wait_handle:#x}')
        return info

    def _unregister_wait_cb(self, fut):
        # The wait was unregistered: it's now safe to destroy the Overlapped
        # object
        self._ov = None

    def _unregister_wait(self):
        """Unregister the wait handle; does nothing after the first call.

        An OSError other than ERROR_IO_PENDING is reported through the loop's
        exception handler, and the Overlapped reference is then kept.
        """
        if not self._registered:
            return
        self._registered = False

        wait_handle = self._wait_handle
        self._wait_handle = None
        try:
            _overlapped.UnregisterWait(wait_handle)
        except OSError as exc:
            if exc.winerror != _overlapped.ERROR_IO_PENDING:
                context = {
                    'message': 'Failed to unregister the wait handle',
                    'exception': exc,
                    'future': self,
                }
                if self._source_traceback:
                    context['source_traceback'] = self._source_traceback
                self._loop.call_exception_handler(context)
                return
            # ERROR_IO_PENDING means that the unregister is pending

        self._unregister_wait_cb(None)

    def cancel(self):
        self._unregister_wait()
        return super().cancel()

    def set_exception(self, exception):
        self._unregister_wait()
        super().set_exception(exception)

    def set_result(self, result):
        self._unregister_wait()
        super().set_result(result)
162
163
class _WaitCancelFuture(_BaseWaitHandleFuture):
    """Future waiting, through an event, for the cancellation of a
    _WaitHandleFuture to complete.
    """

    def __init__(self, ov, event, wait_handle, *, loop=None):
        super().__init__(ov, event, wait_handle, loop=loop)

        # Callable invoked synchronously with this future once it gets a
        # result or an exception; None means nothing to call.
        self._done_callback = None

    def cancel(self):
        raise RuntimeError("_WaitCancelFuture must not be cancelled")

    def set_result(self, result):
        super().set_result(result)
        callback = self._done_callback
        if callback is not None:
            callback(self)

    def set_exception(self, exception):
        super().set_exception(exception)
        callback = self._done_callback
        if callback is not None:
            callback(self)
186
187
class _WaitHandleFuture(_BaseWaitHandleFuture):
    """Wait-handle future registered with a proactor.

    Unregistering the wait is asynchronous: an event is handed to
    UnregisterWaitEx() and the proactor is asked to wait on it before the
    final cleanup runs.
    """

    def __init__(self, ov, handle, wait_handle, proactor, *, loop=None):
        super().__init__(ov, handle, wait_handle, loop=loop)
        self._proactor = proactor
        self._unregister_proactor = True
        # Event passed to UnregisterWaitEx() by _unregister_wait().
        self._event = _overlapped.CreateEvent(None, True, False, None)
        # Future returned by the proactor's _wait_cancel() for that event.
        self._event_fut = None

    def _unregister_wait_cb(self, fut):
        event = self._event
        if event is not None:
            _winapi.CloseHandle(event)
            self._event = None
            self._event_fut = None

        # A cancelled wait may never be signalled, so it has to be
        # unregistered explicitly; otherwise IocpProactor.close() would wait
        # forever for an event which never comes.
        #
        # If the IocpProactor already received the event, calling
        # _unregister() is still safe: a reference to the Overlapped object,
        # which serves as the unique key, has been kept.
        self._proactor._unregister(self._ov)
        self._proactor = None

        super()._unregister_wait_cb(fut)

    def _unregister_wait(self):
        """Start unregistering the wait; does nothing after the first call."""
        if not self._registered:
            return
        self._registered = False

        handle, self._wait_handle = self._wait_handle, None
        try:
            _overlapped.UnregisterWaitEx(handle, self._event)
        except OSError as exc:
            if exc.winerror != _overlapped.ERROR_IO_PENDING:
                context = {
                    'message': 'Failed to unregister the wait handle',
                    'exception': exc,
                    'future': self,
                }
                if self._source_traceback:
                    context['source_traceback'] = self._source_traceback
                self._loop.call_exception_handler(context)
                return
            # ERROR_IO_PENDING is not an error, the wait was unregistered

        # Let the proactor wait on the event and run the cleanup callback.
        self._event_fut = self._proactor._wait_cancel(
            self._event, self._unregister_wait_cb)
Guido van Rossum90fb9142013-10-30 14:44:05 -0700238
239
class PipeServer:
    """A named-pipe server.

    It plays much the same role as a bound, listening socket.
    """

    def __init__(self, address):
        self._address = address
        self._free_instances = weakref.WeakSet()
        # _server_pipe_handle() may raise and the destructor calls close(),
        # so every attribute close() touches must exist before that call.
        self._pipe = None
        self._accept_pipe_future = None
        self._pipe = self._server_pipe_handle(True)

    def _get_unconnected_pipe(self):
        """Return the current pipe instance after creating its successor.

        Creating the new instance before giving out the previous one
        guarantees that, until the server is closed, at least one pipe handle
        exists for the address, so a client connecting does not fail with
        FileNotFoundError.
        """
        previous = self._pipe
        self._pipe = self._server_pipe_handle(False)
        return previous

    def _server_pipe_handle(self, first):
        """Create a pipe instance and return its wrapper (None if closed).

        *first* additionally sets FILE_FLAG_FIRST_PIPE_INSTANCE.
        """
        if self.closed():
            return None
        open_mode = _winapi.PIPE_ACCESS_DUPLEX | _winapi.FILE_FLAG_OVERLAPPED
        if first:
            open_mode |= _winapi.FILE_FLAG_FIRST_PIPE_INSTANCE
        pipe_mode = (_winapi.PIPE_TYPE_MESSAGE |
                     _winapi.PIPE_READMODE_MESSAGE |
                     _winapi.PIPE_WAIT)
        raw_handle = _winapi.CreateNamedPipe(
            self._address, open_mode, pipe_mode,
            _winapi.PIPE_UNLIMITED_INSTANCES,
            windows_utils.BUFSIZE, windows_utils.BUFSIZE,
            _winapi.NMPWAIT_WAIT_FOREVER, _winapi.NULL)
        wrapper = windows_utils.PipeHandle(raw_handle)
        self._free_instances.add(wrapper)
        return wrapper

    def closed(self):
        """Return True once close() has cleared the address."""
        return self._address is None

    def close(self):
        """Cancel the pending accept and close the unconnected instances."""
        pending_accept = self._accept_pipe_future
        if pending_accept is not None:
            pending_accept.cancel()
            self._accept_pipe_future = None
        if self._address is None:
            return
        # Close all instances which have not been connected to by a client.
        for instance in self._free_instances:
            instance.close()
        self._pipe = None
        self._address = None
        self._free_instances.clear()

    __del__ = close
297
298
class _WindowsSelectorEventLoop(selector_events.BaseSelectorEventLoop):
    """Selector-based event loop for Windows."""
301
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700302
class ProactorEventLoop(proactor_events.BaseProactorEventLoop):
    """Proactor event loop for Windows, backed by IOCP."""

    def __init__(self, proactor=None):
        # Default to a fresh IocpProactor when none is supplied.
        super().__init__(IocpProactor() if proactor is None else proactor)

    async def create_pipe_connection(self, protocol_factory, address):
        """Connect to the pipe at *address*.

        Return a ``(transport, protocol)`` pair.
        """
        pipe = await self._proactor.connect_pipe(address)
        protocol = protocol_factory()
        transport = self._make_duplex_pipe_transport(
            pipe, protocol, extra={'addr': address})
        return transport, protocol

    async def start_serving_pipe(self, protocol_factory, address):
        """Start accepting clients on the pipe at *address*.

        Return a list holding the PipeServer.
        """
        server = PipeServer(address)

        def loop_accept_pipe(fut=None):
            # Runs once through call_soon() (no future), then again as the
            # done callback of each accept future.
            pipe = None
            try:
                if fut:
                    pipe = fut.result()
                    server._free_instances.discard(pipe)

                    if server.closed():
                        # A client connected before the server was closed:
                        # drop the client (close the pipe) and exit
                        pipe.close()
                        return

                    protocol = protocol_factory()
                    self._make_duplex_pipe_transport(
                        pipe, protocol, extra={'addr': address})

                pipe = server._get_unconnected_pipe()
                if pipe is None:
                    return

                fut = self._proactor.accept_pipe(pipe)
            except OSError as exc:
                if pipe and pipe.fileno() != -1:
                    self.call_exception_handler({
                        'message': 'Pipe accept failed',
                        'exception': exc,
                        'pipe': pipe,
                    })
                    pipe.close()
                elif self._debug:
                    logger.warning("Accept pipe failed on pipe %r",
                                   pipe, exc_info=True)
            except exceptions.CancelledError:
                if pipe:
                    pipe.close()
            else:
                # Remember the pending accept so the server can cancel it,
                # and come back here when it completes.
                server._accept_pipe_future = fut
                fut.add_done_callback(loop_accept_pipe)

        self.call_soon(loop_accept_pipe)
        return [server]

    async def _make_subprocess_transport(self, protocol, args, shell,
                                         stdin, stdout, stderr, bufsize,
                                         extra=None, **kwargs):
        """Create the subprocess transport and wait on its waiter future.

        If the waiter fails, the transport is closed and waited for before
        the exception is re-raised.
        """
        waiter = self.create_future()
        transport = _WindowsSubprocessTransport(
            self, protocol, args, shell,
            stdin, stdout, stderr, bufsize,
            waiter=waiter, extra=extra,
            **kwargs)
        try:
            await waiter
        except Exception:
            transport.close()
            await transport._wait()
            raise

        return transport
381
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700382
383class IocpProactor:
384 """Proactor implementation using IOCP."""
385
def __init__(self, concurrency=0xffffffff):
    """Create the completion port and the proactor's bookkeeping.

    *concurrency* is passed as the last argument of
    CreateIoCompletionPort().
    """
    self._loop = None
    # Results handed out by select().
    self._results = []
    self._iocp = _overlapped.CreateIoCompletionPort(
        _overlapped.INVALID_HANDLE_VALUE, NULL, 0, concurrency)
    # Maps ov.address -> (future, ov, obj, callback).
    self._cache = {}
    # Objects already associated with the completion port.
    self._registered = weakref.WeakSet()
    # Overlapped objects passed to _unregister().
    self._unregistered = []
    self._stopped_serving = weakref.WeakSet()
395
def __repr__(self):
    """Show the class name with the cached-overlapped and result counts."""
    cls_name = self.__class__.__name__
    return (f'<{cls_name} overlapped#={len(self._cache)} '
            f'result#={len(self._results)}>')
400
def set_loop(self, loop):
    """Remember *loop* as the event loop used by this proactor."""
    self._loop = loop
403
def select(self, timeout=None):
    """Return the accumulated results and reset the internal list.

    The completion port is polled with *timeout* only when no result is
    already waiting.
    """
    if not self._results:
        self._poll(timeout)
    ready, self._results = self._results, []
    return ready
410
def _result(self, value):
    """Return a new future of the loop already resolved with *value*."""
    resolved = self._loop.create_future()
    resolved.set_result(value)
    return resolved
415
def recv(self, conn, nbytes, flags=0):
    """Start an overlapped receive of up to *nbytes* on *conn*.

    Sockets use WSARecv() (with *flags*), anything else ReadFile().
    Return a future; a broken pipe yields a future resolved with b''.
    """
    self._register_with_iocp(conn)
    ov = _overlapped.Overlapped(NULL)
    is_socket = isinstance(conn, socket.socket)
    try:
        if is_socket:
            ov.WSARecv(conn.fileno(), nbytes, flags)
        else:
            ov.ReadFile(conn.fileno(), nbytes)
    except BrokenPipeError:
        return self._result(b'')

    def finish_recv(trans, key, ov):
        # Translate "connection gone" errors into ConnectionResetError.
        try:
            return ov.getresult()
        except OSError as exc:
            if exc.winerror not in (_overlapped.ERROR_NETNAME_DELETED,
                                    _overlapped.ERROR_OPERATION_ABORTED):
                raise
            raise ConnectionResetError(*exc.args)

    return self._register(ov, conn, finish_recv)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700438
def recv_into(self, conn, buf, flags=0):
    """Start an overlapped receive on *conn* that fills *buf*.

    Sockets use WSARecvInto() (with *flags*), anything else ReadFileInto().
    Return a future. A broken pipe yields a future resolved with 0, the
    receive-into convention for "no bytes received" (EOF).
    """
    self._register_with_iocp(conn)
    ov = _overlapped.Overlapped(NULL)
    try:
        if isinstance(conn, socket.socket):
            ov.WSARecvInto(conn.fileno(), buf, flags)
        else:
            ov.ReadFileInto(conn.fileno(), buf)
    except BrokenPipeError:
        # The result of a receive-into operation is a byte count, so EOF
        # must be reported as 0 and not as an empty bytes object.
        return self._result(0)

    def finish_recv(trans, key, ov):
        # Translate "connection gone" errors into ConnectionResetError.
        try:
            return ov.getresult()
        except OSError as exc:
            if exc.winerror in (_overlapped.ERROR_NETNAME_DELETED,
                                _overlapped.ERROR_OPERATION_ABORTED):
                raise ConnectionResetError(*exc.args)
            else:
                raise

    return self._register(ov, conn, finish_recv)
461
def send(self, conn, buf, flags=0):
    """Start an overlapped send of *buf* on *conn* and return a future.

    Sockets use WSASend() (with *flags*), anything else WriteFile().
    """
    self._register_with_iocp(conn)
    ov = _overlapped.Overlapped(NULL)
    fd_is_socket = isinstance(conn, socket.socket)
    if fd_is_socket:
        ov.WSASend(conn.fileno(), buf, flags)
    else:
        ov.WriteFile(conn.fileno(), buf)

    def finish_send(trans, key, ov):
        # Translate "connection gone" errors into ConnectionResetError.
        try:
            return ov.getresult()
        except OSError as exc:
            if exc.winerror not in (_overlapped.ERROR_NETNAME_DELETED,
                                    _overlapped.ERROR_OPERATION_ABORTED):
                raise
            raise ConnectionResetError(*exc.args)

    return self._register(ov, conn, finish_send)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700481
def accept(self, listener):
    """Start an overlapped AcceptEx() on *listener*.

    Return a future whose result is ``(conn, peer_address)``.
    """
    self._register_with_iocp(listener)
    conn = self._get_accept_socket(listener.family)
    ov = _overlapped.Overlapped(NULL)
    ov.AcceptEx(listener.fileno(), conn.fileno())

    def finish_accept(trans, key, ov):
        ov.getresult()
        # Use SO_UPDATE_ACCEPT_CONTEXT so getsockname() etc work.
        packed_listener = struct.pack('@P', listener.fileno())
        conn.setsockopt(socket.SOL_SOCKET,
                        _overlapped.SO_UPDATE_ACCEPT_CONTEXT,
                        packed_listener)
        conn.settimeout(listener.gettimeout())
        return conn, conn.getpeername()

    async def accept_coro(future, conn):
        # Watcher coroutine: close the accept socket if the future is
        # cancelled, then let the cancellation propagate.
        try:
            await future
        except exceptions.CancelledError:
            conn.close()
            raise

    future = self._register(ov, listener, finish_accept)
    watcher = accept_coro(future, conn)
    tasks.ensure_future(watcher, loop=self._loop)
    return future
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700509
def connect(self, conn, address):
    """Start an overlapped ConnectEx() of *conn* to *address*.

    Return a future whose result is *conn*.
    """
    self._register_with_iocp(conn)
    # ConnectEx() requires the socket to be locally bound first.
    try:
        _overlapped.BindLocal(conn.fileno(), conn.family)
    except OSError as e:
        # WSAEINVAL probably means "already locally bound"; confirm it with
        # getsockname() and re-raise anything else.
        if e.winerror != errno.WSAEINVAL or conn.getsockname()[1] == 0:
            raise
    ov = _overlapped.Overlapped(NULL)
    ov.ConnectEx(conn.fileno(), address)

    def finish_connect(trans, key, ov):
        ov.getresult()
        # Use SO_UPDATE_CONNECT_CONTEXT so getsockname() etc work.
        conn.setsockopt(socket.SOL_SOCKET,
                        _overlapped.SO_UPDATE_CONNECT_CONTEXT, 0)
        return conn

    return self._register(ov, conn, finish_connect)
532
def sendfile(self, sock, file, offset, count):
    """Start an overlapped TransmitFile() of *file* over *sock*.

    *offset* is split into its low and high 32-bit halves for the call.
    Return a future.
    """
    self._register_with_iocp(sock)
    ov = _overlapped.Overlapped(NULL)
    mask32 = 0xffff_ffff
    low_part = offset & mask32
    high_part = (offset >> 32) & mask32
    file_handle = msvcrt.get_osfhandle(file.fileno())
    ov.TransmitFile(sock.fileno(), file_handle,
                    low_part, high_part,
                    count, 0, 0)

    def finish_sendfile(trans, key, ov):
        # Translate "connection gone" errors into ConnectionResetError.
        try:
            return ov.getresult()
        except OSError as exc:
            if exc.winerror not in (_overlapped.ERROR_NETNAME_DELETED,
                                    _overlapped.ERROR_OPERATION_ABORTED):
                raise
            raise ConnectionResetError(*exc.args)

    return self._register(ov, sock, finish_sendfile)
553
def accept_pipe(self, pipe):
    """Wait for a client to connect to *pipe*.

    Return a future whose result is *pipe*.
    """
    self._register_with_iocp(pipe)
    ov = _overlapped.Overlapped(NULL)
    already_connected = ov.ConnectNamedPipe(pipe.fileno())

    if already_connected:
        # ConnectNamedPipe() failed with ERROR_PIPE_CONNECTED, meaning the
        # pipe is connected: there is no completion to wait for.
        return self._result(pipe)

    def finish_accept_pipe(trans, key, ov):
        ov.getresult()
        return pipe

    return self._register(ov, pipe, finish_accept_pipe)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700570
async def connect_pipe(self, address):
    """Connect to the pipe at *address* and return a PipeHandle.

    There is no overlapped way to connect to a pipe, so ConnectPipe() is
    retried, with a growing delay capped at CONNECT_PIPE_MAX_DELAY, for as
    long as it fails with ERROR_PIPE_BUSY. Any other OSError propagates.
    """
    delay = CONNECT_PIPE_INIT_DELAY
    while True:
        try:
            handle = _overlapped.ConnectPipe(address)
        except OSError as exc:
            if exc.winerror != _overlapped.ERROR_PIPE_BUSY:
                raise
        else:
            return windows_utils.PipeHandle(handle)

        # ConnectPipe() failed with ERROR_PIPE_BUSY: retry later
        delay = min(delay * 2, CONNECT_PIPE_MAX_DELAY)
        await tasks.sleep(delay, loop=self._loop)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700589
def wait_for_handle(self, handle, timeout=None):
    """Wait for *handle* and return a Future.

    The future's result is True when the wait completed and False when it
    did not (timeout).
    """
    is_cancel_wait = False
    return self._wait_for_handle(handle, timeout, is_cancel_wait)
597
def _wait_cancel(self, event, done_callback):
    """Wait on *event* with no timeout using a _WaitCancelFuture.

    *done_callback* is stored on the future rather than added with
    add_done_callback(): the wait may only complete in
    IocpProactor.close(), while the event loop is not running.
    """
    cancel_fut = self._wait_for_handle(event, None, True)
    cancel_fut._done_callback = done_callback
    return cancel_fut
604
def _wait_for_handle(self, handle, timeout, _is_cancel):
    """Register a wait on *handle* and return the future tracking it.

    *timeout* is in seconds; None waits forever. *_is_cancel* selects a
    _WaitCancelFuture instead of a _WaitHandleFuture.
    """
    # RegisterWaitForSingleObject() has a resolution of 1 millisecond,
    # round away from zero to wait *at least* timeout seconds.
    ms = _winapi.INFINITE if timeout is None else math.ceil(timeout * 1e3)

    # The Overlapped object only exists so that ov.address can serve as the
    # cache key.
    ov = _overlapped.Overlapped(NULL)
    wait_handle = _overlapped.RegisterWaitWithQueue(
        handle, self._iocp, ov.address, ms)
    if _is_cancel:
        f = _WaitCancelFuture(ov, handle, wait_handle, loop=self._loop)
    else:
        f = _WaitHandleFuture(ov, handle, wait_handle, self,
                              loop=self._loop)
    if f._source_traceback:
        del f._source_traceback[-1]

    def finish_wait_for_handle(trans, key, ov):
        # This second wait means the function should only be used with
        # handle types where a successful wait has no effect: events or
        # processes are all right, locks or semaphores are not. Also, if
        # the handle is signalled and then quickly reset, False may be
        # returned even though there was no timeout.
        return f._poll()

    self._cache[ov.address] = (f, ov, 0, finish_wait_for_handle)
    return f
636
def _register_with_iocp(self, obj):
    """Associate *obj*'s handle with the completion port, at most once."""
    # To get notifications of finished operations on this object sent to
    # the completion port, we must register its handle.
    if obj in self._registered:
        return
    self._registered.add(obj)
    _overlapped.CreateIoCompletionPort(obj.fileno(), self._iocp, 0, 0)
    # XXX We could also use SetFileCompletionNotificationModes()
    # to avoid sending notifications to completion port of ops
    # that succeed immediately.
646
def _register(self, ov, obj, callback):
    """Return a future for the overlapped operation *ov*.

    The future receives the value returned by *callback* once the
    operation completes.
    """
    fut = _OverlappedFuture(ov, loop=self._loop)
    if fut._source_traceback:
        del fut._source_traceback[-1]
    if not ov.pending:
        # The operation already completed, so the work need not be
        # postponed. This shortcut is not possible when the NumberOfBytes
        # and CompletionKey values returned by PostQueuedCompletionStatus()
        # are needed.
        try:
            value = callback(None, None, ov)
        except OSError as exc:
            fut.set_exception(exc)
        else:
            fut.set_result(value)
        # Even if GetOverlappedResult() was called, we have to wait for the
        # notification of the completion in GetQueuedCompletionStatus().
        # Register the overlapped operation to keep a reference to the
        # OVERLAPPED object, otherwise the memory is freed and Windows may
        # read uninitialized memory.

    # Register the overlapped operation for later. obj is stored only to
    # prevent it from being garbage collected too early.
    self._cache[ov.address] = (fut, ov, obj, callback)
    return fut
676
def _unregister(self, ov):
    """Mark the overlapped object *ov* as unregistered.

    To be called once the future of *ov* has been cancelled. It does not
    matter whether the event is already signalled (pending in the proactor
    event queue) or will never be signalled because it was cancelled.
    """
    unregistered = self._unregistered
    unregistered.append(ov)
685
def _get_accept_socket(self, family):
    """Return a new socket of *family* with its timeout set to 0."""
    accept_sock = socket.socket(family)
    accept_sock.settimeout(0)
    return accept_sock
690
691 def _poll(self, timeout=None):
692 if timeout is None:
693 ms = INFINITE
694 elif timeout < 0:
695 raise ValueError("negative timeout")
696 else:
Victor Stinnerf2e17682014-01-31 16:25:24 +0100697 # GetQueuedCompletionStatus() has a resolution of 1 millisecond,
698 # round away from zero to wait *at least* timeout seconds.
699 ms = math.ceil(timeout * 1e3)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700700 if ms >= INFINITE:
701 raise ValueError("timeout too big")
Victor Stinner313a9802014-07-29 12:58:23 +0200702
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700703 while True:
704 status = _overlapped.GetQueuedCompletionStatus(self._iocp, ms)
705 if status is None:
Victor Stinnerd0a28de2015-01-21 23:39:51 +0100706 break
Victor Stinner313a9802014-07-29 12:58:23 +0200707 ms = 0
708
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700709 err, transferred, key, address = status
710 try:
711 f, ov, obj, callback = self._cache.pop(address)
712 except KeyError:
Victor Stinner42d3bde2014-07-28 00:18:43 +0200713 if self._loop.get_debug():
714 self._loop.call_exception_handler({
715 'message': ('GetQueuedCompletionStatus() returned an '
716 'unexpected event'),
717 'status': ('err=%s transferred=%s key=%#x address=%#x'
718 % (err, transferred, key, address)),
719 })
720
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700721 # key is either zero, or it is used to return a pipe
722 # handle which should be closed to avoid a leak.
723 if key not in (0, _overlapped.INVALID_HANDLE_VALUE):
724 _winapi.CloseHandle(key)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700725 continue
Victor Stinner51e44ea2014-07-26 00:58:34 +0200726
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700727 if obj in self._stopped_serving:
728 f.cancel()
Victor Stinner42d3bde2014-07-28 00:18:43 +0200729 # Don't call the callback if _register() already read the result or
730 # if the overlapped has been cancelled
731 elif not f.done():
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700732 try:
733 value = callback(transferred, key, ov)
734 except OSError as e:
735 f.set_exception(e)
736 self._results.append(f)
737 else:
738 f.set_result(value)
739 self._results.append(f)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700740
Andrew Svetlov7a6706b2017-12-13 17:50:16 +0200741 # Remove unregistered futures
Victor Stinnerd0a28de2015-01-21 23:39:51 +0100742 for ov in self._unregistered:
743 self._cache.pop(ov.address, None)
744 self._unregistered.clear()
745
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700746 def _stop_serving(self, obj):
747 # obj is a socket or pipe handle. It will be closed in
748 # BaseProactorEventLoop._stop_serving() which will make any
749 # pending operations fail quickly.
750 self._stopped_serving.add(obj)
751
def close(self):
    """Cancel the registered operations and close the completion port.

    Every cached future that is neither already cancelled nor a
    _WaitCancelFuture is cancelled.  An OSError raised by cancel() is
    reported through the loop's exception handler when a loop is set.
    The method then polls until the cache is empty, resets self._results
    and closes the IOCP handle if it is still open.
    """
    # Cancel remaining registered operations.  Work on a snapshot of the
    # cache so the iteration is unaffected if the cache changes while
    # futures are being cancelled.
    for entry in list(self._cache.values()):
        fut = entry[0]
        # Nothing to do with cancelled futures, and _WaitCancelFuture
        # must not be cancelled.
        if fut.cancelled() or isinstance(fut, _WaitCancelFuture):
            continue
        try:
            fut.cancel()
        except OSError as exc:
            if self._loop is not None:
                context = {
                    'message': 'Cancelling a future failed',
                    'exception': exc,
                    'future': fut,
                }
                if fut._source_traceback:
                    context['source_traceback'] = fut._source_traceback
                self._loop.call_exception_handler(context)

    # Wait until every registered operation has been notified.  _poll()
    # has no return value, so progress is measured by the cache size:
    # only report slowness when a one-second poll removed nothing.
    while self._cache:
        remaining = len(self._cache)
        self._poll(1)
        if len(self._cache) >= remaining:
            logger.debug('taking long time to close proactor')

    self._results = []
    if self._iocp is not None:
        _winapi.CloseHandle(self._iocp)
        self._iocp = None
Guido van Rossum59691282013-10-30 14:52:03 -0700783
Victor Stinnerfea6a102014-07-25 00:54:53 +0200784 def __del__(self):
785 self.close()
786
Guido van Rossum59691282013-10-30 14:52:03 -0700787
class _WindowsSubprocessTransport(base_subprocess.BaseSubprocessTransport):
    """Subprocess transport whose child is started with windows_utils.Popen."""

    def _start(self, args, shell, stdin, stdout, stderr, bufsize, **kwargs):
        """Start the child process and watch its handle for termination.

        The arguments are forwarded to windows_utils.Popen().  When the
        proactor reports that the process handle is signalled, the return
        code is read with poll() and passed to _process_exited().
        """
        self._proc = windows_utils.Popen(
            args, shell=shell, stdin=stdin, stdout=stdout, stderr=stderr,
            bufsize=bufsize, **kwargs)

        def callback(fut):
            # The future itself is not used; it only signals that the
            # wait on the process handle is done.
            self._process_exited(self._proc.poll())

        proactor = self._loop._proactor
        exit_waiter = proactor.wait_for_handle(int(self._proc._handle))
        exit_waiter.add_done_callback(callback)
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800801
802
# Public name (listed in __all__) for the selector-based event loop.
SelectorEventLoop = _WindowsSelectorEventLoop
804
805
class WindowsSelectorEventLoopPolicy(events.BaseDefaultEventLoopPolicy):
    """Event loop policy whose loop factory is SelectorEventLoop."""

    _loop_factory = SelectorEventLoop
808
809
class WindowsProactorEventLoopPolicy(events.BaseDefaultEventLoopPolicy):
    """Event loop policy whose loop factory is ProactorEventLoop."""

    _loop_factory = ProactorEventLoop
812
813
# The proactor-based policy is the one exported as the default policy.
DefaultEventLoopPolicy = WindowsProactorEventLoopPolicy