blob: f91fcddb2aad32596d106fa5fdc22f2776e27c66 [file] [log] [blame]
Yury Selivanovb0b0e622014-02-18 22:27:48 -05001"""Selector and proactor event loops for Windows."""
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07002
Victor Stinner4271dfd2017-11-28 15:19:56 +01003import _overlapped
Victor Stinnerf2e17682014-01-31 16:25:24 +01004import _winapi
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07005import errno
Victor Stinnerf2e17682014-01-31 16:25:24 +01006import math
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07007import socket
Victor Stinnerf2e17682014-01-31 16:25:24 +01008import struct
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07009import weakref
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070010
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -080011from . import events
Guido van Rossum59691282013-10-30 14:52:03 -070012from . import base_subprocess
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070013from . import futures
14from . import proactor_events
15from . import selector_events
16from . import tasks
17from . import windows_utils
Victor Stinnerf951d282014-06-29 00:46:45 +020018from .log import logger
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070019
20
# Names re-exported by ``from <this module> import *``.
__all__ = (
    'SelectorEventLoop',
    'ProactorEventLoop',
    'IocpProactor',
    'DefaultEventLoopPolicy',
)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070025
26
# Value passed where the native API expects a null handle/pointer.
NULL = 0
# Timeout value meaning "wait forever" (all 32 bits set).
INFINITE = 0xffffffff
# Numeric error codes (presumably the Windows system error codes of the
# same names -- confirm against the Win32 documentation).
ERROR_CONNECTION_REFUSED = 1225
ERROR_CONNECTION_ABORTED = 1236

# connect_pipe() retry back-off, in seconds: the delay before retrying
# starts from the initial value ...
CONNECT_PIPE_INIT_DELAY = 0.001

# ... and is never allowed to grow past this maximum.
CONNECT_PIPE_MAX_DELAY = 0.100
37
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070038
class _OverlappedFuture(futures.Future):
    """Future tied to a single overlapped operation.

    Cancelling the future cancels the overlapped operation right away.
    """

    def __init__(self, ov, *, loop=None):
        super().__init__(loop=loop)
        recorded = self._source_traceback
        if recorded:
            # Drop the last recorded entry (presumably the frame of this
            # constructor, which is of no interest to the user).
            del recorded[-1]
        # Overlapped object of the operation; reset to None once the
        # operation has been cancelled or has produced a result.
        self._ov = ov

    def _repr_info(self):
        parts = super()._repr_info()
        ov = self._ov
        if ov is None:
            return parts
        status = 'pending' if ov.pending else 'completed'
        parts.insert(1, f'overlapped=<{status}, {ov.address:#x}>')
        return parts

    def _cancel_overlapped(self):
        """Cancel the overlapped operation, if there still is one.

        An OSError raised by the cancellation is reported to the loop's
        exception handler instead of being propagated.
        """
        ov = self._ov
        if ov is None:
            return
        try:
            ov.cancel()
        except OSError as exc:
            report = {
                'message': 'Cancelling an overlapped future failed',
                'exception': exc,
                'future': self,
            }
            recorded = self._source_traceback
            if recorded:
                report['source_traceback'] = recorded
            self._loop.call_exception_handler(report)
        self._ov = None

    def cancel(self):
        # Cancel the operation first, then the future itself.
        self._cancel_overlapped()
        return super().cancel()

    def set_exception(self, exception):
        super().set_exception(exception)
        self._cancel_overlapped()

    def set_result(self, result):
        super().set_result(result)
        # Nothing left to cancel: just forget the Overlapped object.
        self._ov = None
