blob: d22edec51efc1fbde3c1a01a3569a7b4ec0d0789 [file] [log] [blame]
Yury Selivanovb0b0e622014-02-18 22:27:48 -05001"""Selector and proactor event loops for Windows."""
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07002
Victor Stinner4271dfd2017-11-28 15:19:56 +01003import _overlapped
Victor Stinnerf2e17682014-01-31 16:25:24 +01004import _winapi
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07005import errno
Victor Stinnerf2e17682014-01-31 16:25:24 +01006import math
Andrew Svetlova19fb3c2018-02-25 19:32:14 +03007import msvcrt
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07008import socket
Victor Stinnerf2e17682014-01-31 16:25:24 +01009import struct
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070010import weakref
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070011
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -080012from . import events
Guido van Rossum59691282013-10-30 14:52:03 -070013from . import base_subprocess
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070014from . import futures
15from . import proactor_events
16from . import selector_events
17from . import tasks
18from . import windows_utils
Victor Stinnerf951d282014-06-29 00:46:45 +020019from .log import logger
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070020
21
# Names exported by a star-import of this module.
__all__ = (
    'SelectorEventLoop',
    'ProactorEventLoop',
    'IocpProactor',
    'DefaultEventLoopPolicy',
)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070026
27
# NULL is the argument given to _overlapped.Overlapped() and to
# CreateIoCompletionPort(); INFINITE is the millisecond timeout _poll() uses
# when it is called without a timeout.
NULL = 0
INFINITE = 0xffff_ffff
ERROR_CONNECTION_REFUSED = 1225
ERROR_CONNECTION_ABORTED = 1236

# Back-off bounds, in seconds, used by connect_pipe() between two attempts
# to connect: the starting delay ...
CONNECT_PIPE_INIT_DELAY = 0.001

# ... and the upper limit the delay is clamped to.
CONNECT_PIPE_MAX_DELAY = 0.100
38
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070039
class _OverlappedFuture(futures.Future):
    """Future bound to an overlapped operation.

    Cancelling the future cancels the overlapped operation right away.
    The reference to the Overlapped object is dropped as soon as the future
    gets a result, an exception, or is cancelled.
    """

    def __init__(self, ov, *, loop=None):
        super().__init__(loop=loop)
        source_tb = self._source_traceback
        if source_tb:
            # Drop the last entry of the recorded source traceback.
            del source_tb[-1]
        self._ov = ov

    def _repr_info(self):
        """Extend the base repr info with the overlapped state and address."""
        parts = super()._repr_info()
        ov = self._ov
        if ov is None:
            return parts
        state = 'pending' if ov.pending else 'completed'
        parts.insert(1, f'overlapped=<{state}, {ov.address:#x}>')
        return parts

    def _cancel_overlapped(self):
        """Cancel the overlapped operation, if any, and forget it.

        An OSError raised by the cancellation is reported to the loop's
        exception handler instead of being propagated.
        """
        ov = self._ov
        if ov is None:
            return
        try:
            ov.cancel()
        except OSError as err:
            report = {
                'message': 'Cancelling an overlapped future failed',
                'exception': err,
                'future': self,
            }
            if self._source_traceback:
                report['source_traceback'] = self._source_traceback
            self._loop.call_exception_handler(report)
        self._ov = None

    def cancel(self):
        # The overlapped operation is cancelled before the future itself.
        self._cancel_overlapped()
        return super().cancel()

    def set_exception(self, exception):
        # Store the exception first, then cancel the overlapped operation.
        super().set_exception(exception)
        self._cancel_overlapped()

    def set_result(self, result):
        # A result means the operation is over: only the reference to the
        # Overlapped object is released, nothing is cancelled.
        super().set_result(result)
        self._ov = None
