blob: 2ec542764375e9b4a177a79226acaa69d0e6b6a3 [file] [log] [blame]
Yury Selivanovb0b0e622014-02-18 22:27:48 -05001"""Selector and proactor event loops for Windows."""
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07002
Victor Stinner4271dfd2017-11-28 15:19:56 +01003import _overlapped
Victor Stinnerf2e17682014-01-31 16:25:24 +01004import _winapi
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07005import errno
Victor Stinnerf2e17682014-01-31 16:25:24 +01006import math
Miss Islington (bot)632c1cb2018-02-25 09:10:58 -08007import msvcrt
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07008import socket
Victor Stinnerf2e17682014-01-31 16:25:24 +01009import struct
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070010import weakref
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070011
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -080012from . import events
Guido van Rossum59691282013-10-30 14:52:03 -070013from . import base_subprocess
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070014from . import futures
15from . import proactor_events
16from . import selector_events
17from . import tasks
18from . import windows_utils
Victor Stinnerf951d282014-06-29 00:46:45 +020019from .log import logger
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070020
21
Yury Selivanov6370f342017-12-10 18:36:12 -050022__all__ = (
23 'SelectorEventLoop', 'ProactorEventLoop', 'IocpProactor',
Miss Islington (bot)07384432018-06-07 18:31:50 -070024 'DefaultEventLoopPolicy', 'WindowsSelectorEventLoopPolicy',
25 'WindowsProactorEventLoopPolicy',
Yury Selivanov6370f342017-12-10 18:36:12 -050026)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070027
28
29NULL = 0
30INFINITE = 0xffffffff
31ERROR_CONNECTION_REFUSED = 1225
32ERROR_CONNECTION_ABORTED = 1236
33
Victor Stinner7ffa2c52015-01-22 22:55:08 +010034# Initial delay in seconds for connect_pipe() before retrying to connect
35CONNECT_PIPE_INIT_DELAY = 0.001
36
37# Maximum delay in seconds for connect_pipe() before retrying to connect
38CONNECT_PIPE_MAX_DELAY = 0.100
39
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070040
41class _OverlappedFuture(futures.Future):
42 """Subclass of Future which represents an overlapped operation.
43
44 Cancelling it will immediately cancel the overlapped operation.
45 """
46
47 def __init__(self, ov, *, loop=None):
48 super().__init__(loop=loop)
Victor Stinnerfea6a102014-07-25 00:54:53 +020049 if self._source_traceback:
50 del self._source_traceback[-1]
Victor Stinner18a28dc2014-07-25 13:05:20 +020051 self._ov = ov
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070052
Victor Stinner313a9802014-07-29 12:58:23 +020053 def _repr_info(self):
54 info = super()._repr_info()
Victor Stinner18a28dc2014-07-25 13:05:20 +020055 if self._ov is not None:
56 state = 'pending' if self._ov.pending else 'completed'
Yury Selivanov6370f342017-12-10 18:36:12 -050057 info.insert(1, f'overlapped=<{state}, {self._ov.address:#x}>')
Victor Stinner313a9802014-07-29 12:58:23 +020058 return info
Victor Stinnere912e652014-07-12 03:11:53 +020059
Victor Stinner18a28dc2014-07-25 13:05:20 +020060 def _cancel_overlapped(self):
61 if self._ov is None:
62 return
63 try:
64 self._ov.cancel()
65 except OSError as exc:
66 context = {
67 'message': 'Cancelling an overlapped future failed',
68 'exception': exc,
69 'future': self,
70 }
71 if self._source_traceback:
72 context['source_traceback'] = self._source_traceback
73 self._loop.call_exception_handler(context)
74 self._ov = None
75
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070076 def cancel(self):
Victor Stinner18a28dc2014-07-25 13:05:20 +020077 self._cancel_overlapped()
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070078 return super().cancel()
79
Victor Stinner18a28dc2014-07-25 13:05:20 +020080 def set_exception(self, exception):
81 super().set_exception(exception)
82 self._cancel_overlapped()
83
Victor Stinner51e44ea2014-07-26 00:58:34 +020084 def set_result(self, result):
85 super().set_result(result)
86 self._ov = None
87
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070088
Victor Stinnerd0a28de2015-01-21 23:39:51 +010089class _BaseWaitHandleFuture(futures.Future):
Guido van Rossum90fb9142013-10-30 14:44:05 -070090 """Subclass of Future which represents a wait handle."""
91
Victor Stinnerd0a28de2015-01-21 23:39:51 +010092 def __init__(self, ov, handle, wait_handle, *, loop=None):
Guido van Rossum90fb9142013-10-30 14:44:05 -070093 super().__init__(loop=loop)
Victor Stinner313a9802014-07-29 12:58:23 +020094 if self._source_traceback:
95 del self._source_traceback[-1]
Victor Stinnerd0a28de2015-01-21 23:39:51 +010096 # Keep a reference to the Overlapped object to keep it alive until the
97 # wait is unregistered
Victor Stinner313a9802014-07-29 12:58:23 +020098 self._ov = ov
Victor Stinner18a28dc2014-07-25 13:05:20 +020099 self._handle = handle
Guido van Rossum90fb9142013-10-30 14:44:05 -0700100 self._wait_handle = wait_handle
101
Victor Stinnerd0a28de2015-01-21 23:39:51 +0100102 # Should we call UnregisterWaitEx() if the wait completes
103 # or is cancelled?
104 self._registered = True
105
Victor Stinner18a28dc2014-07-25 13:05:20 +0200106 def _poll(self):
107 # non-blocking wait: use a timeout of 0 millisecond
108 return (_winapi.WaitForSingleObject(self._handle, 0) ==
109 _winapi.WAIT_OBJECT_0)
110
Victor Stinner313a9802014-07-29 12:58:23 +0200111 def _repr_info(self):
112 info = super()._repr_info()
Yury Selivanov6370f342017-12-10 18:36:12 -0500113 info.append(f'handle={self._handle:#x}')
Victor Stinnerd0a28de2015-01-21 23:39:51 +0100114 if self._handle is not None:
Victor Stinner313a9802014-07-29 12:58:23 +0200115 state = 'signaled' if self._poll() else 'waiting'
Victor Stinnerd0a28de2015-01-21 23:39:51 +0100116 info.append(state)
117 if self._wait_handle is not None:
Yury Selivanov6370f342017-12-10 18:36:12 -0500118 info.append(f'wait_handle={self._wait_handle:#x}')
Victor Stinner313a9802014-07-29 12:58:23 +0200119 return info
Victor Stinner18a28dc2014-07-25 13:05:20 +0200120
Victor Stinnerd0a28de2015-01-21 23:39:51 +0100121 def _unregister_wait_cb(self, fut):
122 # The wait was unregistered: it's not safe to destroy the Overlapped
123 # object
124 self._ov = None
125
Victor Stinner313a9802014-07-29 12:58:23 +0200126 def _unregister_wait(self):
Victor Stinnerd0a28de2015-01-21 23:39:51 +0100127 if not self._registered:
Victor Stinnerfea6a102014-07-25 00:54:53 +0200128 return
Victor Stinnerd0a28de2015-01-21 23:39:51 +0100129 self._registered = False
130
Victor Stinner24dfa3c2015-01-26 22:30:28 +0100131 wait_handle = self._wait_handle
132 self._wait_handle = None
Guido van Rossum90fb9142013-10-30 14:44:05 -0700133 try:
Victor Stinner24dfa3c2015-01-26 22:30:28 +0100134 _overlapped.UnregisterWait(wait_handle)
Victor Stinnerb2614752014-08-25 23:20:52 +0200135 except OSError as exc:
Victor Stinner24dfa3c2015-01-26 22:30:28 +0100136 if exc.winerror != _overlapped.ERROR_IO_PENDING:
Victor Stinnerb2614752014-08-25 23:20:52 +0200137 context = {
138 'message': 'Failed to unregister the wait handle',
139 'exception': exc,
140 'future': self,
141 }
142 if self._source_traceback:
143 context['source_traceback'] = self._source_traceback
144 self._loop.call_exception_handler(context)
Victor Stinner24dfa3c2015-01-26 22:30:28 +0100145 return
146 # ERROR_IO_PENDING means that the unregister is pending
147
148 self._unregister_wait_cb(None)
Victor Stinnerfea6a102014-07-25 00:54:53 +0200149
150 def cancel(self):
Victor Stinner313a9802014-07-29 12:58:23 +0200151 self._unregister_wait()
Victor Stinnerd0a28de2015-01-21 23:39:51 +0100152 return super().cancel()
Victor Stinner313a9802014-07-29 12:58:23 +0200153
154 def set_exception(self, exception):
Victor Stinner313a9802014-07-29 12:58:23 +0200155 self._unregister_wait()
Victor Stinnerd0a28de2015-01-21 23:39:51 +0100156 super().set_exception(exception)
Victor Stinner313a9802014-07-29 12:58:23 +0200157
158 def set_result(self, result):
Victor Stinner313a9802014-07-29 12:58:23 +0200159 self._unregister_wait()
Victor Stinnerd0a28de2015-01-21 23:39:51 +0100160 super().set_result(result)
161
162
163class _WaitCancelFuture(_BaseWaitHandleFuture):
164 """Subclass of Future which represents a wait for the cancellation of a
165 _WaitHandleFuture using an event.
166 """
167
168 def __init__(self, ov, event, wait_handle, *, loop=None):
169 super().__init__(ov, event, wait_handle, loop=loop)
170
171 self._done_callback = None
172
Victor Stinner1ca93922015-01-22 00:17:54 +0100173 def cancel(self):
174 raise RuntimeError("_WaitCancelFuture must not be cancelled")
175
INADA Naokia8363622016-10-21 12:30:15 +0900176 def set_result(self, result):
177 super().set_result(result)
178 if self._done_callback is not None:
179 self._done_callback(self)
180
181 def set_exception(self, exception):
182 super().set_exception(exception)
Victor Stinnerd0a28de2015-01-21 23:39:51 +0100183 if self._done_callback is not None:
184 self._done_callback(self)
185
186
187class _WaitHandleFuture(_BaseWaitHandleFuture):
188 def __init__(self, ov, handle, wait_handle, proactor, *, loop=None):
189 super().__init__(ov, handle, wait_handle, loop=loop)
190 self._proactor = proactor
191 self._unregister_proactor = True
192 self._event = _overlapped.CreateEvent(None, True, False, None)
193 self._event_fut = None
194
195 def _unregister_wait_cb(self, fut):
196 if self._event is not None:
197 _winapi.CloseHandle(self._event)
198 self._event = None
199 self._event_fut = None
200
201 # If the wait was cancelled, the wait may never be signalled, so
202 # it's required to unregister it. Otherwise, IocpProactor.close() will
203 # wait forever for an event which will never come.
204 #
205 # If the IocpProactor already received the event, it's safe to call
206 # _unregister() because we kept a reference to the Overlapped object
Martin Panter6245cb32016-04-15 02:14:19 +0000207 # which is used as a unique key.
Victor Stinnerd0a28de2015-01-21 23:39:51 +0100208 self._proactor._unregister(self._ov)
209 self._proactor = None
210
211 super()._unregister_wait_cb(fut)
212
213 def _unregister_wait(self):
214 if not self._registered:
215 return
216 self._registered = False
217
Victor Stinner24dfa3c2015-01-26 22:30:28 +0100218 wait_handle = self._wait_handle
219 self._wait_handle = None
Victor Stinnerd0a28de2015-01-21 23:39:51 +0100220 try:
Victor Stinner24dfa3c2015-01-26 22:30:28 +0100221 _overlapped.UnregisterWaitEx(wait_handle, self._event)
Victor Stinnerd0a28de2015-01-21 23:39:51 +0100222 except OSError as exc:
Victor Stinner24dfa3c2015-01-26 22:30:28 +0100223 if exc.winerror != _overlapped.ERROR_IO_PENDING:
Victor Stinnerd0a28de2015-01-21 23:39:51 +0100224 context = {
225 'message': 'Failed to unregister the wait handle',
226 'exception': exc,
227 'future': self,
228 }
229 if self._source_traceback:
230 context['source_traceback'] = self._source_traceback
231 self._loop.call_exception_handler(context)
Victor Stinner24dfa3c2015-01-26 22:30:28 +0100232 return
233 # ERROR_IO_PENDING is not an error, the wait was unregistered
234
235 self._event_fut = self._proactor._wait_cancel(self._event,
236 self._unregister_wait_cb)
Guido van Rossum90fb9142013-10-30 14:44:05 -0700237
238
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700239class PipeServer(object):
240 """Class representing a pipe server.
241
242 This is much like a bound, listening socket.
243 """
244 def __init__(self, address):
245 self._address = address
246 self._free_instances = weakref.WeakSet()
Victor Stinnerb2614752014-08-25 23:20:52 +0200247 # initialize the pipe attribute before calling _server_pipe_handle()
248 # because this function can raise an exception and the destructor calls
249 # the close() method
250 self._pipe = None
251 self._accept_pipe_future = None
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700252 self._pipe = self._server_pipe_handle(True)
253
254 def _get_unconnected_pipe(self):
255 # Create new instance and return previous one. This ensures
256 # that (until the server is closed) there is always at least
257 # one pipe handle for address. Therefore if a client attempt
258 # to connect it will not fail with FileNotFoundError.
259 tmp, self._pipe = self._pipe, self._server_pipe_handle(False)
260 return tmp
261
262 def _server_pipe_handle(self, first):
263 # Return a wrapper for a new pipe handle.
Victor Stinnera19b7b32015-01-26 15:03:20 +0100264 if self.closed():
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700265 return None
266 flags = _winapi.PIPE_ACCESS_DUPLEX | _winapi.FILE_FLAG_OVERLAPPED
267 if first:
268 flags |= _winapi.FILE_FLAG_FIRST_PIPE_INSTANCE
269 h = _winapi.CreateNamedPipe(
270 self._address, flags,
271 _winapi.PIPE_TYPE_MESSAGE | _winapi.PIPE_READMODE_MESSAGE |
272 _winapi.PIPE_WAIT,
273 _winapi.PIPE_UNLIMITED_INSTANCES,
274 windows_utils.BUFSIZE, windows_utils.BUFSIZE,
275 _winapi.NMPWAIT_WAIT_FOREVER, _winapi.NULL)
276 pipe = windows_utils.PipeHandle(h)
277 self._free_instances.add(pipe)
278 return pipe
279
Victor Stinnera19b7b32015-01-26 15:03:20 +0100280 def closed(self):
281 return (self._address is None)
282
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700283 def close(self):
Victor Stinnerb2614752014-08-25 23:20:52 +0200284 if self._accept_pipe_future is not None:
285 self._accept_pipe_future.cancel()
286 self._accept_pipe_future = None
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700287 # Close all instances which have not been connected to by a client.
288 if self._address is not None:
289 for pipe in self._free_instances:
290 pipe.close()
291 self._pipe = None
292 self._address = None
293 self._free_instances.clear()
294
295 __del__ = close
296
297
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800298class _WindowsSelectorEventLoop(selector_events.BaseSelectorEventLoop):
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700299 """Windows version of selector event loop."""
300
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700301
302class ProactorEventLoop(proactor_events.BaseProactorEventLoop):
303 """Windows version of proactor event loop using IOCP."""
304
305 def __init__(self, proactor=None):
306 if proactor is None:
307 proactor = IocpProactor()
308 super().__init__(proactor)
309
Andrew Svetlov5f841b52017-12-09 00:23:48 +0200310 async def create_pipe_connection(self, protocol_factory, address):
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700311 f = self._proactor.connect_pipe(address)
Andrew Svetlov5f841b52017-12-09 00:23:48 +0200312 pipe = await f
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700313 protocol = protocol_factory()
314 trans = self._make_duplex_pipe_transport(pipe, protocol,
315 extra={'addr': address})
316 return trans, protocol
317
Andrew Svetlov5f841b52017-12-09 00:23:48 +0200318 async def start_serving_pipe(self, protocol_factory, address):
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700319 server = PipeServer(address)
Guido van Rossuma8d630a2013-11-01 14:20:55 -0700320
Victor Stinnerb2614752014-08-25 23:20:52 +0200321 def loop_accept_pipe(f=None):
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700322 pipe = None
323 try:
324 if f:
325 pipe = f.result()
326 server._free_instances.discard(pipe)
Victor Stinnera19b7b32015-01-26 15:03:20 +0100327
328 if server.closed():
329 # A client connected before the server was closed:
330 # drop the client (close the pipe) and exit
331 pipe.close()
332 return
333
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700334 protocol = protocol_factory()
335 self._make_duplex_pipe_transport(
336 pipe, protocol, extra={'addr': address})
Victor Stinnera19b7b32015-01-26 15:03:20 +0100337
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700338 pipe = server._get_unconnected_pipe()
339 if pipe is None:
340 return
Victor Stinnera19b7b32015-01-26 15:03:20 +0100341
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700342 f = self._proactor.accept_pipe(pipe)
Yury Selivanovff827f02014-02-18 18:02:19 -0500343 except OSError as exc:
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700344 if pipe and pipe.fileno() != -1:
Yury Selivanovff827f02014-02-18 18:02:19 -0500345 self.call_exception_handler({
346 'message': 'Pipe accept failed',
347 'exception': exc,
348 'pipe': pipe,
349 })
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700350 pipe.close()
Victor Stinnerb2614752014-08-25 23:20:52 +0200351 elif self._debug:
352 logger.warning("Accept pipe failed on pipe %r",
353 pipe, exc_info=True)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700354 except futures.CancelledError:
355 if pipe:
356 pipe.close()
357 else:
Victor Stinnerb2614752014-08-25 23:20:52 +0200358 server._accept_pipe_future = f
359 f.add_done_callback(loop_accept_pipe)
Guido van Rossuma8d630a2013-11-01 14:20:55 -0700360
Victor Stinnerb2614752014-08-25 23:20:52 +0200361 self.call_soon(loop_accept_pipe)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700362 return [server]
363
Andrew Svetlov5f841b52017-12-09 00:23:48 +0200364 async def _make_subprocess_transport(self, protocol, args, shell,
365 stdin, stdout, stderr, bufsize,
366 extra=None, **kwargs):
Yury Selivanov7661db62016-05-16 15:38:39 -0400367 waiter = self.create_future()
Guido van Rossum59691282013-10-30 14:52:03 -0700368 transp = _WindowsSubprocessTransport(self, protocol, args, shell,
369 stdin, stdout, stderr, bufsize,
Victor Stinner47cd10d2015-01-30 00:05:19 +0100370 waiter=waiter, extra=extra,
371 **kwargs)
Victor Stinner4bf22e02015-01-15 14:24:22 +0100372 try:
Andrew Svetlov5f841b52017-12-09 00:23:48 +0200373 await waiter
374 except Exception:
Victor Stinner4bf22e02015-01-15 14:24:22 +0100375 transp.close()
Andrew Svetlov5f841b52017-12-09 00:23:48 +0200376 await transp._wait()
377 raise
Victor Stinner4bf22e02015-01-15 14:24:22 +0100378
Guido van Rossum59691282013-10-30 14:52:03 -0700379 return transp
380
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700381
382class IocpProactor:
383 """Proactor implementation using IOCP."""
384
385 def __init__(self, concurrency=0xffffffff):
386 self._loop = None
387 self._results = []
388 self._iocp = _overlapped.CreateIoCompletionPort(
389 _overlapped.INVALID_HANDLE_VALUE, NULL, 0, concurrency)
390 self._cache = {}
391 self._registered = weakref.WeakSet()
Victor Stinnerd0a28de2015-01-21 23:39:51 +0100392 self._unregistered = []
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700393 self._stopped_serving = weakref.WeakSet()
394
Victor Stinnerfea6a102014-07-25 00:54:53 +0200395 def __repr__(self):
396 return ('<%s overlapped#=%s result#=%s>'
397 % (self.__class__.__name__, len(self._cache),
398 len(self._results)))
399
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700400 def set_loop(self, loop):
401 self._loop = loop
402
403 def select(self, timeout=None):
404 if not self._results:
405 self._poll(timeout)
406 tmp = self._results
407 self._results = []
408 return tmp
409
Victor Stinner41063d22015-01-26 22:30:49 +0100410 def _result(self, value):
Yury Selivanov7661db62016-05-16 15:38:39 -0400411 fut = self._loop.create_future()
Victor Stinner41063d22015-01-26 22:30:49 +0100412 fut.set_result(value)
413 return fut
414
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700415 def recv(self, conn, nbytes, flags=0):
416 self._register_with_iocp(conn)
417 ov = _overlapped.Overlapped(NULL)
Victor Stinner41063d22015-01-26 22:30:49 +0100418 try:
419 if isinstance(conn, socket.socket):
420 ov.WSARecv(conn.fileno(), nbytes, flags)
421 else:
422 ov.ReadFile(conn.fileno(), nbytes)
423 except BrokenPipeError:
424 return self._result(b'')
Guido van Rossuma8d630a2013-11-01 14:20:55 -0700425
Victor Stinnerc89c8a72014-02-26 17:35:30 +0100426 def finish_recv(trans, key, ov):
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700427 try:
428 return ov.getresult()
429 except OSError as exc:
Andrew Svetlov7c684072018-01-27 21:22:47 +0200430 if exc.winerror in (_overlapped.ERROR_NETNAME_DELETED,
431 _overlapped.ERROR_OPERATION_ABORTED):
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700432 raise ConnectionResetError(*exc.args)
433 else:
434 raise
Guido van Rossuma8d630a2013-11-01 14:20:55 -0700435
Victor Stinnerc89c8a72014-02-26 17:35:30 +0100436 return self._register(ov, conn, finish_recv)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700437
Antoine Pitrou525f40d2017-10-19 21:46:40 +0200438 def recv_into(self, conn, buf, flags=0):
439 self._register_with_iocp(conn)
440 ov = _overlapped.Overlapped(NULL)
441 try:
442 if isinstance(conn, socket.socket):
443 ov.WSARecvInto(conn.fileno(), buf, flags)
444 else:
445 ov.ReadFileInto(conn.fileno(), buf)
446 except BrokenPipeError:
447 return self._result(b'')
448
449 def finish_recv(trans, key, ov):
450 try:
451 return ov.getresult()
452 except OSError as exc:
Andrew Svetlov7c684072018-01-27 21:22:47 +0200453 if exc.winerror in (_overlapped.ERROR_NETNAME_DELETED,
454 _overlapped.ERROR_OPERATION_ABORTED):
Antoine Pitrou525f40d2017-10-19 21:46:40 +0200455 raise ConnectionResetError(*exc.args)
456 else:
457 raise
458
459 return self._register(ov, conn, finish_recv)
460
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700461 def send(self, conn, buf, flags=0):
462 self._register_with_iocp(conn)
463 ov = _overlapped.Overlapped(NULL)
464 if isinstance(conn, socket.socket):
465 ov.WSASend(conn.fileno(), buf, flags)
466 else:
467 ov.WriteFile(conn.fileno(), buf)
Guido van Rossuma8d630a2013-11-01 14:20:55 -0700468
Victor Stinnerc89c8a72014-02-26 17:35:30 +0100469 def finish_send(trans, key, ov):
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700470 try:
471 return ov.getresult()
472 except OSError as exc:
Andrew Svetlov7c684072018-01-27 21:22:47 +0200473 if exc.winerror in (_overlapped.ERROR_NETNAME_DELETED,
474 _overlapped.ERROR_OPERATION_ABORTED):
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700475 raise ConnectionResetError(*exc.args)
476 else:
477 raise
Guido van Rossuma8d630a2013-11-01 14:20:55 -0700478
Victor Stinnerc89c8a72014-02-26 17:35:30 +0100479 return self._register(ov, conn, finish_send)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700480
481 def accept(self, listener):
482 self._register_with_iocp(listener)
483 conn = self._get_accept_socket(listener.family)
484 ov = _overlapped.Overlapped(NULL)
485 ov.AcceptEx(listener.fileno(), conn.fileno())
Guido van Rossuma8d630a2013-11-01 14:20:55 -0700486
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700487 def finish_accept(trans, key, ov):
488 ov.getresult()
489 # Use SO_UPDATE_ACCEPT_CONTEXT so getsockname() etc work.
490 buf = struct.pack('@P', listener.fileno())
491 conn.setsockopt(socket.SOL_SOCKET,
492 _overlapped.SO_UPDATE_ACCEPT_CONTEXT, buf)
493 conn.settimeout(listener.gettimeout())
494 return conn, conn.getpeername()
Guido van Rossuma8d630a2013-11-01 14:20:55 -0700495
Andrew Svetlov5f841b52017-12-09 00:23:48 +0200496 async def accept_coro(future, conn):
Victor Stinner7de26462014-01-11 00:03:21 +0100497 # Coroutine closing the accept socket if the future is cancelled
498 try:
Andrew Svetlov5f841b52017-12-09 00:23:48 +0200499 await future
Victor Stinner7de26462014-01-11 00:03:21 +0100500 except futures.CancelledError:
501 conn.close()
502 raise
503
504 future = self._register(ov, listener, finish_accept)
505 coro = accept_coro(future, conn)
Yury Selivanov59eb9a42015-05-11 14:48:38 -0400506 tasks.ensure_future(coro, loop=self._loop)
Victor Stinner7de26462014-01-11 00:03:21 +0100507 return future
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700508
509 def connect(self, conn, address):
510 self._register_with_iocp(conn)
511 # The socket needs to be locally bound before we call ConnectEx().
512 try:
513 _overlapped.BindLocal(conn.fileno(), conn.family)
514 except OSError as e:
515 if e.winerror != errno.WSAEINVAL:
516 raise
517 # Probably already locally bound; check using getsockname().
518 if conn.getsockname()[1] == 0:
519 raise
520 ov = _overlapped.Overlapped(NULL)
521 ov.ConnectEx(conn.fileno(), address)
Guido van Rossuma8d630a2013-11-01 14:20:55 -0700522
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700523 def finish_connect(trans, key, ov):
524 ov.getresult()
525 # Use SO_UPDATE_CONNECT_CONTEXT so getsockname() etc work.
526 conn.setsockopt(socket.SOL_SOCKET,
527 _overlapped.SO_UPDATE_CONNECT_CONTEXT, 0)
528 return conn
Guido van Rossuma8d630a2013-11-01 14:20:55 -0700529
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700530 return self._register(ov, conn, finish_connect)
531
Miss Islington (bot)632c1cb2018-02-25 09:10:58 -0800532 def sendfile(self, sock, file, offset, count):
533 self._register_with_iocp(sock)
534 ov = _overlapped.Overlapped(NULL)
535 offset_low = offset & 0xffff_ffff
536 offset_high = (offset >> 32) & 0xffff_ffff
537 ov.TransmitFile(sock.fileno(),
538 msvcrt.get_osfhandle(file.fileno()),
539 offset_low, offset_high,
540 count, 0, 0)
541
542 def finish_sendfile(trans, key, ov):
543 try:
544 return ov.getresult()
545 except OSError as exc:
546 if exc.winerror in (_overlapped.ERROR_NETNAME_DELETED,
547 _overlapped.ERROR_OPERATION_ABORTED):
548 raise ConnectionResetError(*exc.args)
549 else:
550 raise
551 return self._register(ov, sock, finish_sendfile)
552
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700553 def accept_pipe(self, pipe):
554 self._register_with_iocp(pipe)
555 ov = _overlapped.Overlapped(NULL)
Victor Stinner2b77c542015-01-22 23:50:03 +0100556 connected = ov.ConnectNamedPipe(pipe.fileno())
557
558 if connected:
559 # ConnectNamePipe() failed with ERROR_PIPE_CONNECTED which means
560 # that the pipe is connected. There is no need to wait for the
561 # completion of the connection.
Victor Stinner41063d22015-01-26 22:30:49 +0100562 return self._result(pipe)
Guido van Rossuma8d630a2013-11-01 14:20:55 -0700563
Victor Stinnerc89c8a72014-02-26 17:35:30 +0100564 def finish_accept_pipe(trans, key, ov):
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700565 ov.getresult()
566 return pipe
Guido van Rossuma8d630a2013-11-01 14:20:55 -0700567
Victor Stinner2b77c542015-01-22 23:50:03 +0100568 return self._register(ov, pipe, finish_accept_pipe)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700569
Andrew Svetlov5f841b52017-12-09 00:23:48 +0200570 async def connect_pipe(self, address):
Victor Stinnere0fd1572015-01-26 15:04:03 +0100571 delay = CONNECT_PIPE_INIT_DELAY
572 while True:
Yury Selivanov6370f342017-12-10 18:36:12 -0500573 # Unfortunately there is no way to do an overlapped connect to
574 # a pipe. Call CreateFile() in a loop until it doesn't fail with
575 # ERROR_PIPE_BUSY.
Victor Stinnere0fd1572015-01-26 15:04:03 +0100576 try:
577 handle = _overlapped.ConnectPipe(address)
578 break
579 except OSError as exc:
580 if exc.winerror != _overlapped.ERROR_PIPE_BUSY:
581 raise
582
583 # ConnectPipe() failed with ERROR_PIPE_BUSY: retry later
584 delay = min(delay * 2, CONNECT_PIPE_MAX_DELAY)
Andrew Svetlov5f841b52017-12-09 00:23:48 +0200585 await tasks.sleep(delay, loop=self._loop)
Victor Stinnere0fd1572015-01-26 15:04:03 +0100586
587 return windows_utils.PipeHandle(handle)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700588
Guido van Rossum90fb9142013-10-30 14:44:05 -0700589 def wait_for_handle(self, handle, timeout=None):
Victor Stinner4d825b42014-12-19 17:10:44 +0100590 """Wait for a handle.
591
592 Return a Future object. The result of the future is True if the wait
593 completed, or False if the wait did not complete (on timeout).
594 """
Victor Stinnerd0a28de2015-01-21 23:39:51 +0100595 return self._wait_for_handle(handle, timeout, False)
596
597 def _wait_cancel(self, event, done_callback):
598 fut = self._wait_for_handle(event, None, True)
599 # add_done_callback() cannot be used because the wait may only complete
600 # in IocpProactor.close(), while the event loop is not running.
601 fut._done_callback = done_callback
602 return fut
603
604 def _wait_for_handle(self, handle, timeout, _is_cancel):
Guido van Rossum90fb9142013-10-30 14:44:05 -0700605 if timeout is None:
606 ms = _winapi.INFINITE
607 else:
Victor Stinnerf2e17682014-01-31 16:25:24 +0100608 # RegisterWaitForSingleObject() has a resolution of 1 millisecond,
609 # round away from zero to wait *at least* timeout seconds.
610 ms = math.ceil(timeout * 1e3)
Guido van Rossum90fb9142013-10-30 14:44:05 -0700611
612 # We only create ov so we can use ov.address as a key for the cache.
613 ov = _overlapped.Overlapped(NULL)
Victor Stinnerd0a28de2015-01-21 23:39:51 +0100614 wait_handle = _overlapped.RegisterWaitWithQueue(
Guido van Rossum90fb9142013-10-30 14:44:05 -0700615 handle, self._iocp, ov.address, ms)
Victor Stinnerd0a28de2015-01-21 23:39:51 +0100616 if _is_cancel:
617 f = _WaitCancelFuture(ov, handle, wait_handle, loop=self._loop)
618 else:
619 f = _WaitHandleFuture(ov, handle, wait_handle, self,
620 loop=self._loop)
Victor Stinner313a9802014-07-29 12:58:23 +0200621 if f._source_traceback:
622 del f._source_traceback[-1]
Guido van Rossum90fb9142013-10-30 14:44:05 -0700623
Victor Stinnerc89c8a72014-02-26 17:35:30 +0100624 def finish_wait_for_handle(trans, key, ov):
Richard Oudkerk71196e72013-11-24 17:50:40 +0000625 # Note that this second wait means that we should only use
626 # this with handles types where a successful wait has no
627 # effect. So events or processes are all right, but locks
628 # or semaphores are not. Also note if the handle is
629 # signalled and then quickly reset, then we may return
630 # False even though we have not timed out.
Victor Stinner313a9802014-07-29 12:58:23 +0200631 return f._poll()
Guido van Rossum90fb9142013-10-30 14:44:05 -0700632
Victor Stinner313a9802014-07-29 12:58:23 +0200633 self._cache[ov.address] = (f, ov, 0, finish_wait_for_handle)
Guido van Rossum90fb9142013-10-30 14:44:05 -0700634 return f
635
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700636 def _register_with_iocp(self, obj):
637 # To get notifications of finished ops on this objects sent to the
638 # completion port, were must register the handle.
639 if obj not in self._registered:
640 self._registered.add(obj)
641 _overlapped.CreateIoCompletionPort(obj.fileno(), self._iocp, 0, 0)
642 # XXX We could also use SetFileCompletionNotificationModes()
643 # to avoid sending notifications to completion port of ops
644 # that succeed immediately.
645
Victor Stinner2b77c542015-01-22 23:50:03 +0100646 def _register(self, ov, obj, callback):
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700647 # Return a future which will be set with the result of the
648 # operation when it completes. The future's value is actually
649 # the value returned by callback().
650 f = _OverlappedFuture(ov, loop=self._loop)
Victor Stinner313a9802014-07-29 12:58:23 +0200651 if f._source_traceback:
652 del f._source_traceback[-1]
Victor Stinner2b77c542015-01-22 23:50:03 +0100653 if not ov.pending:
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700654 # The operation has completed, so no need to postpone the
655 # work. We cannot take this short cut if we need the
656 # NumberOfBytes, CompletionKey values returned by
657 # PostQueuedCompletionStatus().
658 try:
659 value = callback(None, None, ov)
660 except OSError as e:
661 f.set_exception(e)
662 else:
663 f.set_result(value)
Victor Stinner42d3bde2014-07-28 00:18:43 +0200664 # Even if GetOverlappedResult() was called, we have to wait for the
665 # notification of the completion in GetQueuedCompletionStatus().
666 # Register the overlapped operation to keep a reference to the
667 # OVERLAPPED object, otherwise the memory is freed and Windows may
668 # read uninitialized memory.
Victor Stinner2b77c542015-01-22 23:50:03 +0100669
670 # Register the overlapped operation for later. Note that
671 # we only store obj to prevent it from being garbage
672 # collected too early.
673 self._cache[ov.address] = (f, ov, obj, callback)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700674 return f
675
Victor Stinnerd0a28de2015-01-21 23:39:51 +0100676 def _unregister(self, ov):
677 """Unregister an overlapped object.
678
679 Call this method when its future has been cancelled. The event can
680 already be signalled (pending in the proactor event queue). It is also
681 safe if the event is never signalled (because it was cancelled).
682 """
683 self._unregistered.append(ov)
684
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700685 def _get_accept_socket(self, family):
686 s = socket.socket(family)
687 s.settimeout(0)
688 return s
689
690 def _poll(self, timeout=None):
691 if timeout is None:
692 ms = INFINITE
693 elif timeout < 0:
694 raise ValueError("negative timeout")
695 else:
Victor Stinnerf2e17682014-01-31 16:25:24 +0100696 # GetQueuedCompletionStatus() has a resolution of 1 millisecond,
697 # round away from zero to wait *at least* timeout seconds.
698 ms = math.ceil(timeout * 1e3)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700699 if ms >= INFINITE:
700 raise ValueError("timeout too big")
Victor Stinner313a9802014-07-29 12:58:23 +0200701
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700702 while True:
703 status = _overlapped.GetQueuedCompletionStatus(self._iocp, ms)
704 if status is None:
Victor Stinnerd0a28de2015-01-21 23:39:51 +0100705 break
Victor Stinner313a9802014-07-29 12:58:23 +0200706 ms = 0
707
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700708 err, transferred, key, address = status
709 try:
710 f, ov, obj, callback = self._cache.pop(address)
711 except KeyError:
Victor Stinner42d3bde2014-07-28 00:18:43 +0200712 if self._loop.get_debug():
713 self._loop.call_exception_handler({
714 'message': ('GetQueuedCompletionStatus() returned an '
715 'unexpected event'),
716 'status': ('err=%s transferred=%s key=%#x address=%#x'
717 % (err, transferred, key, address)),
718 })
719
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700720 # key is either zero, or it is used to return a pipe
721 # handle which should be closed to avoid a leak.
722 if key not in (0, _overlapped.INVALID_HANDLE_VALUE):
723 _winapi.CloseHandle(key)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700724 continue
Victor Stinner51e44ea2014-07-26 00:58:34 +0200725
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700726 if obj in self._stopped_serving:
727 f.cancel()
Victor Stinner42d3bde2014-07-28 00:18:43 +0200728 # Don't call the callback if _register() already read the result or
729 # if the overlapped has been cancelled
730 elif not f.done():
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700731 try:
732 value = callback(transferred, key, ov)
733 except OSError as e:
734 f.set_exception(e)
735 self._results.append(f)
736 else:
737 f.set_result(value)
738 self._results.append(f)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700739
Andrew Svetlov7a6706b2017-12-13 17:50:16 +0200740 # Remove unregistered futures
Victor Stinnerd0a28de2015-01-21 23:39:51 +0100741 for ov in self._unregistered:
742 self._cache.pop(ov.address, None)
743 self._unregistered.clear()
744
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700745 def _stop_serving(self, obj):
746 # obj is a socket or pipe handle. It will be closed in
747 # BaseProactorEventLoop._stop_serving() which will make any
748 # pending operations fail quickly.
749 self._stopped_serving.add(obj)
750
751 def close(self):
752 # Cancel remaining registered operations.
Victor Stinnerfea6a102014-07-25 00:54:53 +0200753 for address, (fut, ov, obj, callback) in list(self._cache.items()):
Victor Stinner3d2256f2015-01-26 11:02:59 +0100754 if fut.cancelled():
Victor Stinner752aba72015-01-22 22:47:13 +0100755 # Nothing to do with cancelled futures
756 pass
Victor Stinner1ca93922015-01-22 00:17:54 +0100757 elif isinstance(fut, _WaitCancelFuture):
758 # _WaitCancelFuture must not be cancelled
759 pass
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700760 else:
761 try:
Victor Stinnerfea6a102014-07-25 00:54:53 +0200762 fut.cancel()
763 except OSError as exc:
764 if self._loop is not None:
765 context = {
766 'message': 'Cancelling a future failed',
767 'exception': exc,
768 'future': fut,
769 }
770 if fut._source_traceback:
771 context['source_traceback'] = fut._source_traceback
772 self._loop.call_exception_handler(context)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700773
774 while self._cache:
775 if not self._poll(1):
Guido van Rossumfc29e0f2013-10-17 15:39:45 -0700776 logger.debug('taking long time to close proactor')
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700777
778 self._results = []
779 if self._iocp is not None:
780 _winapi.CloseHandle(self._iocp)
781 self._iocp = None
Guido van Rossum59691282013-10-30 14:52:03 -0700782
Victor Stinnerfea6a102014-07-25 00:54:53 +0200783 def __del__(self):
784 self.close()
785
Guido van Rossum59691282013-10-30 14:52:03 -0700786
787class _WindowsSubprocessTransport(base_subprocess.BaseSubprocessTransport):
788
789 def _start(self, args, shell, stdin, stdout, stderr, bufsize, **kwargs):
790 self._proc = windows_utils.Popen(
791 args, shell=shell, stdin=stdin, stdout=stdout, stderr=stderr,
792 bufsize=bufsize, **kwargs)
Guido van Rossuma8d630a2013-11-01 14:20:55 -0700793
Guido van Rossum59691282013-10-30 14:52:03 -0700794 def callback(f):
795 returncode = self._proc.poll()
796 self._process_exited(returncode)
Guido van Rossuma8d630a2013-11-01 14:20:55 -0700797
Guido van Rossum59691282013-10-30 14:52:03 -0700798 f = self._loop._proactor.wait_for_handle(int(self._proc._handle))
799 f.add_done_callback(callback)
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800800
801
802SelectorEventLoop = _WindowsSelectorEventLoop
803
804
Miss Islington (bot)07384432018-06-07 18:31:50 -0700805class WindowsSelectorEventLoopPolicy(events.BaseDefaultEventLoopPolicy):
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800806 _loop_factory = SelectorEventLoop
807
808
Miss Islington (bot)07384432018-06-07 18:31:50 -0700809class WindowsProactorEventLoopPolicy(events.BaseDefaultEventLoopPolicy):
810 _loop_factory = ProactorEventLoop
811
812
813DefaultEventLoopPolicy = WindowsSelectorEventLoopPolicy