85
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070086
class _BaseWaitHandleFuture(futures.Future):
    """Subclass of Future which represents a wait handle."""

    def __init__(self, ov, handle, wait_handle, *, loop=None):
        super().__init__(loop=loop)
        if self._source_traceback:
            del self._source_traceback[-1]
        # Keep a reference to the Overlapped object to keep it alive until the
        # wait is unregistered
        self._ov = ov
        self._handle = handle
        self._wait_handle = wait_handle

        # Should the wait be unregistered when it completes or is
        # cancelled?  Cleared by _unregister_wait() so that the
        # unregistration happens at most once.
        self._registered = True

    def _poll(self):
        """Return True if the handle is currently signaled."""
        # non-blocking wait: use a timeout of 0 millisecond
        return (_winapi.WaitForSingleObject(self._handle, 0) ==
                _winapi.WAIT_OBJECT_0)

    def _repr_info(self):
        info = super()._repr_info()
        # Only format and poll the handle when there is one: formatting
        # None with '#x' would raise TypeError in the middle of repr().
        if self._handle is not None:
            info.append(f'handle={self._handle:#x}')
            state = 'signaled' if self._poll() else 'waiting'
            info.append(state)
        if self._wait_handle is not None:
            info.append(f'wait_handle={self._wait_handle:#x}')
        return info

    def _unregister_wait_cb(self, fut):
        # The wait was unregistered: it's now safe to destroy the Overlapped
        # object
        self._ov = None

    def _unregister_wait(self):
        """Unregister the wait handle (at most once).

        An OSError other than ERROR_IO_PENDING is reported to the loop's
        exception handler and the Overlapped reference is kept.
        """
        if not self._registered:
            return
        self._registered = False

        wait_handle = self._wait_handle
        self._wait_handle = None
        try:
            _overlapped.UnregisterWait(wait_handle)
        except OSError as exc:
            if exc.winerror != _overlapped.ERROR_IO_PENDING:
                context = {
                    'message': 'Failed to unregister the wait handle',
                    'exception': exc,
                    'future': self,
                }
                if self._source_traceback:
                    context['source_traceback'] = self._source_traceback
                self._loop.call_exception_handler(context)
                return
            # ERROR_IO_PENDING means that the unregister is pending

        self._unregister_wait_cb(None)

    def cancel(self):
        self._unregister_wait()
        return super().cancel()

    def set_exception(self, exception):
        self._unregister_wait()
        super().set_exception(exception)

    def set_result(self, result):
        self._unregister_wait()
        super().set_result(result)
159
160
class _WaitCancelFuture(_BaseWaitHandleFuture):
    """Future waiting, through an event, for the cancellation of a
    _WaitHandleFuture to complete.
    """

    def __init__(self, ov, event, wait_handle, *, loop=None):
        super().__init__(ov, event, wait_handle, loop=loop)

        # Optional callable invoked with this future as soon as a result
        # or an exception is set.
        self._done_callback = None

    def cancel(self):
        raise RuntimeError("_WaitCancelFuture must not be cancelled")

    def set_result(self, result):
        super().set_result(result)
        callback = self._done_callback
        if callback is not None:
            callback(self)

    def set_exception(self, exception):
        super().set_exception(exception)
        callback = self._done_callback
        if callback is not None:
            callback(self)
183
184
class _WaitHandleFuture(_BaseWaitHandleFuture):
    """Wait-handle future whose unregistration is tracked with an event."""

    def __init__(self, ov, handle, wait_handle, proactor, *, loop=None):
        super().__init__(ov, handle, wait_handle, loop=loop)
        self._proactor = proactor
        self._unregister_proactor = True
        # Event handed to UnregisterWaitEx() in _unregister_wait().
        self._event = _overlapped.CreateEvent(None, True, False, None)
        # Future returned by the proactor's _wait_cancel() for that event.
        self._event_fut = None

    def _unregister_wait_cb(self, fut):
        event = self._event
        if event is not None:
            _winapi.CloseHandle(event)
            self._event = None
            self._event_fut = None

        # A cancelled wait may never be signalled, so it has to be
        # unregistered explicitly; otherwise IocpProactor.close() would
        # wait forever for an event which never comes.
        #
        # If the IocpProactor already received the event, calling
        # _unregister() is still safe because a reference to the Overlapped
        # object, which is used as a unique key, was kept.
        self._proactor._unregister(self._ov)
        self._proactor = None

        super()._unregister_wait_cb(fut)

    def _unregister_wait(self):
        """Unregister the wait handle (at most once) and watch the event.

        An OSError other than ERROR_IO_PENDING is reported to the loop's
        exception handler and nothing else is done.
        """
        if not self._registered:
            return
        self._registered = False

        handle_to_release = self._wait_handle
        self._wait_handle = None
        try:
            _overlapped.UnregisterWaitEx(handle_to_release, self._event)
        except OSError as exc:
            if exc.winerror != _overlapped.ERROR_IO_PENDING:
                report = {
                    'message': 'Failed to unregister the wait handle',
                    'exception': exc,
                    'future': self,
                }
                recorded = self._source_traceback
                if recorded:
                    report['source_traceback'] = recorded
                self._loop.call_exception_handler(report)
                return
            # ERROR_IO_PENDING is not an error, the wait was unregistered

        # _unregister_wait_cb() runs once the wait on the event completes.
        self._event_fut = self._proactor._wait_cancel(
            self._event, self._unregister_wait_cb)