86
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070087
class _BaseWaitHandleFuture(futures.Future):
    """Subclass of Future which represents a wait handle.

    The future stores the Overlapped object associated with the wait, the
    handle being waited on and the wait handle, and unregisters the wait
    when the future is cancelled or receives a result or an exception.
    """

    def __init__(self, ov, handle, wait_handle, *, loop=None):
        super().__init__(loop=loop)
        if self._source_traceback:
            # Drop the last entry of the recorded source traceback.
            del self._source_traceback[-1]
        # Keep a reference to the Overlapped object to keep it alive until the
        # wait is unregistered
        self._ov = ov
        self._handle = handle
        self._wait_handle = wait_handle

        # Should we call UnregisterWaitEx() if the wait completes
        # or is cancelled?
        self._registered = True

    def _poll(self):
        """Return True if the handle is currently signaled."""
        # non-blocking wait: use a timeout of 0 millisecond
        return (_winapi.WaitForSingleObject(self._handle, 0) ==
                _winapi.WAIT_OBJECT_0)

    def _repr_info(self):
        """Extend the base repr info with the handle, its state and the
        wait handle."""
        info = super()._repr_info()
        if self._handle is not None:
            # Format and poll the handle only when there is one: formatting
            # None with '#x' would raise TypeError.
            info.append(f'handle={self._handle:#x}')
            state = 'signaled' if self._poll() else 'waiting'
            info.append(state)
        if self._wait_handle is not None:
            info.append(f'wait_handle={self._wait_handle:#x}')
        return info

    def _unregister_wait_cb(self, fut):
        # The wait was unregistered: the Overlapped object no longer has to
        # be kept alive, so the reference to it is released.
        self._ov = None

    def _unregister_wait(self):
        """Unregister the wait, at most once.

        ERROR_IO_PENDING from UnregisterWait() is not treated as a failure.
        Any other OSError is passed to the loop's exception handler and the
        reference to the Overlapped object is kept.
        """
        if not self._registered:
            return
        self._registered = False

        wait_handle = self._wait_handle
        self._wait_handle = None
        try:
            _overlapped.UnregisterWait(wait_handle)
        except OSError as exc:
            if exc.winerror != _overlapped.ERROR_IO_PENDING:
                context = {
                    'message': 'Failed to unregister the wait handle',
                    'exception': exc,
                    'future': self,
                }
                if self._source_traceback:
                    context['source_traceback'] = self._source_traceback
                self._loop.call_exception_handler(context)
                return
            # ERROR_IO_PENDING means that the unregister is pending

        self._unregister_wait_cb(None)

    def cancel(self):
        self._unregister_wait()
        return super().cancel()

    def set_exception(self, exception):
        self._unregister_wait()
        super().set_exception(exception)

    def set_result(self, result):
        self._unregister_wait()
        super().set_result(result)
160
161
class _WaitCancelFuture(_BaseWaitHandleFuture):
    """Future waiting, through an event, for the cancellation of a
    _WaitHandleFuture.

    It cannot be cancelled itself. An optional ``_done_callback`` is invoked
    synchronously as soon as a result or an exception is set.
    """

    def __init__(self, ov, event, wait_handle, *, loop=None):
        super().__init__(ov, event, wait_handle, loop=loop)

        # Callable receiving this future once it is done, or None.
        self._done_callback = None

    def _notify_done(self):
        # Invoke the registered callback, if there is one.
        callback = self._done_callback
        if callback is not None:
            callback(self)

    def cancel(self):
        raise RuntimeError("_WaitCancelFuture must not be cancelled")

    def set_result(self, result):
        super().set_result(result)
        self._notify_done()

    def set_exception(self, exception):
        super().set_exception(exception)
        self._notify_done()
184
185
class _WaitHandleFuture(_BaseWaitHandleFuture):
    """Wait-handle future that also unregisters itself from its proactor.

    Unregistering the wait is done with UnregisterWaitEx() and an event; the
    cleanup callback runs once the proactor's wait on that event completes.
    """

    def __init__(self, ov, handle, wait_handle, proactor, *, loop=None):
        super().__init__(ov, handle, wait_handle, loop=loop)
        self._proactor = proactor
        self._unregister_proactor = True
        # Event handed to UnregisterWaitEx() by _unregister_wait().
        self._event = _overlapped.CreateEvent(None, True, False, None)
        self._event_fut = None

    def _unregister_wait_cb(self, fut):
        event = self._event
        if event is not None:
            _winapi.CloseHandle(event)
            self._event = None
            self._event_fut = None

        # A cancelled wait may never be signalled, hence it has to be
        # unregistered explicitly; otherwise IocpProactor.close() would wait
        # forever for an event which never comes.
        #
        # Calling _unregister() is also safe when the IocpProactor already
        # received the event, because the Overlapped object used as the
        # unique key is still referenced here.
        self._proactor._unregister(self._ov)
        self._proactor = None

        super()._unregister_wait_cb(fut)

    def _unregister_wait(self):
        """Unregister the wait once, then wait for the cancel event.

        An OSError other than ERROR_IO_PENDING is reported to the loop's
        exception handler and stops the procedure.
        """
        if not self._registered:
            return
        self._registered = False

        pending_wait, self._wait_handle = self._wait_handle, None
        try:
            _overlapped.UnregisterWaitEx(pending_wait, self._event)
        except OSError as err:
            if err.winerror != _overlapped.ERROR_IO_PENDING:
                report = {
                    'message': 'Failed to unregister the wait handle',
                    'exception': err,
                    'future': self,
                }
                if self._source_traceback:
                    report['source_traceback'] = self._source_traceback
                self._loop.call_exception_handler(report)
                return
            # ERROR_IO_PENDING is not an error, the wait was unregistered

        self._event_fut = self._proactor._wait_cancel(
            self._event, self._unregister_wait_cb)
