blob: da81ab435b9a6ffcda41ebe5fdacc4e9b5836cc2 [file] [log] [blame]
Yury Selivanovb0b0e622014-02-18 22:27:48 -05001"""Selector and proactor event loops for Windows."""
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07002
Miss Islington (bot)cf48a142022-01-04 01:22:26 -08003import sys
4
5if sys.platform != 'win32': # pragma: no cover
6 raise ImportError('win32 only')
7
Victor Stinner4271dfd2017-11-28 15:19:56 +01008import _overlapped
Victor Stinnerf2e17682014-01-31 16:25:24 +01009import _winapi
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070010import errno
Victor Stinnerf2e17682014-01-31 16:25:24 +010011import math
Andrew Svetlova19fb3c2018-02-25 19:32:14 +030012import msvcrt
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070013import socket
Victor Stinnerf2e17682014-01-31 16:25:24 +010014import struct
Victor Stinnerb1e45732019-01-15 11:48:00 +010015import time
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070016import weakref
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070017
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -080018from . import events
Guido van Rossum59691282013-10-30 14:52:03 -070019from . import base_subprocess
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070020from . import futures
Andrew Svetlov0baa72f2018-09-11 10:13:04 -070021from . import exceptions
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070022from . import proactor_events
23from . import selector_events
24from . import tasks
25from . import windows_utils
Victor Stinnerf951d282014-06-29 00:46:45 +020026from .log import logger
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070027
28
Yury Selivanov6370f342017-12-10 18:36:12 -050029__all__ = (
30 'SelectorEventLoop', 'ProactorEventLoop', 'IocpProactor',
Yury Selivanov8f404292018-06-07 20:44:57 -040031 'DefaultEventLoopPolicy', 'WindowsSelectorEventLoopPolicy',
32 'WindowsProactorEventLoopPolicy',
Yury Selivanov6370f342017-12-10 18:36:12 -050033)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070034
35
36NULL = 0
37INFINITE = 0xffffffff
38ERROR_CONNECTION_REFUSED = 1225
39ERROR_CONNECTION_ABORTED = 1236
40
Victor Stinner7ffa2c52015-01-22 22:55:08 +010041# Initial delay in seconds for connect_pipe() before retrying to connect
42CONNECT_PIPE_INIT_DELAY = 0.001
43
44# Maximum delay in seconds for connect_pipe() before retrying to connect
45CONNECT_PIPE_MAX_DELAY = 0.100
46
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070047
48class _OverlappedFuture(futures.Future):
49 """Subclass of Future which represents an overlapped operation.
50
51 Cancelling it will immediately cancel the overlapped operation.
52 """
53
54 def __init__(self, ov, *, loop=None):
55 super().__init__(loop=loop)
Victor Stinnerfea6a102014-07-25 00:54:53 +020056 if self._source_traceback:
57 del self._source_traceback[-1]
Victor Stinner18a28dc2014-07-25 13:05:20 +020058 self._ov = ov
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070059
Victor Stinner313a9802014-07-29 12:58:23 +020060 def _repr_info(self):
61 info = super()._repr_info()
Victor Stinner18a28dc2014-07-25 13:05:20 +020062 if self._ov is not None:
63 state = 'pending' if self._ov.pending else 'completed'
Yury Selivanov6370f342017-12-10 18:36:12 -050064 info.insert(1, f'overlapped=<{state}, {self._ov.address:#x}>')
Victor Stinner313a9802014-07-29 12:58:23 +020065 return info
Victor Stinnere912e652014-07-12 03:11:53 +020066
Victor Stinner18a28dc2014-07-25 13:05:20 +020067 def _cancel_overlapped(self):
68 if self._ov is None:
69 return
70 try:
71 self._ov.cancel()
72 except OSError as exc:
73 context = {
74 'message': 'Cancelling an overlapped future failed',
75 'exception': exc,
76 'future': self,
77 }
78 if self._source_traceback:
79 context['source_traceback'] = self._source_traceback
80 self._loop.call_exception_handler(context)
81 self._ov = None
82
Chris Jerdonek1ce58412020-05-15 16:55:50 -070083 def cancel(self, msg=None):
Victor Stinner18a28dc2014-07-25 13:05:20 +020084 self._cancel_overlapped()
Chris Jerdonek1ce58412020-05-15 16:55:50 -070085 return super().cancel(msg=msg)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070086
Victor Stinner18a28dc2014-07-25 13:05:20 +020087 def set_exception(self, exception):
88 super().set_exception(exception)
89 self._cancel_overlapped()
90
Victor Stinner51e44ea2014-07-26 00:58:34 +020091 def set_result(self, result):
92 super().set_result(result)
93 self._ov = None
94
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070095
Victor Stinnerd0a28de2015-01-21 23:39:51 +010096class _BaseWaitHandleFuture(futures.Future):
Guido van Rossum90fb9142013-10-30 14:44:05 -070097 """Subclass of Future which represents a wait handle."""
98
Victor Stinnerd0a28de2015-01-21 23:39:51 +010099 def __init__(self, ov, handle, wait_handle, *, loop=None):
Guido van Rossum90fb9142013-10-30 14:44:05 -0700100 super().__init__(loop=loop)
Victor Stinner313a9802014-07-29 12:58:23 +0200101 if self._source_traceback:
102 del self._source_traceback[-1]
Victor Stinnerd0a28de2015-01-21 23:39:51 +0100103 # Keep a reference to the Overlapped object to keep it alive until the
104 # wait is unregistered
Victor Stinner313a9802014-07-29 12:58:23 +0200105 self._ov = ov
Victor Stinner18a28dc2014-07-25 13:05:20 +0200106 self._handle = handle
Guido van Rossum90fb9142013-10-30 14:44:05 -0700107 self._wait_handle = wait_handle
108
Victor Stinnerd0a28de2015-01-21 23:39:51 +0100109 # Should we call UnregisterWaitEx() if the wait completes
110 # or is cancelled?
111 self._registered = True
112
Victor Stinner18a28dc2014-07-25 13:05:20 +0200113 def _poll(self):
114 # non-blocking wait: use a timeout of 0 millisecond
115 return (_winapi.WaitForSingleObject(self._handle, 0) ==
116 _winapi.WAIT_OBJECT_0)
117
Victor Stinner313a9802014-07-29 12:58:23 +0200118 def _repr_info(self):
119 info = super()._repr_info()
Yury Selivanov6370f342017-12-10 18:36:12 -0500120 info.append(f'handle={self._handle:#x}')
Victor Stinnerd0a28de2015-01-21 23:39:51 +0100121 if self._handle is not None:
Victor Stinner313a9802014-07-29 12:58:23 +0200122 state = 'signaled' if self._poll() else 'waiting'
Victor Stinnerd0a28de2015-01-21 23:39:51 +0100123 info.append(state)
124 if self._wait_handle is not None:
Yury Selivanov6370f342017-12-10 18:36:12 -0500125 info.append(f'wait_handle={self._wait_handle:#x}')
Victor Stinner313a9802014-07-29 12:58:23 +0200126 return info
Victor Stinner18a28dc2014-07-25 13:05:20 +0200127
Victor Stinnerd0a28de2015-01-21 23:39:51 +0100128 def _unregister_wait_cb(self, fut):
129 # The wait was unregistered: it's not safe to destroy the Overlapped
130 # object
131 self._ov = None
132
Victor Stinner313a9802014-07-29 12:58:23 +0200133 def _unregister_wait(self):
Victor Stinnerd0a28de2015-01-21 23:39:51 +0100134 if not self._registered:
Victor Stinnerfea6a102014-07-25 00:54:53 +0200135 return
Victor Stinnerd0a28de2015-01-21 23:39:51 +0100136 self._registered = False
137
Victor Stinner24dfa3c2015-01-26 22:30:28 +0100138 wait_handle = self._wait_handle
139 self._wait_handle = None
Guido van Rossum90fb9142013-10-30 14:44:05 -0700140 try:
Victor Stinner24dfa3c2015-01-26 22:30:28 +0100141 _overlapped.UnregisterWait(wait_handle)
Victor Stinnerb2614752014-08-25 23:20:52 +0200142 except OSError as exc:
Victor Stinner24dfa3c2015-01-26 22:30:28 +0100143 if exc.winerror != _overlapped.ERROR_IO_PENDING:
Victor Stinnerb2614752014-08-25 23:20:52 +0200144 context = {
145 'message': 'Failed to unregister the wait handle',
146 'exception': exc,
147 'future': self,
148 }
149 if self._source_traceback:
150 context['source_traceback'] = self._source_traceback
151 self._loop.call_exception_handler(context)
Victor Stinner24dfa3c2015-01-26 22:30:28 +0100152 return
153 # ERROR_IO_PENDING means that the unregister is pending
154
155 self._unregister_wait_cb(None)
Victor Stinnerfea6a102014-07-25 00:54:53 +0200156
Chris Jerdonek1ce58412020-05-15 16:55:50 -0700157 def cancel(self, msg=None):
Victor Stinner313a9802014-07-29 12:58:23 +0200158 self._unregister_wait()
Chris Jerdonek1ce58412020-05-15 16:55:50 -0700159 return super().cancel(msg=msg)
Victor Stinner313a9802014-07-29 12:58:23 +0200160
161 def set_exception(self, exception):
Victor Stinner313a9802014-07-29 12:58:23 +0200162 self._unregister_wait()
Victor Stinnerd0a28de2015-01-21 23:39:51 +0100163 super().set_exception(exception)
Victor Stinner313a9802014-07-29 12:58:23 +0200164
165 def set_result(self, result):
Victor Stinner313a9802014-07-29 12:58:23 +0200166 self._unregister_wait()
Victor Stinnerd0a28de2015-01-21 23:39:51 +0100167 super().set_result(result)
168
169
170class _WaitCancelFuture(_BaseWaitHandleFuture):
171 """Subclass of Future which represents a wait for the cancellation of a
172 _WaitHandleFuture using an event.
173 """
174
175 def __init__(self, ov, event, wait_handle, *, loop=None):
176 super().__init__(ov, event, wait_handle, loop=loop)
177
178 self._done_callback = None
179
Victor Stinner1ca93922015-01-22 00:17:54 +0100180 def cancel(self):
181 raise RuntimeError("_WaitCancelFuture must not be cancelled")
182
INADA Naokia8363622016-10-21 12:30:15 +0900183 def set_result(self, result):
184 super().set_result(result)
185 if self._done_callback is not None:
186 self._done_callback(self)
187
188 def set_exception(self, exception):
189 super().set_exception(exception)
Victor Stinnerd0a28de2015-01-21 23:39:51 +0100190 if self._done_callback is not None:
191 self._done_callback(self)
192
193
194class _WaitHandleFuture(_BaseWaitHandleFuture):
195 def __init__(self, ov, handle, wait_handle, proactor, *, loop=None):
196 super().__init__(ov, handle, wait_handle, loop=loop)
197 self._proactor = proactor
198 self._unregister_proactor = True
199 self._event = _overlapped.CreateEvent(None, True, False, None)
200 self._event_fut = None
201
202 def _unregister_wait_cb(self, fut):
203 if self._event is not None:
204 _winapi.CloseHandle(self._event)
205 self._event = None
206 self._event_fut = None
207
208 # If the wait was cancelled, the wait may never be signalled, so
209 # it's required to unregister it. Otherwise, IocpProactor.close() will
210 # wait forever for an event which will never come.
211 #
212 # If the IocpProactor already received the event, it's safe to call
213 # _unregister() because we kept a reference to the Overlapped object
Martin Panter6245cb32016-04-15 02:14:19 +0000214 # which is used as a unique key.
Victor Stinnerd0a28de2015-01-21 23:39:51 +0100215 self._proactor._unregister(self._ov)
216 self._proactor = None
217
218 super()._unregister_wait_cb(fut)
219
220 def _unregister_wait(self):
221 if not self._registered:
222 return
223 self._registered = False
224
Victor Stinner24dfa3c2015-01-26 22:30:28 +0100225 wait_handle = self._wait_handle
226 self._wait_handle = None
Victor Stinnerd0a28de2015-01-21 23:39:51 +0100227 try:
Victor Stinner24dfa3c2015-01-26 22:30:28 +0100228 _overlapped.UnregisterWaitEx(wait_handle, self._event)
Victor Stinnerd0a28de2015-01-21 23:39:51 +0100229 except OSError as exc:
Victor Stinner24dfa3c2015-01-26 22:30:28 +0100230 if exc.winerror != _overlapped.ERROR_IO_PENDING:
Victor Stinnerd0a28de2015-01-21 23:39:51 +0100231 context = {
232 'message': 'Failed to unregister the wait handle',
233 'exception': exc,
234 'future': self,
235 }
236 if self._source_traceback:
237 context['source_traceback'] = self._source_traceback
238 self._loop.call_exception_handler(context)
Victor Stinner24dfa3c2015-01-26 22:30:28 +0100239 return
240 # ERROR_IO_PENDING is not an error, the wait was unregistered
241
242 self._event_fut = self._proactor._wait_cancel(self._event,
243 self._unregister_wait_cb)
Guido van Rossum90fb9142013-10-30 14:44:05 -0700244
245
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700246class PipeServer(object):
247 """Class representing a pipe server.
248
249 This is much like a bound, listening socket.
250 """
251 def __init__(self, address):
252 self._address = address
253 self._free_instances = weakref.WeakSet()
Victor Stinnerb2614752014-08-25 23:20:52 +0200254 # initialize the pipe attribute before calling _server_pipe_handle()
255 # because this function can raise an exception and the destructor calls
256 # the close() method
257 self._pipe = None
258 self._accept_pipe_future = None
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700259 self._pipe = self._server_pipe_handle(True)
260
261 def _get_unconnected_pipe(self):
262 # Create new instance and return previous one. This ensures
263 # that (until the server is closed) there is always at least
264 # one pipe handle for address. Therefore if a client attempt
265 # to connect it will not fail with FileNotFoundError.
266 tmp, self._pipe = self._pipe, self._server_pipe_handle(False)
267 return tmp
268
269 def _server_pipe_handle(self, first):
270 # Return a wrapper for a new pipe handle.
Victor Stinnera19b7b32015-01-26 15:03:20 +0100271 if self.closed():
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700272 return None
273 flags = _winapi.PIPE_ACCESS_DUPLEX | _winapi.FILE_FLAG_OVERLAPPED
274 if first:
275 flags |= _winapi.FILE_FLAG_FIRST_PIPE_INSTANCE
276 h = _winapi.CreateNamedPipe(
277 self._address, flags,
278 _winapi.PIPE_TYPE_MESSAGE | _winapi.PIPE_READMODE_MESSAGE |
279 _winapi.PIPE_WAIT,
280 _winapi.PIPE_UNLIMITED_INSTANCES,
281 windows_utils.BUFSIZE, windows_utils.BUFSIZE,
282 _winapi.NMPWAIT_WAIT_FOREVER, _winapi.NULL)
283 pipe = windows_utils.PipeHandle(h)
284 self._free_instances.add(pipe)
285 return pipe
286
Victor Stinnera19b7b32015-01-26 15:03:20 +0100287 def closed(self):
288 return (self._address is None)
289
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700290 def close(self):
Victor Stinnerb2614752014-08-25 23:20:52 +0200291 if self._accept_pipe_future is not None:
292 self._accept_pipe_future.cancel()
293 self._accept_pipe_future = None
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700294 # Close all instances which have not been connected to by a client.
295 if self._address is not None:
296 for pipe in self._free_instances:
297 pipe.close()
298 self._pipe = None
299 self._address = None
300 self._free_instances.clear()
301
302 __del__ = close
303
304
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800305class _WindowsSelectorEventLoop(selector_events.BaseSelectorEventLoop):
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700306 """Windows version of selector event loop."""
307
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700308
309class ProactorEventLoop(proactor_events.BaseProactorEventLoop):
310 """Windows version of proactor event loop using IOCP."""
311
312 def __init__(self, proactor=None):
313 if proactor is None:
314 proactor = IocpProactor()
315 super().__init__(proactor)
316
Vladimir Matveevb5c8cfa2018-12-18 13:56:17 -0800317 def run_forever(self):
318 try:
319 assert self._self_reading_future is None
320 self.call_soon(self._loop_self_reading)
321 super().run_forever()
322 finally:
323 if self._self_reading_future is not None:
Vladimir Matveev67ba5472019-01-05 12:44:59 -0800324 ov = self._self_reading_future._ov
Vladimir Matveevb5c8cfa2018-12-18 13:56:17 -0800325 self._self_reading_future.cancel()
Ben Darnellea5a6362020-08-31 15:57:52 -0400326 # self_reading_future was just cancelled so if it hasn't been
327 # finished yet, it never will be (it's possible that it has
328 # already finished and its callback is waiting in the queue,
329 # where it could still happen if the event loop is restarted).
330 # Unregister it otherwise IocpProactor.close will wait for it
331 # forever
Vladimir Matveev67ba5472019-01-05 12:44:59 -0800332 if ov is not None:
333 self._proactor._unregister(ov)
Vladimir Matveevb5c8cfa2018-12-18 13:56:17 -0800334 self._self_reading_future = None
335
Andrew Svetlov5f841b52017-12-09 00:23:48 +0200336 async def create_pipe_connection(self, protocol_factory, address):
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700337 f = self._proactor.connect_pipe(address)
Andrew Svetlov5f841b52017-12-09 00:23:48 +0200338 pipe = await f
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700339 protocol = protocol_factory()
340 trans = self._make_duplex_pipe_transport(pipe, protocol,
341 extra={'addr': address})
342 return trans, protocol
343
Andrew Svetlov5f841b52017-12-09 00:23:48 +0200344 async def start_serving_pipe(self, protocol_factory, address):
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700345 server = PipeServer(address)
Guido van Rossuma8d630a2013-11-01 14:20:55 -0700346
Victor Stinnerb2614752014-08-25 23:20:52 +0200347 def loop_accept_pipe(f=None):
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700348 pipe = None
349 try:
350 if f:
351 pipe = f.result()
352 server._free_instances.discard(pipe)
Victor Stinnera19b7b32015-01-26 15:03:20 +0100353
354 if server.closed():
355 # A client connected before the server was closed:
356 # drop the client (close the pipe) and exit
357 pipe.close()
358 return
359
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700360 protocol = protocol_factory()
361 self._make_duplex_pipe_transport(
362 pipe, protocol, extra={'addr': address})
Victor Stinnera19b7b32015-01-26 15:03:20 +0100363
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700364 pipe = server._get_unconnected_pipe()
365 if pipe is None:
366 return
Victor Stinnera19b7b32015-01-26 15:03:20 +0100367
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700368 f = self._proactor.accept_pipe(pipe)
Yury Selivanovff827f02014-02-18 18:02:19 -0500369 except OSError as exc:
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700370 if pipe and pipe.fileno() != -1:
Yury Selivanovff827f02014-02-18 18:02:19 -0500371 self.call_exception_handler({
372 'message': 'Pipe accept failed',
373 'exception': exc,
374 'pipe': pipe,
375 })
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700376 pipe.close()
Victor Stinnerb2614752014-08-25 23:20:52 +0200377 elif self._debug:
378 logger.warning("Accept pipe failed on pipe %r",
379 pipe, exc_info=True)
Andrew Svetlov0baa72f2018-09-11 10:13:04 -0700380 except exceptions.CancelledError:
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700381 if pipe:
382 pipe.close()
383 else:
Victor Stinnerb2614752014-08-25 23:20:52 +0200384 server._accept_pipe_future = f
385 f.add_done_callback(loop_accept_pipe)
Guido van Rossuma8d630a2013-11-01 14:20:55 -0700386
Victor Stinnerb2614752014-08-25 23:20:52 +0200387 self.call_soon(loop_accept_pipe)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700388 return [server]
389
Andrew Svetlov5f841b52017-12-09 00:23:48 +0200390 async def _make_subprocess_transport(self, protocol, args, shell,
391 stdin, stdout, stderr, bufsize,
392 extra=None, **kwargs):
Yury Selivanov7661db62016-05-16 15:38:39 -0400393 waiter = self.create_future()
Guido van Rossum59691282013-10-30 14:52:03 -0700394 transp = _WindowsSubprocessTransport(self, protocol, args, shell,
395 stdin, stdout, stderr, bufsize,
Victor Stinner47cd10d2015-01-30 00:05:19 +0100396 waiter=waiter, extra=extra,
397 **kwargs)
Victor Stinner4bf22e02015-01-15 14:24:22 +0100398 try:
Andrew Svetlov5f841b52017-12-09 00:23:48 +0200399 await waiter
Yury Selivanov431b5402019-05-27 14:45:12 +0200400 except (SystemExit, KeyboardInterrupt):
401 raise
402 except BaseException:
Victor Stinner4bf22e02015-01-15 14:24:22 +0100403 transp.close()
Andrew Svetlov5f841b52017-12-09 00:23:48 +0200404 await transp._wait()
405 raise
Victor Stinner4bf22e02015-01-15 14:24:22 +0100406
Guido van Rossum59691282013-10-30 14:52:03 -0700407 return transp
408
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700409
410class IocpProactor:
411 """Proactor implementation using IOCP."""
412
413 def __init__(self, concurrency=0xffffffff):
414 self._loop = None
415 self._results = []
416 self._iocp = _overlapped.CreateIoCompletionPort(
417 _overlapped.INVALID_HANDLE_VALUE, NULL, 0, concurrency)
418 self._cache = {}
419 self._registered = weakref.WeakSet()
Victor Stinnerd0a28de2015-01-21 23:39:51 +0100420 self._unregistered = []
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700421 self._stopped_serving = weakref.WeakSet()
422
Victor Stinner9b076812019-01-10 11:23:26 +0100423 def _check_closed(self):
424 if self._iocp is None:
425 raise RuntimeError('IocpProactor is closed')
426
Victor Stinnerfea6a102014-07-25 00:54:53 +0200427 def __repr__(self):
Victor Stinner9b076812019-01-10 11:23:26 +0100428 info = ['overlapped#=%s' % len(self._cache),
429 'result#=%s' % len(self._results)]
430 if self._iocp is None:
431 info.append('closed')
432 return '<%s %s>' % (self.__class__.__name__, " ".join(info))
Victor Stinnerfea6a102014-07-25 00:54:53 +0200433
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700434 def set_loop(self, loop):
435 self._loop = loop
436
437 def select(self, timeout=None):
438 if not self._results:
439 self._poll(timeout)
440 tmp = self._results
441 self._results = []
442 return tmp
443
Victor Stinner41063d22015-01-26 22:30:49 +0100444 def _result(self, value):
Yury Selivanov7661db62016-05-16 15:38:39 -0400445 fut = self._loop.create_future()
Victor Stinner41063d22015-01-26 22:30:49 +0100446 fut.set_result(value)
447 return fut
448
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700449 def recv(self, conn, nbytes, flags=0):
450 self._register_with_iocp(conn)
451 ov = _overlapped.Overlapped(NULL)
Victor Stinner41063d22015-01-26 22:30:49 +0100452 try:
453 if isinstance(conn, socket.socket):
454 ov.WSARecv(conn.fileno(), nbytes, flags)
455 else:
456 ov.ReadFile(conn.fileno(), nbytes)
457 except BrokenPipeError:
458 return self._result(b'')
Guido van Rossuma8d630a2013-11-01 14:20:55 -0700459
Victor Stinnerc89c8a72014-02-26 17:35:30 +0100460 def finish_recv(trans, key, ov):
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700461 try:
462 return ov.getresult()
463 except OSError as exc:
Andrew Svetlov7c684072018-01-27 21:22:47 +0200464 if exc.winerror in (_overlapped.ERROR_NETNAME_DELETED,
465 _overlapped.ERROR_OPERATION_ABORTED):
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700466 raise ConnectionResetError(*exc.args)
467 else:
468 raise
Guido van Rossuma8d630a2013-11-01 14:20:55 -0700469
Victor Stinnerc89c8a72014-02-26 17:35:30 +0100470 return self._register(ov, conn, finish_recv)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700471
Antoine Pitrou525f40d2017-10-19 21:46:40 +0200472 def recv_into(self, conn, buf, flags=0):
473 self._register_with_iocp(conn)
474 ov = _overlapped.Overlapped(NULL)
475 try:
476 if isinstance(conn, socket.socket):
477 ov.WSARecvInto(conn.fileno(), buf, flags)
478 else:
479 ov.ReadFileInto(conn.fileno(), buf)
480 except BrokenPipeError:
Victor Stinner602a9712020-08-04 02:40:10 +0200481 return self._result(0)
Antoine Pitrou525f40d2017-10-19 21:46:40 +0200482
483 def finish_recv(trans, key, ov):
484 try:
485 return ov.getresult()
486 except OSError as exc:
Andrew Svetlov7c684072018-01-27 21:22:47 +0200487 if exc.winerror in (_overlapped.ERROR_NETNAME_DELETED,
488 _overlapped.ERROR_OPERATION_ABORTED):
Antoine Pitrou525f40d2017-10-19 21:46:40 +0200489 raise ConnectionResetError(*exc.args)
490 else:
491 raise
492
493 return self._register(ov, conn, finish_recv)
494
Andrew Svetlovbafd4b52019-05-28 12:52:15 +0300495 def recvfrom(self, conn, nbytes, flags=0):
496 self._register_with_iocp(conn)
497 ov = _overlapped.Overlapped(NULL)
498 try:
499 ov.WSARecvFrom(conn.fileno(), nbytes, flags)
500 except BrokenPipeError:
501 return self._result((b'', None))
502
503 def finish_recv(trans, key, ov):
504 try:
505 return ov.getresult()
506 except OSError as exc:
507 if exc.winerror in (_overlapped.ERROR_NETNAME_DELETED,
508 _overlapped.ERROR_OPERATION_ABORTED):
509 raise ConnectionResetError(*exc.args)
510 else:
511 raise
512
513 return self._register(ov, conn, finish_recv)
514
515 def sendto(self, conn, buf, flags=0, addr=None):
516 self._register_with_iocp(conn)
517 ov = _overlapped.Overlapped(NULL)
518
519 ov.WSASendTo(conn.fileno(), buf, flags, addr)
520
521 def finish_send(trans, key, ov):
522 try:
523 return ov.getresult()
524 except OSError as exc:
525 if exc.winerror in (_overlapped.ERROR_NETNAME_DELETED,
526 _overlapped.ERROR_OPERATION_ABORTED):
527 raise ConnectionResetError(*exc.args)
528 else:
529 raise
530
531 return self._register(ov, conn, finish_send)
532
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700533 def send(self, conn, buf, flags=0):
534 self._register_with_iocp(conn)
535 ov = _overlapped.Overlapped(NULL)
536 if isinstance(conn, socket.socket):
537 ov.WSASend(conn.fileno(), buf, flags)
538 else:
539 ov.WriteFile(conn.fileno(), buf)
Guido van Rossuma8d630a2013-11-01 14:20:55 -0700540
Victor Stinnerc89c8a72014-02-26 17:35:30 +0100541 def finish_send(trans, key, ov):
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700542 try:
543 return ov.getresult()
544 except OSError as exc:
Andrew Svetlov7c684072018-01-27 21:22:47 +0200545 if exc.winerror in (_overlapped.ERROR_NETNAME_DELETED,
546 _overlapped.ERROR_OPERATION_ABORTED):
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700547 raise ConnectionResetError(*exc.args)
548 else:
549 raise
Guido van Rossuma8d630a2013-11-01 14:20:55 -0700550
Victor Stinnerc89c8a72014-02-26 17:35:30 +0100551 return self._register(ov, conn, finish_send)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700552
553 def accept(self, listener):
554 self._register_with_iocp(listener)
555 conn = self._get_accept_socket(listener.family)
556 ov = _overlapped.Overlapped(NULL)
557 ov.AcceptEx(listener.fileno(), conn.fileno())
Guido van Rossuma8d630a2013-11-01 14:20:55 -0700558
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700559 def finish_accept(trans, key, ov):
560 ov.getresult()
561 # Use SO_UPDATE_ACCEPT_CONTEXT so getsockname() etc work.
562 buf = struct.pack('@P', listener.fileno())
563 conn.setsockopt(socket.SOL_SOCKET,
564 _overlapped.SO_UPDATE_ACCEPT_CONTEXT, buf)
565 conn.settimeout(listener.gettimeout())
566 return conn, conn.getpeername()
Guido van Rossuma8d630a2013-11-01 14:20:55 -0700567
Andrew Svetlov5f841b52017-12-09 00:23:48 +0200568 async def accept_coro(future, conn):
Victor Stinner7de26462014-01-11 00:03:21 +0100569 # Coroutine closing the accept socket if the future is cancelled
570 try:
Andrew Svetlov5f841b52017-12-09 00:23:48 +0200571 await future
Andrew Svetlov0baa72f2018-09-11 10:13:04 -0700572 except exceptions.CancelledError:
Victor Stinner7de26462014-01-11 00:03:21 +0100573 conn.close()
574 raise
575
576 future = self._register(ov, listener, finish_accept)
577 coro = accept_coro(future, conn)
Yury Selivanov59eb9a42015-05-11 14:48:38 -0400578 tasks.ensure_future(coro, loop=self._loop)
Victor Stinner7de26462014-01-11 00:03:21 +0100579 return future
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700580
581 def connect(self, conn, address):
Andrew Svetlovbafd4b52019-05-28 12:52:15 +0300582 if conn.type == socket.SOCK_DGRAM:
583 # WSAConnect will complete immediately for UDP sockets so we don't
584 # need to register any IOCP operation
585 _overlapped.WSAConnect(conn.fileno(), address)
586 fut = self._loop.create_future()
587 fut.set_result(None)
588 return fut
589
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700590 self._register_with_iocp(conn)
591 # The socket needs to be locally bound before we call ConnectEx().
592 try:
593 _overlapped.BindLocal(conn.fileno(), conn.family)
594 except OSError as e:
595 if e.winerror != errno.WSAEINVAL:
596 raise
597 # Probably already locally bound; check using getsockname().
598 if conn.getsockname()[1] == 0:
599 raise
600 ov = _overlapped.Overlapped(NULL)
601 ov.ConnectEx(conn.fileno(), address)
Guido van Rossuma8d630a2013-11-01 14:20:55 -0700602
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700603 def finish_connect(trans, key, ov):
604 ov.getresult()
605 # Use SO_UPDATE_CONNECT_CONTEXT so getsockname() etc work.
606 conn.setsockopt(socket.SOL_SOCKET,
607 _overlapped.SO_UPDATE_CONNECT_CONTEXT, 0)
608 return conn
Guido van Rossuma8d630a2013-11-01 14:20:55 -0700609
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700610 return self._register(ov, conn, finish_connect)
611
Andrew Svetlova19fb3c2018-02-25 19:32:14 +0300612 def sendfile(self, sock, file, offset, count):
613 self._register_with_iocp(sock)
614 ov = _overlapped.Overlapped(NULL)
615 offset_low = offset & 0xffff_ffff
616 offset_high = (offset >> 32) & 0xffff_ffff
617 ov.TransmitFile(sock.fileno(),
618 msvcrt.get_osfhandle(file.fileno()),
619 offset_low, offset_high,
620 count, 0, 0)
621
622 def finish_sendfile(trans, key, ov):
623 try:
624 return ov.getresult()
625 except OSError as exc:
626 if exc.winerror in (_overlapped.ERROR_NETNAME_DELETED,
627 _overlapped.ERROR_OPERATION_ABORTED):
628 raise ConnectionResetError(*exc.args)
629 else:
630 raise
631 return self._register(ov, sock, finish_sendfile)
632
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700633 def accept_pipe(self, pipe):
634 self._register_with_iocp(pipe)
635 ov = _overlapped.Overlapped(NULL)
Victor Stinner2b77c542015-01-22 23:50:03 +0100636 connected = ov.ConnectNamedPipe(pipe.fileno())
637
638 if connected:
639 # ConnectNamePipe() failed with ERROR_PIPE_CONNECTED which means
640 # that the pipe is connected. There is no need to wait for the
641 # completion of the connection.
Victor Stinner41063d22015-01-26 22:30:49 +0100642 return self._result(pipe)
Guido van Rossuma8d630a2013-11-01 14:20:55 -0700643
Victor Stinnerc89c8a72014-02-26 17:35:30 +0100644 def finish_accept_pipe(trans, key, ov):
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700645 ov.getresult()
646 return pipe
Guido van Rossuma8d630a2013-11-01 14:20:55 -0700647
Victor Stinner2b77c542015-01-22 23:50:03 +0100648 return self._register(ov, pipe, finish_accept_pipe)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700649
Andrew Svetlov5f841b52017-12-09 00:23:48 +0200650 async def connect_pipe(self, address):
Victor Stinnere0fd1572015-01-26 15:04:03 +0100651 delay = CONNECT_PIPE_INIT_DELAY
652 while True:
Yury Selivanov6370f342017-12-10 18:36:12 -0500653 # Unfortunately there is no way to do an overlapped connect to
654 # a pipe. Call CreateFile() in a loop until it doesn't fail with
655 # ERROR_PIPE_BUSY.
Victor Stinnere0fd1572015-01-26 15:04:03 +0100656 try:
657 handle = _overlapped.ConnectPipe(address)
658 break
659 except OSError as exc:
660 if exc.winerror != _overlapped.ERROR_PIPE_BUSY:
661 raise
662
663 # ConnectPipe() failed with ERROR_PIPE_BUSY: retry later
664 delay = min(delay * 2, CONNECT_PIPE_MAX_DELAY)
Andrew Svetlov23b4b692019-05-27 22:56:22 +0300665 await tasks.sleep(delay)
Victor Stinnere0fd1572015-01-26 15:04:03 +0100666
667 return windows_utils.PipeHandle(handle)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700668
Guido van Rossum90fb9142013-10-30 14:44:05 -0700669 def wait_for_handle(self, handle, timeout=None):
Victor Stinner4d825b42014-12-19 17:10:44 +0100670 """Wait for a handle.
671
672 Return a Future object. The result of the future is True if the wait
673 completed, or False if the wait did not complete (on timeout).
674 """
Victor Stinnerd0a28de2015-01-21 23:39:51 +0100675 return self._wait_for_handle(handle, timeout, False)
676
677 def _wait_cancel(self, event, done_callback):
678 fut = self._wait_for_handle(event, None, True)
679 # add_done_callback() cannot be used because the wait may only complete
680 # in IocpProactor.close(), while the event loop is not running.
681 fut._done_callback = done_callback
682 return fut
683
684 def _wait_for_handle(self, handle, timeout, _is_cancel):
Victor Stinner9b076812019-01-10 11:23:26 +0100685 self._check_closed()
686
Guido van Rossum90fb9142013-10-30 14:44:05 -0700687 if timeout is None:
688 ms = _winapi.INFINITE
689 else:
Victor Stinnerf2e17682014-01-31 16:25:24 +0100690 # RegisterWaitForSingleObject() has a resolution of 1 millisecond,
691 # round away from zero to wait *at least* timeout seconds.
692 ms = math.ceil(timeout * 1e3)
Guido van Rossum90fb9142013-10-30 14:44:05 -0700693
694 # We only create ov so we can use ov.address as a key for the cache.
695 ov = _overlapped.Overlapped(NULL)
Victor Stinnerd0a28de2015-01-21 23:39:51 +0100696 wait_handle = _overlapped.RegisterWaitWithQueue(
Guido van Rossum90fb9142013-10-30 14:44:05 -0700697 handle, self._iocp, ov.address, ms)
Victor Stinnerd0a28de2015-01-21 23:39:51 +0100698 if _is_cancel:
699 f = _WaitCancelFuture(ov, handle, wait_handle, loop=self._loop)
700 else:
701 f = _WaitHandleFuture(ov, handle, wait_handle, self,
702 loop=self._loop)
Victor Stinner313a9802014-07-29 12:58:23 +0200703 if f._source_traceback:
704 del f._source_traceback[-1]
Guido van Rossum90fb9142013-10-30 14:44:05 -0700705
Victor Stinnerc89c8a72014-02-26 17:35:30 +0100706 def finish_wait_for_handle(trans, key, ov):
Richard Oudkerk71196e72013-11-24 17:50:40 +0000707 # Note that this second wait means that we should only use
708 # this with handles types where a successful wait has no
709 # effect. So events or processes are all right, but locks
710 # or semaphores are not. Also note if the handle is
711 # signalled and then quickly reset, then we may return
712 # False even though we have not timed out.
Victor Stinner313a9802014-07-29 12:58:23 +0200713 return f._poll()
Guido van Rossum90fb9142013-10-30 14:44:05 -0700714
Victor Stinner313a9802014-07-29 12:58:23 +0200715 self._cache[ov.address] = (f, ov, 0, finish_wait_for_handle)
Guido van Rossum90fb9142013-10-30 14:44:05 -0700716 return f
717
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700718 def _register_with_iocp(self, obj):
719 # To get notifications of finished ops on this objects sent to the
720 # completion port, were must register the handle.
721 if obj not in self._registered:
722 self._registered.add(obj)
723 _overlapped.CreateIoCompletionPort(obj.fileno(), self._iocp, 0, 0)
724 # XXX We could also use SetFileCompletionNotificationModes()
725 # to avoid sending notifications to completion port of ops
726 # that succeed immediately.
727
Victor Stinner2b77c542015-01-22 23:50:03 +0100728 def _register(self, ov, obj, callback):
Victor Stinner9b076812019-01-10 11:23:26 +0100729 self._check_closed()
730
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700731 # Return a future which will be set with the result of the
732 # operation when it completes. The future's value is actually
733 # the value returned by callback().
734 f = _OverlappedFuture(ov, loop=self._loop)
Victor Stinner313a9802014-07-29 12:58:23 +0200735 if f._source_traceback:
736 del f._source_traceback[-1]
Victor Stinner2b77c542015-01-22 23:50:03 +0100737 if not ov.pending:
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700738 # The operation has completed, so no need to postpone the
739 # work. We cannot take this short cut if we need the
740 # NumberOfBytes, CompletionKey values returned by
741 # PostQueuedCompletionStatus().
742 try:
743 value = callback(None, None, ov)
744 except OSError as e:
745 f.set_exception(e)
746 else:
747 f.set_result(value)
Victor Stinner42d3bde2014-07-28 00:18:43 +0200748 # Even if GetOverlappedResult() was called, we have to wait for the
749 # notification of the completion in GetQueuedCompletionStatus().
750 # Register the overlapped operation to keep a reference to the
751 # OVERLAPPED object, otherwise the memory is freed and Windows may
752 # read uninitialized memory.
Victor Stinner2b77c542015-01-22 23:50:03 +0100753
754 # Register the overlapped operation for later. Note that
755 # we only store obj to prevent it from being garbage
756 # collected too early.
757 self._cache[ov.address] = (f, ov, obj, callback)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700758 return f
759
Victor Stinnerd0a28de2015-01-21 23:39:51 +0100760 def _unregister(self, ov):
761 """Unregister an overlapped object.
762
763 Call this method when its future has been cancelled. The event can
764 already be signalled (pending in the proactor event queue). It is also
765 safe if the event is never signalled (because it was cancelled).
766 """
Victor Stinner9b076812019-01-10 11:23:26 +0100767 self._check_closed()
Victor Stinnerd0a28de2015-01-21 23:39:51 +0100768 self._unregistered.append(ov)
769
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700770 def _get_accept_socket(self, family):
771 s = socket.socket(family)
772 s.settimeout(0)
773 return s
774
775 def _poll(self, timeout=None):
776 if timeout is None:
777 ms = INFINITE
778 elif timeout < 0:
779 raise ValueError("negative timeout")
780 else:
Victor Stinnerf2e17682014-01-31 16:25:24 +0100781 # GetQueuedCompletionStatus() has a resolution of 1 millisecond,
782 # round away from zero to wait *at least* timeout seconds.
783 ms = math.ceil(timeout * 1e3)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700784 if ms >= INFINITE:
785 raise ValueError("timeout too big")
Victor Stinner313a9802014-07-29 12:58:23 +0200786
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700787 while True:
788 status = _overlapped.GetQueuedCompletionStatus(self._iocp, ms)
789 if status is None:
Victor Stinnerd0a28de2015-01-21 23:39:51 +0100790 break
Victor Stinner313a9802014-07-29 12:58:23 +0200791 ms = 0
792
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700793 err, transferred, key, address = status
794 try:
795 f, ov, obj, callback = self._cache.pop(address)
796 except KeyError:
Victor Stinner42d3bde2014-07-28 00:18:43 +0200797 if self._loop.get_debug():
798 self._loop.call_exception_handler({
799 'message': ('GetQueuedCompletionStatus() returned an '
800 'unexpected event'),
801 'status': ('err=%s transferred=%s key=%#x address=%#x'
802 % (err, transferred, key, address)),
803 })
804
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700805 # key is either zero, or it is used to return a pipe
806 # handle which should be closed to avoid a leak.
807 if key not in (0, _overlapped.INVALID_HANDLE_VALUE):
808 _winapi.CloseHandle(key)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700809 continue
Victor Stinner51e44ea2014-07-26 00:58:34 +0200810
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700811 if obj in self._stopped_serving:
812 f.cancel()
Victor Stinner42d3bde2014-07-28 00:18:43 +0200813 # Don't call the callback if _register() already read the result or
814 # if the overlapped has been cancelled
815 elif not f.done():
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700816 try:
817 value = callback(transferred, key, ov)
818 except OSError as e:
819 f.set_exception(e)
820 self._results.append(f)
821 else:
822 f.set_result(value)
823 self._results.append(f)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700824
Andrew Svetlov7a6706b2017-12-13 17:50:16 +0200825 # Remove unregistered futures
Victor Stinnerd0a28de2015-01-21 23:39:51 +0100826 for ov in self._unregistered:
827 self._cache.pop(ov.address, None)
828 self._unregistered.clear()
829
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700830 def _stop_serving(self, obj):
831 # obj is a socket or pipe handle. It will be closed in
832 # BaseProactorEventLoop._stop_serving() which will make any
833 # pending operations fail quickly.
834 self._stopped_serving.add(obj)
835
836 def close(self):
Victor Stinner9b076812019-01-10 11:23:26 +0100837 if self._iocp is None:
838 # already closed
839 return
840
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700841 # Cancel remaining registered operations.
Victor Stinnerfea6a102014-07-25 00:54:53 +0200842 for address, (fut, ov, obj, callback) in list(self._cache.items()):
Victor Stinner3d2256f2015-01-26 11:02:59 +0100843 if fut.cancelled():
Victor Stinner752aba72015-01-22 22:47:13 +0100844 # Nothing to do with cancelled futures
845 pass
Victor Stinner1ca93922015-01-22 00:17:54 +0100846 elif isinstance(fut, _WaitCancelFuture):
847 # _WaitCancelFuture must not be cancelled
848 pass
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700849 else:
850 try:
Victor Stinnerfea6a102014-07-25 00:54:53 +0200851 fut.cancel()
852 except OSError as exc:
853 if self._loop is not None:
854 context = {
855 'message': 'Cancelling a future failed',
856 'exception': exc,
857 'future': fut,
858 }
859 if fut._source_traceback:
860 context['source_traceback'] = fut._source_traceback
861 self._loop.call_exception_handler(context)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700862
Victor Stinnerb1e45732019-01-15 11:48:00 +0100863 # Wait until all cancelled overlapped complete: don't exit with running
864 # overlapped to prevent a crash. Display progress every second if the
865 # loop is still running.
866 msg_update = 1.0
867 start_time = time.monotonic()
868 next_msg = start_time + msg_update
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700869 while self._cache:
Victor Stinnerb1e45732019-01-15 11:48:00 +0100870 if next_msg <= time.monotonic():
Victor Stinnerb91140f2019-01-15 12:13:48 +0100871 logger.debug('%r is running after closing for %.1f seconds',
872 self, time.monotonic() - start_time)
Victor Stinnerb1e45732019-01-15 11:48:00 +0100873 next_msg = time.monotonic() + msg_update
874
875 # handle a few events, or timeout
876 self._poll(msg_update)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700877
878 self._results = []
Victor Stinner9b076812019-01-10 11:23:26 +0100879
880 _winapi.CloseHandle(self._iocp)
881 self._iocp = None
Guido van Rossum59691282013-10-30 14:52:03 -0700882
Victor Stinnerfea6a102014-07-25 00:54:53 +0200883 def __del__(self):
884 self.close()
885
Guido van Rossum59691282013-10-30 14:52:03 -0700886
887class _WindowsSubprocessTransport(base_subprocess.BaseSubprocessTransport):
888
889 def _start(self, args, shell, stdin, stdout, stderr, bufsize, **kwargs):
890 self._proc = windows_utils.Popen(
891 args, shell=shell, stdin=stdin, stdout=stdout, stderr=stderr,
892 bufsize=bufsize, **kwargs)
Guido van Rossuma8d630a2013-11-01 14:20:55 -0700893
Guido van Rossum59691282013-10-30 14:52:03 -0700894 def callback(f):
895 returncode = self._proc.poll()
896 self._process_exited(returncode)
Guido van Rossuma8d630a2013-11-01 14:20:55 -0700897
Guido van Rossum59691282013-10-30 14:52:03 -0700898 f = self._loop._proactor.wait_for_handle(int(self._proc._handle))
899 f.add_done_callback(callback)
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800900
901
902SelectorEventLoop = _WindowsSelectorEventLoop
903
904
Yury Selivanov8f404292018-06-07 20:44:57 -0400905class WindowsSelectorEventLoopPolicy(events.BaseDefaultEventLoopPolicy):
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800906 _loop_factory = SelectorEventLoop
907
908
Yury Selivanov8f404292018-06-07 20:44:57 -0400909class WindowsProactorEventLoopPolicy(events.BaseDefaultEventLoopPolicy):
910 _loop_factory = ProactorEventLoop
911
912
Victor Stinner6ea29c52018-09-25 08:27:08 -0700913DefaultEventLoopPolicy = WindowsProactorEventLoopPolicy