Guido van Rossum90fb9142013-10-30 14:44:05 -0700235
236
class PipeServer:
    """A named-pipe server.

    It plays much the same role as a bound, listening socket.
    """

    def __init__(self, address):
        self._address = address
        self._free_instances = weakref.WeakSet()
        # Set the pipe attributes before calling _server_pipe_handle():
        # that call can raise, and the destructor runs close(), which
        # reads them.
        self._pipe = None
        self._accept_pipe_future = None
        self._pipe = self._server_pipe_handle(True)

    def _get_unconnected_pipe(self):
        # Create a new instance and hand out the previous one.  Until the
        # server is closed there is therefore always at least one pipe
        # handle for the address, so a client attempting to connect does
        # not fail with FileNotFoundError.
        previous = self._pipe
        self._pipe = self._server_pipe_handle(False)
        return previous

    def _server_pipe_handle(self, first):
        """Return a wrapper for a new pipe handle, or None once closed."""
        if self.closed():
            return None
        open_mode = _winapi.PIPE_ACCESS_DUPLEX | _winapi.FILE_FLAG_OVERLAPPED
        if first:
            open_mode |= _winapi.FILE_FLAG_FIRST_PIPE_INSTANCE
        pipe_mode = (_winapi.PIPE_TYPE_MESSAGE |
                     _winapi.PIPE_READMODE_MESSAGE |
                     _winapi.PIPE_WAIT)
        raw_handle = _winapi.CreateNamedPipe(
            self._address, open_mode, pipe_mode,
            _winapi.PIPE_UNLIMITED_INSTANCES,
            windows_utils.BUFSIZE, windows_utils.BUFSIZE,
            _winapi.NMPWAIT_WAIT_FOREVER, _winapi.NULL)
        wrapped = windows_utils.PipeHandle(raw_handle)
        self._free_instances.add(wrapped)
        return wrapped

    def closed(self):
        return self._address is None

    def close(self):
        pending_accept = self._accept_pipe_future
        if pending_accept is not None:
            pending_accept.cancel()
            self._accept_pipe_future = None
        if self._address is None:
            return
        # Close every instance no client has connected to.
        for instance in self._free_instances:
            instance.close()
        self._pipe = None
        self._address = None
        self._free_instances.clear()

    __del__ = close
294
295
class _WindowsSelectorEventLoop(selector_events.BaseSelectorEventLoop):
    """Selector event loop for Windows."""
298
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700299
class ProactorEventLoop(proactor_events.BaseProactorEventLoop):
    """Proactor event loop for Windows, built on IOCP."""

    def __init__(self, proactor=None):
        # Fall back to a fresh IocpProactor when none is supplied.
        super().__init__(IocpProactor() if proactor is None else proactor)

    async def create_pipe_connection(self, protocol_factory, address):
        """Connect to the pipe at *address*.

        Return a ``(transport, protocol)`` pair.
        """
        pipe = await self._proactor.connect_pipe(address)
        protocol = protocol_factory()
        transport = self._make_duplex_pipe_transport(
            pipe, protocol, extra={'addr': address})
        return transport, protocol

    async def start_serving_pipe(self, protocol_factory, address):
        """Start accepting clients on the pipe at *address*.

        Return a one-element list holding the PipeServer.
        """
        server = PipeServer(address)

        def loop_accept_pipe(f=None):
            # Runs once via call_soon() (f is None), then as the done
            # callback of every accept future.
            pipe = None
            try:
                if f:
                    pipe = f.result()
                    server._free_instances.discard(pipe)

                    if server.closed():
                        # A client connected before the server was closed:
                        # drop the client (close the pipe) and exit
                        pipe.close()
                        return

                    client_protocol = protocol_factory()
                    self._make_duplex_pipe_transport(
                        pipe, client_protocol, extra={'addr': address})

                pipe = server._get_unconnected_pipe()
                if pipe is None:
                    return

                f = self._proactor.accept_pipe(pipe)
            except OSError as exc:
                if pipe and pipe.fileno() != -1:
                    self.call_exception_handler({
                        'message': 'Pipe accept failed',
                        'exception': exc,
                        'pipe': pipe,
                    })
                    pipe.close()
                elif self._debug:
                    logger.warning("Accept pipe failed on pipe %r",
                                   pipe, exc_info=True)
            except futures.CancelledError:
                if pipe:
                    pipe.close()
            else:
                # Remember the pending accept so the server can cancel it,
                # and come back here when it completes.
                server._accept_pipe_future = f
                f.add_done_callback(loop_accept_pipe)

        self.call_soon(loop_accept_pipe)
        return [server]

    async def _make_subprocess_transport(self, protocol, args, shell,
                                         stdin, stdout, stderr, bufsize,
                                         extra=None, **kwargs):
        """Create the subprocess transport and wait on its waiter future.

        If waiting fails, the transport is closed and waited for before
        the exception is re-raised.
        """
        ready = self.create_future()
        transport = _WindowsSubprocessTransport(
            self, protocol, args, shell,
            stdin, stdout, stderr, bufsize,
            waiter=ready, extra=extra, **kwargs)
        try:
            await ready
        except Exception:
            transport.close()
            await transport._wait()
            raise

        return transport