Guido van Rossum90fb9142013-10-30 14:44:05 -0700236
237
class PipeServer(object):
    """Server side of a named pipe.

    It plays a role similar to a bound, listening socket.
    """

    def __init__(self, address):
        self._address = address
        self._free_instances = weakref.WeakSet()
        # _server_pipe_handle() may raise and the destructor calls close(),
        # so these attributes must exist before the first pipe is created.
        self._pipe = None
        self._accept_pipe_future = None
        self._pipe = self._server_pipe_handle(True)

    def _get_unconnected_pipe(self):
        """Return the current pipe instance after creating its replacement.

        Creating the new instance before handing out the old one guarantees
        that, until the server is closed, at least one pipe handle exists
        for the address, so a connecting client does not fail with
        FileNotFoundError.
        """
        previous = self._pipe
        self._pipe = self._server_pipe_handle(False)
        return previous

    def _server_pipe_handle(self, first):
        """Return a wrapper for a new pipe handle, or None once closed.

        *first* requests FILE_FLAG_FIRST_PIPE_INSTANCE for the new instance.
        """
        if self.closed():
            return None

        open_mode = _winapi.PIPE_ACCESS_DUPLEX | _winapi.FILE_FLAG_OVERLAPPED
        if first:
            open_mode |= _winapi.FILE_FLAG_FIRST_PIPE_INSTANCE
        pipe_mode = (_winapi.PIPE_TYPE_MESSAGE |
                     _winapi.PIPE_READMODE_MESSAGE |
                     _winapi.PIPE_WAIT)
        raw_handle = _winapi.CreateNamedPipe(
            self._address, open_mode, pipe_mode,
            _winapi.PIPE_UNLIMITED_INSTANCES,
            windows_utils.BUFSIZE, windows_utils.BUFSIZE,
            _winapi.NMPWAIT_WAIT_FOREVER, _winapi.NULL)
        wrapper = windows_utils.PipeHandle(raw_handle)
        # Tracked so that close() can close instances no client connected to.
        self._free_instances.add(wrapper)
        return wrapper

    def closed(self):
        """Return True once close() has cleared the address."""
        return self._address is None

    def close(self):
        """Cancel the pending accept and close the unconnected instances."""
        pending_accept = self._accept_pipe_future
        if pending_accept is not None:
            pending_accept.cancel()
            self._accept_pipe_future = None

        if self._address is None:
            # Already closed.
            return

        # Close all instances which have not been connected to by a client.
        for free_pipe in self._free_instances:
            free_pipe.close()
        self._pipe = None
        self._address = None
        self._free_instances.clear()

    __del__ = close
295
296
class _WindowsSelectorEventLoop(selector_events.BaseSelectorEventLoop):
    """Selector event loop for Windows.

    The class adds nothing to BaseSelectorEventLoop.
    """
299
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700300
class ProactorEventLoop(proactor_events.BaseProactorEventLoop):
    """Proactor event loop for Windows, built on IOCP."""

    def __init__(self, proactor=None):
        # An IocpProactor is created when the caller does not supply one.
        super().__init__(IocpProactor() if proactor is None else proactor)

    async def create_pipe_connection(self, protocol_factory, address):
        """Connect to the pipe at *address*.

        Return a ``(transport, protocol)`` pair; the protocol is created by
        *protocol_factory* once the pipe is connected.
        """
        pipe = await self._proactor.connect_pipe(address)
        protocol = protocol_factory()
        transport = self._make_duplex_pipe_transport(
            pipe, protocol, extra={'addr': address})
        return transport, protocol

    async def start_serving_pipe(self, protocol_factory, address):
        """Start accepting pipe clients on *address*.

        Return a list holding the single PipeServer created for the address.
        """
        server = PipeServer(address)

        def loop_accept_pipe(accepted=None):
            # Called once through call_soon() without argument, then as the
            # done callback of every accept future.
            pipe = None
            try:
                if accepted:
                    pipe = accepted.result()
                    server._free_instances.discard(pipe)

                    if server.closed():
                        # A client connected before the server was closed:
                        # drop the client (close the pipe) and exit
                        pipe.close()
                        return

                    protocol = protocol_factory()
                    self._make_duplex_pipe_transport(
                        pipe, protocol, extra={'addr': address})

                pipe = server._get_unconnected_pipe()
                if pipe is None:
                    return

                accepted = self._proactor.accept_pipe(pipe)
            except OSError as err:
                if pipe and pipe.fileno() != -1:
                    self.call_exception_handler({
                        'message': 'Pipe accept failed',
                        'exception': err,
                        'pipe': pipe,
                    })
                    pipe.close()
                elif self._debug:
                    logger.warning("Accept pipe failed on pipe %r",
                                   pipe, exc_info=True)
            except futures.CancelledError:
                if pipe:
                    pipe.close()
            else:
                # Remember the pending accept so that the server can cancel
                # it, and re-arm this function for the next client.
                server._accept_pipe_future = accepted
                accepted.add_done_callback(loop_accept_pipe)

        self.call_soon(loop_accept_pipe)
        return [server]

    async def _make_subprocess_transport(self, protocol, args, shell,
                                         stdin, stdout, stderr, bufsize,
                                         extra=None, **kwargs):
        """Create a subprocess transport and wait until its waiter is done.

        If waiting fails with an Exception, the transport is closed and
        waited for before the exception is re-raised.
        """
        waiter = self.create_future()
        transport = _WindowsSubprocessTransport(
            self, protocol, args, shell,
            stdin, stdout, stderr, bufsize,
            waiter=waiter, extra=extra,
            **kwargs)
        try:
            await waiter
        except Exception:
            transport.close()
            await transport._wait()
            raise

        return transport