378
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700379
380class IocpProactor:
381 """Proactor implementation using IOCP."""
382
383 def __init__(self, concurrency=0xffffffff):
384 self._loop = None
385 self._results = []
386 self._iocp = _overlapped.CreateIoCompletionPort(
387 _overlapped.INVALID_HANDLE_VALUE, NULL, 0, concurrency)
388 self._cache = {}
389 self._registered = weakref.WeakSet()
Victor Stinnerd0a28de2015-01-21 23:39:51 +0100390 self._unregistered = []
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700391 self._stopped_serving = weakref.WeakSet()
392
Victor Stinnerfea6a102014-07-25 00:54:53 +0200393 def __repr__(self):
394 return ('<%s overlapped#=%s result#=%s>'
395 % (self.__class__.__name__, len(self._cache),
396 len(self._results)))
397
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700398 def set_loop(self, loop):
399 self._loop = loop
400
401 def select(self, timeout=None):
402 if not self._results:
403 self._poll(timeout)
404 tmp = self._results
405 self._results = []
406 return tmp
407
Victor Stinner41063d22015-01-26 22:30:49 +0100408 def _result(self, value):
Yury Selivanov7661db62016-05-16 15:38:39 -0400409 fut = self._loop.create_future()
Victor Stinner41063d22015-01-26 22:30:49 +0100410 fut.set_result(value)
411 return fut
412
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700413 def recv(self, conn, nbytes, flags=0):
414 self._register_with_iocp(conn)
415 ov = _overlapped.Overlapped(NULL)
Victor Stinner41063d22015-01-26 22:30:49 +0100416 try:
417 if isinstance(conn, socket.socket):
418 ov.WSARecv(conn.fileno(), nbytes, flags)
419 else:
420 ov.ReadFile(conn.fileno(), nbytes)
421 except BrokenPipeError:
422 return self._result(b'')
Guido van Rossuma8d630a2013-11-01 14:20:55 -0700423
Victor Stinnerc89c8a72014-02-26 17:35:30 +0100424 def finish_recv(trans, key, ov):
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700425 try:
426 return ov.getresult()
427 except OSError as exc:
Andrew Svetlov7c684072018-01-27 21:22:47 +0200428 if exc.winerror in (_overlapped.ERROR_NETNAME_DELETED,
429 _overlapped.ERROR_OPERATION_ABORTED):
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700430 raise ConnectionResetError(*exc.args)
431 else:
432 raise
Guido van Rossuma8d630a2013-11-01 14:20:55 -0700433
Victor Stinnerc89c8a72014-02-26 17:35:30 +0100434 return self._register(ov, conn, finish_recv)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700435
Antoine Pitrou525f40d2017-10-19 21:46:40 +0200436 def recv_into(self, conn, buf, flags=0):
437 self._register_with_iocp(conn)
438 ov = _overlapped.Overlapped(NULL)
439 try:
440 if isinstance(conn, socket.socket):
441 ov.WSARecvInto(conn.fileno(), buf, flags)
442 else:
443 ov.ReadFileInto(conn.fileno(), buf)
444 except BrokenPipeError:
445 return self._result(b'')
446
447 def finish_recv(trans, key, ov):
448 try:
449 return ov.getresult()
450 except OSError as exc:
Andrew Svetlov7c684072018-01-27 21:22:47 +0200451 if exc.winerror in (_overlapped.ERROR_NETNAME_DELETED,
452 _overlapped.ERROR_OPERATION_ABORTED):
Antoine Pitrou525f40d2017-10-19 21:46:40 +0200453 raise ConnectionResetError(*exc.args)
454 else:
455 raise
456
457 return self._register(ov, conn, finish_recv)
458
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700459 def send(self, conn, buf, flags=0):
460 self._register_with_iocp(conn)
461 ov = _overlapped.Overlapped(NULL)
462 if isinstance(conn, socket.socket):
463 ov.WSASend(conn.fileno(), buf, flags)
464 else:
465 ov.WriteFile(conn.fileno(), buf)
Guido van Rossuma8d630a2013-11-01 14:20:55 -0700466
Victor Stinnerc89c8a72014-02-26 17:35:30 +0100467 def finish_send(trans, key, ov):
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700468 try:
469 return ov.getresult()
470 except OSError as exc:
Andrew Svetlov7c684072018-01-27 21:22:47 +0200471 if exc.winerror in (_overlapped.ERROR_NETNAME_DELETED,
472 _overlapped.ERROR_OPERATION_ABORTED):
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700473 raise ConnectionResetError(*exc.args)
474 else:
475 raise
Guido van Rossuma8d630a2013-11-01 14:20:55 -0700476
Victor Stinnerc89c8a72014-02-26 17:35:30 +0100477 return self._register(ov, conn, finish_send)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700478
479 def accept(self, listener):
480 self._register_with_iocp(listener)
481 conn = self._get_accept_socket(listener.family)
482 ov = _overlapped.Overlapped(NULL)
483 ov.AcceptEx(listener.fileno(), conn.fileno())
Guido van Rossuma8d630a2013-11-01 14:20:55 -0700484
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700485 def finish_accept(trans, key, ov):
486 ov.getresult()
487 # Use SO_UPDATE_ACCEPT_CONTEXT so getsockname() etc work.
488 buf = struct.pack('@P', listener.fileno())
489 conn.setsockopt(socket.SOL_SOCKET,
490 _overlapped.SO_UPDATE_ACCEPT_CONTEXT, buf)
491 conn.settimeout(listener.gettimeout())
492 return conn, conn.getpeername()
Guido van Rossuma8d630a2013-11-01 14:20:55 -0700493
Andrew Svetlov5f841b52017-12-09 00:23:48 +0200494 async def accept_coro(future, conn):
Victor Stinner7de26462014-01-11 00:03:21 +0100495 # Coroutine closing the accept socket if the future is cancelled
496 try:
Andrew Svetlov5f841b52017-12-09 00:23:48 +0200497 await future
Victor Stinner7de26462014-01-11 00:03:21 +0100498 except futures.CancelledError:
499 conn.close()
500 raise
501
502 future = self._register(ov, listener, finish_accept)
503 coro = accept_coro(future, conn)
Yury Selivanov59eb9a42015-05-11 14:48:38 -0400504 tasks.ensure_future(coro, loop=self._loop)
Victor Stinner7de26462014-01-11 00:03:21 +0100505 return future
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700506
507 def connect(self, conn, address):
508 self._register_with_iocp(conn)
509 # The socket needs to be locally bound before we call ConnectEx().
510 try:
511 _overlapped.BindLocal(conn.fileno(), conn.family)
512 except OSError as e:
513 if e.winerror != errno.WSAEINVAL:
514 raise
515 # Probably already locally bound; check using getsockname().
516 if conn.getsockname()[1] == 0:
517 raise
518 ov = _overlapped.Overlapped(NULL)
519 ov.ConnectEx(conn.fileno(), address)
Guido van Rossuma8d630a2013-11-01 14:20:55 -0700520
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700521 def finish_connect(trans, key, ov):
522 ov.getresult()
523 # Use SO_UPDATE_CONNECT_CONTEXT so getsockname() etc work.
524 conn.setsockopt(socket.SOL_SOCKET,
525 _overlapped.SO_UPDATE_CONNECT_CONTEXT, 0)
526 return conn
Guido van Rossuma8d630a2013-11-01 14:20:55 -0700527
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700528 return self._register(ov, conn, finish_connect)
529
530 def accept_pipe(self, pipe):
531 self._register_with_iocp(pipe)
532 ov = _overlapped.Overlapped(NULL)
Victor Stinner2b77c542015-01-22 23:50:03 +0100533 connected = ov.ConnectNamedPipe(pipe.fileno())
534
535 if connected:
536 # ConnectNamePipe() failed with ERROR_PIPE_CONNECTED which means
537 # that the pipe is connected. There is no need to wait for the
538 # completion of the connection.
Victor Stinner41063d22015-01-26 22:30:49 +0100539 return self._result(pipe)
Guido van Rossuma8d630a2013-11-01 14:20:55 -0700540
Victor Stinnerc89c8a72014-02-26 17:35:30 +0100541 def finish_accept_pipe(trans, key, ov):
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700542 ov.getresult()
543 return pipe
Guido van Rossuma8d630a2013-11-01 14:20:55 -0700544
Victor Stinner2b77c542015-01-22 23:50:03 +0100545 return self._register(ov, pipe, finish_accept_pipe)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700546
Andrew Svetlov5f841b52017-12-09 00:23:48 +0200547 async def connect_pipe(self, address):
Victor Stinnere0fd1572015-01-26 15:04:03 +0100548 delay = CONNECT_PIPE_INIT_DELAY
549 while True:
Yury Selivanov6370f342017-12-10 18:36:12 -0500550 # Unfortunately there is no way to do an overlapped connect to
551 # a pipe. Call CreateFile() in a loop until it doesn't fail with
552 # ERROR_PIPE_BUSY.
Victor Stinnere0fd1572015-01-26 15:04:03 +0100553 try:
554 handle = _overlapped.ConnectPipe(address)
555 break
556 except OSError as exc:
557 if exc.winerror != _overlapped.ERROR_PIPE_BUSY:
558 raise
559
560 # ConnectPipe() failed with ERROR_PIPE_BUSY: retry later
561 delay = min(delay * 2, CONNECT_PIPE_MAX_DELAY)
Andrew Svetlov5f841b52017-12-09 00:23:48 +0200562 await tasks.sleep(delay, loop=self._loop)
Victor Stinnere0fd1572015-01-26 15:04:03 +0100563
564 return windows_utils.PipeHandle(handle)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700565
Guido van Rossum90fb9142013-10-30 14:44:05 -0700566 def wait_for_handle(self, handle, timeout=None):
Victor Stinner4d825b42014-12-19 17:10:44 +0100567 """Wait for a handle.
568
569 Return a Future object. The result of the future is True if the wait
570 completed, or False if the wait did not complete (on timeout).
571 """
Victor Stinnerd0a28de2015-01-21 23:39:51 +0100572 return self._wait_for_handle(handle, timeout, False)
573
574 def _wait_cancel(self, event, done_callback):
575 fut = self._wait_for_handle(event, None, True)
576 # add_done_callback() cannot be used because the wait may only complete
577 # in IocpProactor.close(), while the event loop is not running.
578 fut._done_callback = done_callback
579 return fut
580
581 def _wait_for_handle(self, handle, timeout, _is_cancel):
Guido van Rossum90fb9142013-10-30 14:44:05 -0700582 if timeout is None:
583 ms = _winapi.INFINITE
584 else:
Victor Stinnerf2e17682014-01-31 16:25:24 +0100585 # RegisterWaitForSingleObject() has a resolution of 1 millisecond,
586 # round away from zero to wait *at least* timeout seconds.
587 ms = math.ceil(timeout * 1e3)
Guido van Rossum90fb9142013-10-30 14:44:05 -0700588
589 # We only create ov so we can use ov.address as a key for the cache.
590 ov = _overlapped.Overlapped(NULL)
Victor Stinnerd0a28de2015-01-21 23:39:51 +0100591 wait_handle = _overlapped.RegisterWaitWithQueue(
Guido van Rossum90fb9142013-10-30 14:44:05 -0700592 handle, self._iocp, ov.address, ms)
Victor Stinnerd0a28de2015-01-21 23:39:51 +0100593 if _is_cancel:
594 f = _WaitCancelFuture(ov, handle, wait_handle, loop=self._loop)
595 else:
596 f = _WaitHandleFuture(ov, handle, wait_handle, self,
597 loop=self._loop)
Victor Stinner313a9802014-07-29 12:58:23 +0200598 if f._source_traceback:
599 del f._source_traceback[-1]
Guido van Rossum90fb9142013-10-30 14:44:05 -0700600
Victor Stinnerc89c8a72014-02-26 17:35:30 +0100601 def finish_wait_for_handle(trans, key, ov):
Richard Oudkerk71196e72013-11-24 17:50:40 +0000602 # Note that this second wait means that we should only use
603 # this with handles types where a successful wait has no
604 # effect. So events or processes are all right, but locks
605 # or semaphores are not. Also note if the handle is
606 # signalled and then quickly reset, then we may return
607 # False even though we have not timed out.
Victor Stinner313a9802014-07-29 12:58:23 +0200608 return f._poll()
Guido van Rossum90fb9142013-10-30 14:44:05 -0700609
Victor Stinner313a9802014-07-29 12:58:23 +0200610 self._cache[ov.address] = (f, ov, 0, finish_wait_for_handle)
Guido van Rossum90fb9142013-10-30 14:44:05 -0700611 return f
612
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700613 def _register_with_iocp(self, obj):
614 # To get notifications of finished ops on this objects sent to the
615 # completion port, were must register the handle.
616 if obj not in self._registered:
617 self._registered.add(obj)
618 _overlapped.CreateIoCompletionPort(obj.fileno(), self._iocp, 0, 0)
619 # XXX We could also use SetFileCompletionNotificationModes()
620 # to avoid sending notifications to completion port of ops
621 # that succeed immediately.
622
Victor Stinner2b77c542015-01-22 23:50:03 +0100623 def _register(self, ov, obj, callback):
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700624 # Return a future which will be set with the result of the
625 # operation when it completes. The future's value is actually
626 # the value returned by callback().
627 f = _OverlappedFuture(ov, loop=self._loop)
Victor Stinner313a9802014-07-29 12:58:23 +0200628 if f._source_traceback:
629 del f._source_traceback[-1]
Victor Stinner2b77c542015-01-22 23:50:03 +0100630 if not ov.pending:
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700631 # The operation has completed, so no need to postpone the
632 # work. We cannot take this short cut if we need the
633 # NumberOfBytes, CompletionKey values returned by
634 # PostQueuedCompletionStatus().
635 try:
636 value = callback(None, None, ov)
637 except OSError as e:
638 f.set_exception(e)
639 else:
640 f.set_result(value)
Victor Stinner42d3bde2014-07-28 00:18:43 +0200641 # Even if GetOverlappedResult() was called, we have to wait for the
642 # notification of the completion in GetQueuedCompletionStatus().
643 # Register the overlapped operation to keep a reference to the
644 # OVERLAPPED object, otherwise the memory is freed and Windows may
645 # read uninitialized memory.
Victor Stinner2b77c542015-01-22 23:50:03 +0100646
647 # Register the overlapped operation for later. Note that
648 # we only store obj to prevent it from being garbage
649 # collected too early.
650 self._cache[ov.address] = (f, ov, obj, callback)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700651 return f
652
Victor Stinnerd0a28de2015-01-21 23:39:51 +0100653 def _unregister(self, ov):
654 """Unregister an overlapped object.
655
656 Call this method when its future has been cancelled. The event can
657 already be signalled (pending in the proactor event queue). It is also
658 safe if the event is never signalled (because it was cancelled).
659 """
660 self._unregistered.append(ov)
661
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700662 def _get_accept_socket(self, family):
663 s = socket.socket(family)
664 s.settimeout(0)
665 return s
666
667 def _poll(self, timeout=None):
668 if timeout is None:
669 ms = INFINITE
670 elif timeout < 0:
671 raise ValueError("negative timeout")
672 else:
Victor Stinnerf2e17682014-01-31 16:25:24 +0100673 # GetQueuedCompletionStatus() has a resolution of 1 millisecond,
674 # round away from zero to wait *at least* timeout seconds.
675 ms = math.ceil(timeout * 1e3)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700676 if ms >= INFINITE:
677 raise ValueError("timeout too big")
Victor Stinner313a9802014-07-29 12:58:23 +0200678
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700679 while True:
680 status = _overlapped.GetQueuedCompletionStatus(self._iocp, ms)
681 if status is None:
Victor Stinnerd0a28de2015-01-21 23:39:51 +0100682 break
Victor Stinner313a9802014-07-29 12:58:23 +0200683 ms = 0
684
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700685 err, transferred, key, address = status
686 try:
687 f, ov, obj, callback = self._cache.pop(address)
688 except KeyError:
Victor Stinner42d3bde2014-07-28 00:18:43 +0200689 if self._loop.get_debug():
690 self._loop.call_exception_handler({
691 'message': ('GetQueuedCompletionStatus() returned an '
692 'unexpected event'),
693 'status': ('err=%s transferred=%s key=%#x address=%#x'
694 % (err, transferred, key, address)),
695 })
696
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700697 # key is either zero, or it is used to return a pipe
698 # handle which should be closed to avoid a leak.
699 if key not in (0, _overlapped.INVALID_HANDLE_VALUE):
700 _winapi.CloseHandle(key)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700701 continue
Victor Stinner51e44ea2014-07-26 00:58:34 +0200702
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700703 if obj in self._stopped_serving:
704 f.cancel()
Victor Stinner42d3bde2014-07-28 00:18:43 +0200705 # Don't call the callback if _register() already read the result or
706 # if the overlapped has been cancelled
707 elif not f.done():
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700708 try:
709 value = callback(transferred, key, ov)
710 except OSError as e:
711 f.set_exception(e)
712 self._results.append(f)
713 else:
714 f.set_result(value)
715 self._results.append(f)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700716
Andrew Svetlov7a6706b2017-12-13 17:50:16 +0200717 # Remove unregistered futures
Victor Stinnerd0a28de2015-01-21 23:39:51 +0100718 for ov in self._unregistered:
719 self._cache.pop(ov.address, None)
720 self._unregistered.clear()
721
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700722 def _stop_serving(self, obj):
723 # obj is a socket or pipe handle. It will be closed in
724 # BaseProactorEventLoop._stop_serving() which will make any
725 # pending operations fail quickly.
726 self._stopped_serving.add(obj)
727
def close(self):
    """Cancel pending operations, wait for them, and close the port.

    Every future still in the cache is cancelled, except futures that are
    already cancelled and _WaitCancelFuture instances (which must not be
    cancelled).  An OSError raised while cancelling is reported through
    the loop's exception handler when a loop is attached.  The method
    then polls until the cache is empty, discards the collected results
    and closes the completion port handle if it is still open.
    """
    # Cancel remaining registered operations.  Iterate over a snapshot of
    # the cache entries rather than the live dictionary.
    for fut, ov, obj, callback in list(self._cache.values()):
        if fut.cancelled():
            # Nothing to do with cancelled futures
            continue
        if isinstance(fut, _WaitCancelFuture):
            # _WaitCancelFuture must not be cancelled
            continue
        try:
            fut.cancel()
        except OSError as exc:
            if self._loop is not None:
                context = {
                    'message': 'Cancelling a future failed',
                    'exception': exc,
                    'future': fut,
                }
                if fut._source_traceback:
                    context['source_traceback'] = fut._source_traceback
                self._loop.call_exception_handler(context)

    while self._cache:
        # _poll() has no return value, so its result cannot tell whether
        # anything completed.  Compare the cache size before and after
        # and only report slowness when the poll made no progress.
        pending = len(self._cache)
        self._poll(1)
        if len(self._cache) >= pending:
            logger.debug('taking long time to close proactor')

    self._results = []
    if self._iocp is not None:
        _winapi.CloseHandle(self._iocp)
        self._iocp = None
Guido van Rossum59691282013-10-30 14:52:03 -0700759
Victor Stinnerfea6a102014-07-25 00:54:53 +0200760 def __del__(self):
761 self.close()
762
Guido van Rossum59691282013-10-30 14:52:03 -0700763
class _WindowsSubprocessTransport(base_subprocess.BaseSubprocessTransport):
    """Subprocess transport that learns of process exit via the proactor."""

    def _start(self, args, shell, stdin, stdout, stderr, bufsize, **kwargs):
        """Spawn the child process and watch its handle for completion.

        The process is created with windows_utils.Popen.  A future obtained
        from the loop's proactor for the process handle gets a done
        callback that polls the return code and passes it to
        _process_exited().
        """
        self._proc = windows_utils.Popen(
            args,
            shell=shell,
            stdin=stdin,
            stdout=stdout,
            stderr=stderr,
            bufsize=bufsize,
            **kwargs)

        def _on_handle_done(waiter):
            # The wait future's own result is not used; the exit status is
            # read from the process object.
            self._process_exited(self._proc.poll())

        process_handle = int(self._proc._handle)
        exit_waiter = self._loop._proactor.wait_for_handle(process_handle)
        exit_waiter.add_done_callback(_on_handle_done)
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800777
778
# Public alias, listed in __all__, for the Windows selector event loop.
SelectorEventLoop = _WindowsSelectorEventLoop


class _WindowsDefaultEventLoopPolicy(events.BaseDefaultEventLoopPolicy):
    """Event loop policy whose loop factory is SelectorEventLoop."""

    _loop_factory = SelectorEventLoop


# Public alias, listed in __all__, for the policy defined above.
DefaultEventLoopPolicy = _WindowsDefaultEventLoopPolicy