379
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700380
381class IocpProactor:
382 """Proactor implementation using IOCP."""
383
384 def __init__(self, concurrency=0xffffffff):
385 self._loop = None
386 self._results = []
387 self._iocp = _overlapped.CreateIoCompletionPort(
388 _overlapped.INVALID_HANDLE_VALUE, NULL, 0, concurrency)
389 self._cache = {}
390 self._registered = weakref.WeakSet()
Victor Stinnerd0a28de2015-01-21 23:39:51 +0100391 self._unregistered = []
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700392 self._stopped_serving = weakref.WeakSet()
393
Victor Stinnerfea6a102014-07-25 00:54:53 +0200394 def __repr__(self):
395 return ('<%s overlapped#=%s result#=%s>'
396 % (self.__class__.__name__, len(self._cache),
397 len(self._results)))
398
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700399 def set_loop(self, loop):
400 self._loop = loop
401
402 def select(self, timeout=None):
403 if not self._results:
404 self._poll(timeout)
405 tmp = self._results
406 self._results = []
407 return tmp
408
Victor Stinner41063d22015-01-26 22:30:49 +0100409 def _result(self, value):
Yury Selivanov7661db62016-05-16 15:38:39 -0400410 fut = self._loop.create_future()
Victor Stinner41063d22015-01-26 22:30:49 +0100411 fut.set_result(value)
412 return fut
413
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700414 def recv(self, conn, nbytes, flags=0):
415 self._register_with_iocp(conn)
416 ov = _overlapped.Overlapped(NULL)
Victor Stinner41063d22015-01-26 22:30:49 +0100417 try:
418 if isinstance(conn, socket.socket):
419 ov.WSARecv(conn.fileno(), nbytes, flags)
420 else:
421 ov.ReadFile(conn.fileno(), nbytes)
422 except BrokenPipeError:
423 return self._result(b'')
Guido van Rossuma8d630a2013-11-01 14:20:55 -0700424
Victor Stinnerc89c8a72014-02-26 17:35:30 +0100425 def finish_recv(trans, key, ov):
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700426 try:
427 return ov.getresult()
428 except OSError as exc:
Andrew Svetlov7c684072018-01-27 21:22:47 +0200429 if exc.winerror in (_overlapped.ERROR_NETNAME_DELETED,
430 _overlapped.ERROR_OPERATION_ABORTED):
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700431 raise ConnectionResetError(*exc.args)
432 else:
433 raise
Guido van Rossuma8d630a2013-11-01 14:20:55 -0700434
Victor Stinnerc89c8a72014-02-26 17:35:30 +0100435 return self._register(ov, conn, finish_recv)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700436
Antoine Pitrou525f40d2017-10-19 21:46:40 +0200437 def recv_into(self, conn, buf, flags=0):
438 self._register_with_iocp(conn)
439 ov = _overlapped.Overlapped(NULL)
440 try:
441 if isinstance(conn, socket.socket):
442 ov.WSARecvInto(conn.fileno(), buf, flags)
443 else:
444 ov.ReadFileInto(conn.fileno(), buf)
445 except BrokenPipeError:
446 return self._result(b'')
447
448 def finish_recv(trans, key, ov):
449 try:
450 return ov.getresult()
451 except OSError as exc:
Andrew Svetlov7c684072018-01-27 21:22:47 +0200452 if exc.winerror in (_overlapped.ERROR_NETNAME_DELETED,
453 _overlapped.ERROR_OPERATION_ABORTED):
Antoine Pitrou525f40d2017-10-19 21:46:40 +0200454 raise ConnectionResetError(*exc.args)
455 else:
456 raise
457
458 return self._register(ov, conn, finish_recv)
459
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700460 def send(self, conn, buf, flags=0):
461 self._register_with_iocp(conn)
462 ov = _overlapped.Overlapped(NULL)
463 if isinstance(conn, socket.socket):
464 ov.WSASend(conn.fileno(), buf, flags)
465 else:
466 ov.WriteFile(conn.fileno(), buf)
Guido van Rossuma8d630a2013-11-01 14:20:55 -0700467
Victor Stinnerc89c8a72014-02-26 17:35:30 +0100468 def finish_send(trans, key, ov):
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700469 try:
470 return ov.getresult()
471 except OSError as exc:
Andrew Svetlov7c684072018-01-27 21:22:47 +0200472 if exc.winerror in (_overlapped.ERROR_NETNAME_DELETED,
473 _overlapped.ERROR_OPERATION_ABORTED):
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700474 raise ConnectionResetError(*exc.args)
475 else:
476 raise
Guido van Rossuma8d630a2013-11-01 14:20:55 -0700477
Victor Stinnerc89c8a72014-02-26 17:35:30 +0100478 return self._register(ov, conn, finish_send)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700479
480 def accept(self, listener):
481 self._register_with_iocp(listener)
482 conn = self._get_accept_socket(listener.family)
483 ov = _overlapped.Overlapped(NULL)
484 ov.AcceptEx(listener.fileno(), conn.fileno())
Guido van Rossuma8d630a2013-11-01 14:20:55 -0700485
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700486 def finish_accept(trans, key, ov):
487 ov.getresult()
488 # Use SO_UPDATE_ACCEPT_CONTEXT so getsockname() etc work.
489 buf = struct.pack('@P', listener.fileno())
490 conn.setsockopt(socket.SOL_SOCKET,
491 _overlapped.SO_UPDATE_ACCEPT_CONTEXT, buf)
492 conn.settimeout(listener.gettimeout())
493 return conn, conn.getpeername()
Guido van Rossuma8d630a2013-11-01 14:20:55 -0700494
Andrew Svetlov5f841b52017-12-09 00:23:48 +0200495 async def accept_coro(future, conn):
Victor Stinner7de26462014-01-11 00:03:21 +0100496 # Coroutine closing the accept socket if the future is cancelled
497 try:
Andrew Svetlov5f841b52017-12-09 00:23:48 +0200498 await future
Victor Stinner7de26462014-01-11 00:03:21 +0100499 except futures.CancelledError:
500 conn.close()
501 raise
502
503 future = self._register(ov, listener, finish_accept)
504 coro = accept_coro(future, conn)
Yury Selivanov59eb9a42015-05-11 14:48:38 -0400505 tasks.ensure_future(coro, loop=self._loop)
Victor Stinner7de26462014-01-11 00:03:21 +0100506 return future
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700507
508 def connect(self, conn, address):
509 self._register_with_iocp(conn)
510 # The socket needs to be locally bound before we call ConnectEx().
511 try:
512 _overlapped.BindLocal(conn.fileno(), conn.family)
513 except OSError as e:
514 if e.winerror != errno.WSAEINVAL:
515 raise
516 # Probably already locally bound; check using getsockname().
517 if conn.getsockname()[1] == 0:
518 raise
519 ov = _overlapped.Overlapped(NULL)
520 ov.ConnectEx(conn.fileno(), address)
Guido van Rossuma8d630a2013-11-01 14:20:55 -0700521
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700522 def finish_connect(trans, key, ov):
523 ov.getresult()
524 # Use SO_UPDATE_CONNECT_CONTEXT so getsockname() etc work.
525 conn.setsockopt(socket.SOL_SOCKET,
526 _overlapped.SO_UPDATE_CONNECT_CONTEXT, 0)
527 return conn
Guido van Rossuma8d630a2013-11-01 14:20:55 -0700528
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700529 return self._register(ov, conn, finish_connect)
530
Andrew Svetlova19fb3c2018-02-25 19:32:14 +0300531 def sendfile(self, sock, file, offset, count):
532 self._register_with_iocp(sock)
533 ov = _overlapped.Overlapped(NULL)
534 offset_low = offset & 0xffff_ffff
535 offset_high = (offset >> 32) & 0xffff_ffff
536 ov.TransmitFile(sock.fileno(),
537 msvcrt.get_osfhandle(file.fileno()),
538 offset_low, offset_high,
539 count, 0, 0)
540
541 def finish_sendfile(trans, key, ov):
542 try:
543 return ov.getresult()
544 except OSError as exc:
545 if exc.winerror in (_overlapped.ERROR_NETNAME_DELETED,
546 _overlapped.ERROR_OPERATION_ABORTED):
547 raise ConnectionResetError(*exc.args)
548 else:
549 raise
550 return self._register(ov, sock, finish_sendfile)
551
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700552 def accept_pipe(self, pipe):
553 self._register_with_iocp(pipe)
554 ov = _overlapped.Overlapped(NULL)
Victor Stinner2b77c542015-01-22 23:50:03 +0100555 connected = ov.ConnectNamedPipe(pipe.fileno())
556
557 if connected:
558 # ConnectNamePipe() failed with ERROR_PIPE_CONNECTED which means
559 # that the pipe is connected. There is no need to wait for the
560 # completion of the connection.
Victor Stinner41063d22015-01-26 22:30:49 +0100561 return self._result(pipe)
Guido van Rossuma8d630a2013-11-01 14:20:55 -0700562
Victor Stinnerc89c8a72014-02-26 17:35:30 +0100563 def finish_accept_pipe(trans, key, ov):
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700564 ov.getresult()
565 return pipe
Guido van Rossuma8d630a2013-11-01 14:20:55 -0700566
Victor Stinner2b77c542015-01-22 23:50:03 +0100567 return self._register(ov, pipe, finish_accept_pipe)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700568
Andrew Svetlov5f841b52017-12-09 00:23:48 +0200569 async def connect_pipe(self, address):
Victor Stinnere0fd1572015-01-26 15:04:03 +0100570 delay = CONNECT_PIPE_INIT_DELAY
571 while True:
Yury Selivanov6370f342017-12-10 18:36:12 -0500572 # Unfortunately there is no way to do an overlapped connect to
573 # a pipe. Call CreateFile() in a loop until it doesn't fail with
574 # ERROR_PIPE_BUSY.
Victor Stinnere0fd1572015-01-26 15:04:03 +0100575 try:
576 handle = _overlapped.ConnectPipe(address)
577 break
578 except OSError as exc:
579 if exc.winerror != _overlapped.ERROR_PIPE_BUSY:
580 raise
581
582 # ConnectPipe() failed with ERROR_PIPE_BUSY: retry later
583 delay = min(delay * 2, CONNECT_PIPE_MAX_DELAY)
Andrew Svetlov5f841b52017-12-09 00:23:48 +0200584 await tasks.sleep(delay, loop=self._loop)
Victor Stinnere0fd1572015-01-26 15:04:03 +0100585
586 return windows_utils.PipeHandle(handle)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700587
Guido van Rossum90fb9142013-10-30 14:44:05 -0700588 def wait_for_handle(self, handle, timeout=None):
Victor Stinner4d825b42014-12-19 17:10:44 +0100589 """Wait for a handle.
590
591 Return a Future object. The result of the future is True if the wait
592 completed, or False if the wait did not complete (on timeout).
593 """
Victor Stinnerd0a28de2015-01-21 23:39:51 +0100594 return self._wait_for_handle(handle, timeout, False)
595
596 def _wait_cancel(self, event, done_callback):
597 fut = self._wait_for_handle(event, None, True)
598 # add_done_callback() cannot be used because the wait may only complete
599 # in IocpProactor.close(), while the event loop is not running.
600 fut._done_callback = done_callback
601 return fut
602
603 def _wait_for_handle(self, handle, timeout, _is_cancel):
Guido van Rossum90fb9142013-10-30 14:44:05 -0700604 if timeout is None:
605 ms = _winapi.INFINITE
606 else:
Victor Stinnerf2e17682014-01-31 16:25:24 +0100607 # RegisterWaitForSingleObject() has a resolution of 1 millisecond,
608 # round away from zero to wait *at least* timeout seconds.
609 ms = math.ceil(timeout * 1e3)
Guido van Rossum90fb9142013-10-30 14:44:05 -0700610
611 # We only create ov so we can use ov.address as a key for the cache.
612 ov = _overlapped.Overlapped(NULL)
Victor Stinnerd0a28de2015-01-21 23:39:51 +0100613 wait_handle = _overlapped.RegisterWaitWithQueue(
Guido van Rossum90fb9142013-10-30 14:44:05 -0700614 handle, self._iocp, ov.address, ms)
Victor Stinnerd0a28de2015-01-21 23:39:51 +0100615 if _is_cancel:
616 f = _WaitCancelFuture(ov, handle, wait_handle, loop=self._loop)
617 else:
618 f = _WaitHandleFuture(ov, handle, wait_handle, self,
619 loop=self._loop)
Victor Stinner313a9802014-07-29 12:58:23 +0200620 if f._source_traceback:
621 del f._source_traceback[-1]
Guido van Rossum90fb9142013-10-30 14:44:05 -0700622
Victor Stinnerc89c8a72014-02-26 17:35:30 +0100623 def finish_wait_for_handle(trans, key, ov):
Richard Oudkerk71196e72013-11-24 17:50:40 +0000624 # Note that this second wait means that we should only use
625 # this with handles types where a successful wait has no
626 # effect. So events or processes are all right, but locks
627 # or semaphores are not. Also note if the handle is
628 # signalled and then quickly reset, then we may return
629 # False even though we have not timed out.
Victor Stinner313a9802014-07-29 12:58:23 +0200630 return f._poll()
Guido van Rossum90fb9142013-10-30 14:44:05 -0700631
Victor Stinner313a9802014-07-29 12:58:23 +0200632 self._cache[ov.address] = (f, ov, 0, finish_wait_for_handle)
Guido van Rossum90fb9142013-10-30 14:44:05 -0700633 return f
634
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700635 def _register_with_iocp(self, obj):
636 # To get notifications of finished ops on this objects sent to the
637 # completion port, were must register the handle.
638 if obj not in self._registered:
639 self._registered.add(obj)
640 _overlapped.CreateIoCompletionPort(obj.fileno(), self._iocp, 0, 0)
641 # XXX We could also use SetFileCompletionNotificationModes()
642 # to avoid sending notifications to completion port of ops
643 # that succeed immediately.
644
Victor Stinner2b77c542015-01-22 23:50:03 +0100645 def _register(self, ov, obj, callback):
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700646 # Return a future which will be set with the result of the
647 # operation when it completes. The future's value is actually
648 # the value returned by callback().
649 f = _OverlappedFuture(ov, loop=self._loop)
Victor Stinner313a9802014-07-29 12:58:23 +0200650 if f._source_traceback:
651 del f._source_traceback[-1]
Victor Stinner2b77c542015-01-22 23:50:03 +0100652 if not ov.pending:
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700653 # The operation has completed, so no need to postpone the
654 # work. We cannot take this short cut if we need the
655 # NumberOfBytes, CompletionKey values returned by
656 # PostQueuedCompletionStatus().
657 try:
658 value = callback(None, None, ov)
659 except OSError as e:
660 f.set_exception(e)
661 else:
662 f.set_result(value)
Victor Stinner42d3bde2014-07-28 00:18:43 +0200663 # Even if GetOverlappedResult() was called, we have to wait for the
664 # notification of the completion in GetQueuedCompletionStatus().
665 # Register the overlapped operation to keep a reference to the
666 # OVERLAPPED object, otherwise the memory is freed and Windows may
667 # read uninitialized memory.
Victor Stinner2b77c542015-01-22 23:50:03 +0100668
669 # Register the overlapped operation for later. Note that
670 # we only store obj to prevent it from being garbage
671 # collected too early.
672 self._cache[ov.address] = (f, ov, obj, callback)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700673 return f
674
Victor Stinnerd0a28de2015-01-21 23:39:51 +0100675 def _unregister(self, ov):
676 """Unregister an overlapped object.
677
678 Call this method when its future has been cancelled. The event can
679 already be signalled (pending in the proactor event queue). It is also
680 safe if the event is never signalled (because it was cancelled).
681 """
682 self._unregistered.append(ov)
683
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700684 def _get_accept_socket(self, family):
685 s = socket.socket(family)
686 s.settimeout(0)
687 return s
688
689 def _poll(self, timeout=None):
690 if timeout is None:
691 ms = INFINITE
692 elif timeout < 0:
693 raise ValueError("negative timeout")
694 else:
Victor Stinnerf2e17682014-01-31 16:25:24 +0100695 # GetQueuedCompletionStatus() has a resolution of 1 millisecond,
696 # round away from zero to wait *at least* timeout seconds.
697 ms = math.ceil(timeout * 1e3)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700698 if ms >= INFINITE:
699 raise ValueError("timeout too big")
Victor Stinner313a9802014-07-29 12:58:23 +0200700
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700701 while True:
702 status = _overlapped.GetQueuedCompletionStatus(self._iocp, ms)
703 if status is None:
Victor Stinnerd0a28de2015-01-21 23:39:51 +0100704 break
Victor Stinner313a9802014-07-29 12:58:23 +0200705 ms = 0
706
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700707 err, transferred, key, address = status
708 try:
709 f, ov, obj, callback = self._cache.pop(address)
710 except KeyError:
Victor Stinner42d3bde2014-07-28 00:18:43 +0200711 if self._loop.get_debug():
712 self._loop.call_exception_handler({
713 'message': ('GetQueuedCompletionStatus() returned an '
714 'unexpected event'),
715 'status': ('err=%s transferred=%s key=%#x address=%#x'
716 % (err, transferred, key, address)),
717 })
718
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700719 # key is either zero, or it is used to return a pipe
720 # handle which should be closed to avoid a leak.
721 if key not in (0, _overlapped.INVALID_HANDLE_VALUE):
722 _winapi.CloseHandle(key)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700723 continue
Victor Stinner51e44ea2014-07-26 00:58:34 +0200724
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700725 if obj in self._stopped_serving:
726 f.cancel()
Victor Stinner42d3bde2014-07-28 00:18:43 +0200727 # Don't call the callback if _register() already read the result or
728 # if the overlapped has been cancelled
729 elif not f.done():
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700730 try:
731 value = callback(transferred, key, ov)
732 except OSError as e:
733 f.set_exception(e)
734 self._results.append(f)
735 else:
736 f.set_result(value)
737 self._results.append(f)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700738
Andrew Svetlov7a6706b2017-12-13 17:50:16 +0200739 # Remove unregistered futures
Victor Stinnerd0a28de2015-01-21 23:39:51 +0100740 for ov in self._unregistered:
741 self._cache.pop(ov.address, None)
742 self._unregistered.clear()
743
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700744 def _stop_serving(self, obj):
745 # obj is a socket or pipe handle. It will be closed in
746 # BaseProactorEventLoop._stop_serving() which will make any
747 # pending operations fail quickly.
748 self._stopped_serving.add(obj)
749
def close(self):
    """Cancel outstanding operations, drain the port and release it.

    Every future still registered in the cache is cancelled, except those
    that are already cancelled and _WaitCancelFuture instances (which
    must not be cancelled).  An OSError raised while cancelling is
    reported through the loop's exception handler when a loop is attached.
    The completion port is then polled until the cache is empty, the list
    of results is reset and the port handle is closed.
    """
    # Cancel remaining registered operations.  Work on a snapshot of the
    # cache; only the future of each entry is needed here.
    for fut, *_ in list(self._cache.values()):
        if fut.cancelled():
            # Nothing to do with cancelled futures
            continue
        if isinstance(fut, _WaitCancelFuture):
            # _WaitCancelFuture must not be cancelled
            continue
        try:
            fut.cancel()
        except OSError as exc:
            if self._loop is not None:
                context = {
                    'message': 'Cancelling a future failed',
                    'exception': exc,
                    'future': fut,
                }
                if fut._source_traceback:
                    context['source_traceback'] = fut._source_traceback
                self._loop.call_exception_handler(context)

    while self._cache:
        # _poll() has no return value, so its result cannot tell whether
        # anything completed.  Compare the cache size before and after
        # instead, and only report slowness when a poll (which waits up to
        # one second) did not retire any registered operation.
        outstanding = len(self._cache)
        self._poll(1)
        if len(self._cache) >= outstanding:
            logger.debug('taking long time to close proactor')

    self._results = []
    if self._iocp is not None:
        _winapi.CloseHandle(self._iocp)
        self._iocp = None
Guido van Rossum59691282013-10-30 14:52:03 -0700781
Victor Stinnerfea6a102014-07-25 00:54:53 +0200782 def __del__(self):
783 self.close()
784
Guido van Rossum59691282013-10-30 14:52:03 -0700785
class _WindowsSubprocessTransport(base_subprocess.BaseSubprocessTransport):
    """Subprocess transport notified of process exit through the proactor."""

    def _start(self, args, shell, stdin, stdout, stderr, bufsize, **kwargs):
        """Spawn the child process and watch its handle for termination.

        The process is created with windows_utils.Popen and stored in
        self._proc.  The proactor is then asked to wait on the process
        handle; when that wait completes, the process return code is
        polled and passed to self._process_exited().
        """
        self._proc = windows_utils.Popen(
            args,
            shell=shell,
            stdin=stdin,
            stdout=stdout,
            stderr=stderr,
            bufsize=bufsize,
            **kwargs)

        def callback(fut):
            # The completed future itself is not inspected; it only
            # signals that the process handle has been signalled.
            self._process_exited(self._proc.poll())

        process_handle = int(self._proc._handle)
        exit_waiter = self._loop._proactor.wait_for_handle(process_handle)
        exit_waiter.add_done_callback(callback)
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800799
800
# Public alias (exported via __all__) for the Windows selector event loop.
SelectorEventLoop = _WindowsSelectorEventLoop


class _WindowsDefaultEventLoopPolicy(events.BaseDefaultEventLoopPolicy):
    """Event loop policy for Windows using SelectorEventLoop as its factory."""

    _loop_factory = SelectorEventLoop


# Public alias (exported via __all__) for the Windows default policy.
DefaultEventLoopPolicy = _WindowsDefaultEventLoopPolicy