blob: 33ffaf97177069a03179cf90051dcdc384732f6d [file] [log] [blame]
Yury Selivanovb0b0e622014-02-18 22:27:48 -05001"""Selector and proactor event loops for Windows."""
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07002
Victor Stinner4271dfd2017-11-28 15:19:56 +01003import _overlapped
Victor Stinnerf2e17682014-01-31 16:25:24 +01004import _winapi
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07005import errno
Victor Stinnerf2e17682014-01-31 16:25:24 +01006import math
Andrew Svetlova19fb3c2018-02-25 19:32:14 +03007import msvcrt
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07008import socket
Victor Stinnerf2e17682014-01-31 16:25:24 +01009import struct
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070010import weakref
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070011
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -080012from . import events
Guido van Rossum59691282013-10-30 14:52:03 -070013from . import base_subprocess
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070014from . import futures
Andrew Svetlov0baa72f2018-09-11 10:13:04 -070015from . import exceptions
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070016from . import proactor_events
17from . import selector_events
18from . import tasks
19from . import windows_utils
Victor Stinnerf951d282014-06-29 00:46:45 +020020from .log import logger
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070021
22
Yury Selivanov6370f342017-12-10 18:36:12 -050023__all__ = (
24 'SelectorEventLoop', 'ProactorEventLoop', 'IocpProactor',
Yury Selivanov8f404292018-06-07 20:44:57 -040025 'DefaultEventLoopPolicy', 'WindowsSelectorEventLoopPolicy',
26 'WindowsProactorEventLoopPolicy',
Yury Selivanov6370f342017-12-10 18:36:12 -050027)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070028
29
30NULL = 0
31INFINITE = 0xffffffff
32ERROR_CONNECTION_REFUSED = 1225
33ERROR_CONNECTION_ABORTED = 1236
34
Victor Stinner7ffa2c52015-01-22 22:55:08 +010035# Initial delay in seconds for connect_pipe() before retrying to connect
36CONNECT_PIPE_INIT_DELAY = 0.001
37
38# Maximum delay in seconds for connect_pipe() before retrying to connect
39CONNECT_PIPE_MAX_DELAY = 0.100
40
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070041
42class _OverlappedFuture(futures.Future):
43 """Subclass of Future which represents an overlapped operation.
44
45 Cancelling it will immediately cancel the overlapped operation.
46 """
47
48 def __init__(self, ov, *, loop=None):
49 super().__init__(loop=loop)
Victor Stinnerfea6a102014-07-25 00:54:53 +020050 if self._source_traceback:
51 del self._source_traceback[-1]
Victor Stinner18a28dc2014-07-25 13:05:20 +020052 self._ov = ov
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070053
Victor Stinner313a9802014-07-29 12:58:23 +020054 def _repr_info(self):
55 info = super()._repr_info()
Victor Stinner18a28dc2014-07-25 13:05:20 +020056 if self._ov is not None:
57 state = 'pending' if self._ov.pending else 'completed'
Yury Selivanov6370f342017-12-10 18:36:12 -050058 info.insert(1, f'overlapped=<{state}, {self._ov.address:#x}>')
Victor Stinner313a9802014-07-29 12:58:23 +020059 return info
Victor Stinnere912e652014-07-12 03:11:53 +020060
Victor Stinner18a28dc2014-07-25 13:05:20 +020061 def _cancel_overlapped(self):
62 if self._ov is None:
63 return
64 try:
65 self._ov.cancel()
66 except OSError as exc:
67 context = {
68 'message': 'Cancelling an overlapped future failed',
69 'exception': exc,
70 'future': self,
71 }
72 if self._source_traceback:
73 context['source_traceback'] = self._source_traceback
74 self._loop.call_exception_handler(context)
75 self._ov = None
76
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070077 def cancel(self):
Victor Stinner18a28dc2014-07-25 13:05:20 +020078 self._cancel_overlapped()
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070079 return super().cancel()
80
Victor Stinner18a28dc2014-07-25 13:05:20 +020081 def set_exception(self, exception):
82 super().set_exception(exception)
83 self._cancel_overlapped()
84
Victor Stinner51e44ea2014-07-26 00:58:34 +020085 def set_result(self, result):
86 super().set_result(result)
87 self._ov = None
88
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070089
Victor Stinnerd0a28de2015-01-21 23:39:51 +010090class _BaseWaitHandleFuture(futures.Future):
Guido van Rossum90fb9142013-10-30 14:44:05 -070091 """Subclass of Future which represents a wait handle."""
92
Victor Stinnerd0a28de2015-01-21 23:39:51 +010093 def __init__(self, ov, handle, wait_handle, *, loop=None):
Guido van Rossum90fb9142013-10-30 14:44:05 -070094 super().__init__(loop=loop)
Victor Stinner313a9802014-07-29 12:58:23 +020095 if self._source_traceback:
96 del self._source_traceback[-1]
Victor Stinnerd0a28de2015-01-21 23:39:51 +010097 # Keep a reference to the Overlapped object to keep it alive until the
98 # wait is unregistered
Victor Stinner313a9802014-07-29 12:58:23 +020099 self._ov = ov
Victor Stinner18a28dc2014-07-25 13:05:20 +0200100 self._handle = handle
Guido van Rossum90fb9142013-10-30 14:44:05 -0700101 self._wait_handle = wait_handle
102
Victor Stinnerd0a28de2015-01-21 23:39:51 +0100103 # Should we call UnregisterWaitEx() if the wait completes
104 # or is cancelled?
105 self._registered = True
106
Victor Stinner18a28dc2014-07-25 13:05:20 +0200107 def _poll(self):
108 # non-blocking wait: use a timeout of 0 millisecond
109 return (_winapi.WaitForSingleObject(self._handle, 0) ==
110 _winapi.WAIT_OBJECT_0)
111
Victor Stinner313a9802014-07-29 12:58:23 +0200112 def _repr_info(self):
113 info = super()._repr_info()
Yury Selivanov6370f342017-12-10 18:36:12 -0500114 info.append(f'handle={self._handle:#x}')
Victor Stinnerd0a28de2015-01-21 23:39:51 +0100115 if self._handle is not None:
Victor Stinner313a9802014-07-29 12:58:23 +0200116 state = 'signaled' if self._poll() else 'waiting'
Victor Stinnerd0a28de2015-01-21 23:39:51 +0100117 info.append(state)
118 if self._wait_handle is not None:
Yury Selivanov6370f342017-12-10 18:36:12 -0500119 info.append(f'wait_handle={self._wait_handle:#x}')
Victor Stinner313a9802014-07-29 12:58:23 +0200120 return info
Victor Stinner18a28dc2014-07-25 13:05:20 +0200121
Victor Stinnerd0a28de2015-01-21 23:39:51 +0100122 def _unregister_wait_cb(self, fut):
123 # The wait was unregistered: it's not safe to destroy the Overlapped
124 # object
125 self._ov = None
126
Victor Stinner313a9802014-07-29 12:58:23 +0200127 def _unregister_wait(self):
Victor Stinnerd0a28de2015-01-21 23:39:51 +0100128 if not self._registered:
Victor Stinnerfea6a102014-07-25 00:54:53 +0200129 return
Victor Stinnerd0a28de2015-01-21 23:39:51 +0100130 self._registered = False
131
Victor Stinner24dfa3c2015-01-26 22:30:28 +0100132 wait_handle = self._wait_handle
133 self._wait_handle = None
Guido van Rossum90fb9142013-10-30 14:44:05 -0700134 try:
Victor Stinner24dfa3c2015-01-26 22:30:28 +0100135 _overlapped.UnregisterWait(wait_handle)
Victor Stinnerb2614752014-08-25 23:20:52 +0200136 except OSError as exc:
Victor Stinner24dfa3c2015-01-26 22:30:28 +0100137 if exc.winerror != _overlapped.ERROR_IO_PENDING:
Victor Stinnerb2614752014-08-25 23:20:52 +0200138 context = {
139 'message': 'Failed to unregister the wait handle',
140 'exception': exc,
141 'future': self,
142 }
143 if self._source_traceback:
144 context['source_traceback'] = self._source_traceback
145 self._loop.call_exception_handler(context)
Victor Stinner24dfa3c2015-01-26 22:30:28 +0100146 return
147 # ERROR_IO_PENDING means that the unregister is pending
148
149 self._unregister_wait_cb(None)
Victor Stinnerfea6a102014-07-25 00:54:53 +0200150
151 def cancel(self):
Victor Stinner313a9802014-07-29 12:58:23 +0200152 self._unregister_wait()
Victor Stinnerd0a28de2015-01-21 23:39:51 +0100153 return super().cancel()
Victor Stinner313a9802014-07-29 12:58:23 +0200154
155 def set_exception(self, exception):
Victor Stinner313a9802014-07-29 12:58:23 +0200156 self._unregister_wait()
Victor Stinnerd0a28de2015-01-21 23:39:51 +0100157 super().set_exception(exception)
Victor Stinner313a9802014-07-29 12:58:23 +0200158
159 def set_result(self, result):
Victor Stinner313a9802014-07-29 12:58:23 +0200160 self._unregister_wait()
Victor Stinnerd0a28de2015-01-21 23:39:51 +0100161 super().set_result(result)
162
163
164class _WaitCancelFuture(_BaseWaitHandleFuture):
165 """Subclass of Future which represents a wait for the cancellation of a
166 _WaitHandleFuture using an event.
167 """
168
169 def __init__(self, ov, event, wait_handle, *, loop=None):
170 super().__init__(ov, event, wait_handle, loop=loop)
171
172 self._done_callback = None
173
Victor Stinner1ca93922015-01-22 00:17:54 +0100174 def cancel(self):
175 raise RuntimeError("_WaitCancelFuture must not be cancelled")
176
INADA Naokia8363622016-10-21 12:30:15 +0900177 def set_result(self, result):
178 super().set_result(result)
179 if self._done_callback is not None:
180 self._done_callback(self)
181
182 def set_exception(self, exception):
183 super().set_exception(exception)
Victor Stinnerd0a28de2015-01-21 23:39:51 +0100184 if self._done_callback is not None:
185 self._done_callback(self)
186
187
188class _WaitHandleFuture(_BaseWaitHandleFuture):
189 def __init__(self, ov, handle, wait_handle, proactor, *, loop=None):
190 super().__init__(ov, handle, wait_handle, loop=loop)
191 self._proactor = proactor
192 self._unregister_proactor = True
193 self._event = _overlapped.CreateEvent(None, True, False, None)
194 self._event_fut = None
195
196 def _unregister_wait_cb(self, fut):
197 if self._event is not None:
198 _winapi.CloseHandle(self._event)
199 self._event = None
200 self._event_fut = None
201
202 # If the wait was cancelled, the wait may never be signalled, so
203 # it's required to unregister it. Otherwise, IocpProactor.close() will
204 # wait forever for an event which will never come.
205 #
206 # If the IocpProactor already received the event, it's safe to call
207 # _unregister() because we kept a reference to the Overlapped object
Martin Panter6245cb32016-04-15 02:14:19 +0000208 # which is used as a unique key.
Victor Stinnerd0a28de2015-01-21 23:39:51 +0100209 self._proactor._unregister(self._ov)
210 self._proactor = None
211
212 super()._unregister_wait_cb(fut)
213
214 def _unregister_wait(self):
215 if not self._registered:
216 return
217 self._registered = False
218
Victor Stinner24dfa3c2015-01-26 22:30:28 +0100219 wait_handle = self._wait_handle
220 self._wait_handle = None
Victor Stinnerd0a28de2015-01-21 23:39:51 +0100221 try:
Victor Stinner24dfa3c2015-01-26 22:30:28 +0100222 _overlapped.UnregisterWaitEx(wait_handle, self._event)
Victor Stinnerd0a28de2015-01-21 23:39:51 +0100223 except OSError as exc:
Victor Stinner24dfa3c2015-01-26 22:30:28 +0100224 if exc.winerror != _overlapped.ERROR_IO_PENDING:
Victor Stinnerd0a28de2015-01-21 23:39:51 +0100225 context = {
226 'message': 'Failed to unregister the wait handle',
227 'exception': exc,
228 'future': self,
229 }
230 if self._source_traceback:
231 context['source_traceback'] = self._source_traceback
232 self._loop.call_exception_handler(context)
Victor Stinner24dfa3c2015-01-26 22:30:28 +0100233 return
234 # ERROR_IO_PENDING is not an error, the wait was unregistered
235
236 self._event_fut = self._proactor._wait_cancel(self._event,
237 self._unregister_wait_cb)
Guido van Rossum90fb9142013-10-30 14:44:05 -0700238
239
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700240class PipeServer(object):
241 """Class representing a pipe server.
242
243 This is much like a bound, listening socket.
244 """
245 def __init__(self, address):
246 self._address = address
247 self._free_instances = weakref.WeakSet()
Victor Stinnerb2614752014-08-25 23:20:52 +0200248 # initialize the pipe attribute before calling _server_pipe_handle()
249 # because this function can raise an exception and the destructor calls
250 # the close() method
251 self._pipe = None
252 self._accept_pipe_future = None
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700253 self._pipe = self._server_pipe_handle(True)
254
255 def _get_unconnected_pipe(self):
256 # Create new instance and return previous one. This ensures
257 # that (until the server is closed) there is always at least
258 # one pipe handle for address. Therefore if a client attempt
259 # to connect it will not fail with FileNotFoundError.
260 tmp, self._pipe = self._pipe, self._server_pipe_handle(False)
261 return tmp
262
263 def _server_pipe_handle(self, first):
264 # Return a wrapper for a new pipe handle.
Victor Stinnera19b7b32015-01-26 15:03:20 +0100265 if self.closed():
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700266 return None
267 flags = _winapi.PIPE_ACCESS_DUPLEX | _winapi.FILE_FLAG_OVERLAPPED
268 if first:
269 flags |= _winapi.FILE_FLAG_FIRST_PIPE_INSTANCE
270 h = _winapi.CreateNamedPipe(
271 self._address, flags,
272 _winapi.PIPE_TYPE_MESSAGE | _winapi.PIPE_READMODE_MESSAGE |
273 _winapi.PIPE_WAIT,
274 _winapi.PIPE_UNLIMITED_INSTANCES,
275 windows_utils.BUFSIZE, windows_utils.BUFSIZE,
276 _winapi.NMPWAIT_WAIT_FOREVER, _winapi.NULL)
277 pipe = windows_utils.PipeHandle(h)
278 self._free_instances.add(pipe)
279 return pipe
280
Victor Stinnera19b7b32015-01-26 15:03:20 +0100281 def closed(self):
282 return (self._address is None)
283
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700284 def close(self):
Victor Stinnerb2614752014-08-25 23:20:52 +0200285 if self._accept_pipe_future is not None:
286 self._accept_pipe_future.cancel()
287 self._accept_pipe_future = None
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700288 # Close all instances which have not been connected to by a client.
289 if self._address is not None:
290 for pipe in self._free_instances:
291 pipe.close()
292 self._pipe = None
293 self._address = None
294 self._free_instances.clear()
295
296 __del__ = close
297
298
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800299class _WindowsSelectorEventLoop(selector_events.BaseSelectorEventLoop):
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700300 """Windows version of selector event loop."""
301
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700302
303class ProactorEventLoop(proactor_events.BaseProactorEventLoop):
304 """Windows version of proactor event loop using IOCP."""
305
306 def __init__(self, proactor=None):
307 if proactor is None:
308 proactor = IocpProactor()
309 super().__init__(proactor)
310
Vladimir Matveevb5c8cfa2018-12-18 13:56:17 -0800311 def run_forever(self):
312 try:
313 assert self._self_reading_future is None
314 self.call_soon(self._loop_self_reading)
315 super().run_forever()
316 finally:
317 if self._self_reading_future is not None:
318 self._self_reading_future.cancel()
319 self._self_reading_future = None
320
Andrew Svetlov5f841b52017-12-09 00:23:48 +0200321 async def create_pipe_connection(self, protocol_factory, address):
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700322 f = self._proactor.connect_pipe(address)
Andrew Svetlov5f841b52017-12-09 00:23:48 +0200323 pipe = await f
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700324 protocol = protocol_factory()
325 trans = self._make_duplex_pipe_transport(pipe, protocol,
326 extra={'addr': address})
327 return trans, protocol
328
Andrew Svetlov5f841b52017-12-09 00:23:48 +0200329 async def start_serving_pipe(self, protocol_factory, address):
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700330 server = PipeServer(address)
Guido van Rossuma8d630a2013-11-01 14:20:55 -0700331
Victor Stinnerb2614752014-08-25 23:20:52 +0200332 def loop_accept_pipe(f=None):
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700333 pipe = None
334 try:
335 if f:
336 pipe = f.result()
337 server._free_instances.discard(pipe)
Victor Stinnera19b7b32015-01-26 15:03:20 +0100338
339 if server.closed():
340 # A client connected before the server was closed:
341 # drop the client (close the pipe) and exit
342 pipe.close()
343 return
344
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700345 protocol = protocol_factory()
346 self._make_duplex_pipe_transport(
347 pipe, protocol, extra={'addr': address})
Victor Stinnera19b7b32015-01-26 15:03:20 +0100348
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700349 pipe = server._get_unconnected_pipe()
350 if pipe is None:
351 return
Victor Stinnera19b7b32015-01-26 15:03:20 +0100352
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700353 f = self._proactor.accept_pipe(pipe)
Yury Selivanovff827f02014-02-18 18:02:19 -0500354 except OSError as exc:
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700355 if pipe and pipe.fileno() != -1:
Yury Selivanovff827f02014-02-18 18:02:19 -0500356 self.call_exception_handler({
357 'message': 'Pipe accept failed',
358 'exception': exc,
359 'pipe': pipe,
360 })
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700361 pipe.close()
Victor Stinnerb2614752014-08-25 23:20:52 +0200362 elif self._debug:
363 logger.warning("Accept pipe failed on pipe %r",
364 pipe, exc_info=True)
Andrew Svetlov0baa72f2018-09-11 10:13:04 -0700365 except exceptions.CancelledError:
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700366 if pipe:
367 pipe.close()
368 else:
Victor Stinnerb2614752014-08-25 23:20:52 +0200369 server._accept_pipe_future = f
370 f.add_done_callback(loop_accept_pipe)
Guido van Rossuma8d630a2013-11-01 14:20:55 -0700371
Victor Stinnerb2614752014-08-25 23:20:52 +0200372 self.call_soon(loop_accept_pipe)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700373 return [server]
374
Andrew Svetlov5f841b52017-12-09 00:23:48 +0200375 async def _make_subprocess_transport(self, protocol, args, shell,
376 stdin, stdout, stderr, bufsize,
377 extra=None, **kwargs):
Yury Selivanov7661db62016-05-16 15:38:39 -0400378 waiter = self.create_future()
Guido van Rossum59691282013-10-30 14:52:03 -0700379 transp = _WindowsSubprocessTransport(self, protocol, args, shell,
380 stdin, stdout, stderr, bufsize,
Victor Stinner47cd10d2015-01-30 00:05:19 +0100381 waiter=waiter, extra=extra,
382 **kwargs)
Victor Stinner4bf22e02015-01-15 14:24:22 +0100383 try:
Andrew Svetlov5f841b52017-12-09 00:23:48 +0200384 await waiter
385 except Exception:
Victor Stinner4bf22e02015-01-15 14:24:22 +0100386 transp.close()
Andrew Svetlov5f841b52017-12-09 00:23:48 +0200387 await transp._wait()
388 raise
Victor Stinner4bf22e02015-01-15 14:24:22 +0100389
Guido van Rossum59691282013-10-30 14:52:03 -0700390 return transp
391
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700392
393class IocpProactor:
394 """Proactor implementation using IOCP."""
395
396 def __init__(self, concurrency=0xffffffff):
397 self._loop = None
398 self._results = []
399 self._iocp = _overlapped.CreateIoCompletionPort(
400 _overlapped.INVALID_HANDLE_VALUE, NULL, 0, concurrency)
401 self._cache = {}
402 self._registered = weakref.WeakSet()
Victor Stinnerd0a28de2015-01-21 23:39:51 +0100403 self._unregistered = []
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700404 self._stopped_serving = weakref.WeakSet()
405
Victor Stinnerfea6a102014-07-25 00:54:53 +0200406 def __repr__(self):
407 return ('<%s overlapped#=%s result#=%s>'
408 % (self.__class__.__name__, len(self._cache),
409 len(self._results)))
410
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700411 def set_loop(self, loop):
412 self._loop = loop
413
414 def select(self, timeout=None):
415 if not self._results:
416 self._poll(timeout)
417 tmp = self._results
418 self._results = []
419 return tmp
420
Victor Stinner41063d22015-01-26 22:30:49 +0100421 def _result(self, value):
Yury Selivanov7661db62016-05-16 15:38:39 -0400422 fut = self._loop.create_future()
Victor Stinner41063d22015-01-26 22:30:49 +0100423 fut.set_result(value)
424 return fut
425
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700426 def recv(self, conn, nbytes, flags=0):
427 self._register_with_iocp(conn)
428 ov = _overlapped.Overlapped(NULL)
Victor Stinner41063d22015-01-26 22:30:49 +0100429 try:
430 if isinstance(conn, socket.socket):
431 ov.WSARecv(conn.fileno(), nbytes, flags)
432 else:
433 ov.ReadFile(conn.fileno(), nbytes)
434 except BrokenPipeError:
435 return self._result(b'')
Guido van Rossuma8d630a2013-11-01 14:20:55 -0700436
Victor Stinnerc89c8a72014-02-26 17:35:30 +0100437 def finish_recv(trans, key, ov):
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700438 try:
439 return ov.getresult()
440 except OSError as exc:
Andrew Svetlov7c684072018-01-27 21:22:47 +0200441 if exc.winerror in (_overlapped.ERROR_NETNAME_DELETED,
442 _overlapped.ERROR_OPERATION_ABORTED):
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700443 raise ConnectionResetError(*exc.args)
444 else:
445 raise
Guido van Rossuma8d630a2013-11-01 14:20:55 -0700446
Victor Stinnerc89c8a72014-02-26 17:35:30 +0100447 return self._register(ov, conn, finish_recv)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700448
Antoine Pitrou525f40d2017-10-19 21:46:40 +0200449 def recv_into(self, conn, buf, flags=0):
450 self._register_with_iocp(conn)
451 ov = _overlapped.Overlapped(NULL)
452 try:
453 if isinstance(conn, socket.socket):
454 ov.WSARecvInto(conn.fileno(), buf, flags)
455 else:
456 ov.ReadFileInto(conn.fileno(), buf)
457 except BrokenPipeError:
458 return self._result(b'')
459
460 def finish_recv(trans, key, ov):
461 try:
462 return ov.getresult()
463 except OSError as exc:
Andrew Svetlov7c684072018-01-27 21:22:47 +0200464 if exc.winerror in (_overlapped.ERROR_NETNAME_DELETED,
465 _overlapped.ERROR_OPERATION_ABORTED):
Antoine Pitrou525f40d2017-10-19 21:46:40 +0200466 raise ConnectionResetError(*exc.args)
467 else:
468 raise
469
470 return self._register(ov, conn, finish_recv)
471
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700472 def send(self, conn, buf, flags=0):
473 self._register_with_iocp(conn)
474 ov = _overlapped.Overlapped(NULL)
475 if isinstance(conn, socket.socket):
476 ov.WSASend(conn.fileno(), buf, flags)
477 else:
478 ov.WriteFile(conn.fileno(), buf)
Guido van Rossuma8d630a2013-11-01 14:20:55 -0700479
Victor Stinnerc89c8a72014-02-26 17:35:30 +0100480 def finish_send(trans, key, ov):
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700481 try:
482 return ov.getresult()
483 except OSError as exc:
Andrew Svetlov7c684072018-01-27 21:22:47 +0200484 if exc.winerror in (_overlapped.ERROR_NETNAME_DELETED,
485 _overlapped.ERROR_OPERATION_ABORTED):
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700486 raise ConnectionResetError(*exc.args)
487 else:
488 raise
Guido van Rossuma8d630a2013-11-01 14:20:55 -0700489
Victor Stinnerc89c8a72014-02-26 17:35:30 +0100490 return self._register(ov, conn, finish_send)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700491
492 def accept(self, listener):
493 self._register_with_iocp(listener)
494 conn = self._get_accept_socket(listener.family)
495 ov = _overlapped.Overlapped(NULL)
496 ov.AcceptEx(listener.fileno(), conn.fileno())
Guido van Rossuma8d630a2013-11-01 14:20:55 -0700497
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700498 def finish_accept(trans, key, ov):
499 ov.getresult()
500 # Use SO_UPDATE_ACCEPT_CONTEXT so getsockname() etc work.
501 buf = struct.pack('@P', listener.fileno())
502 conn.setsockopt(socket.SOL_SOCKET,
503 _overlapped.SO_UPDATE_ACCEPT_CONTEXT, buf)
504 conn.settimeout(listener.gettimeout())
505 return conn, conn.getpeername()
Guido van Rossuma8d630a2013-11-01 14:20:55 -0700506
Andrew Svetlov5f841b52017-12-09 00:23:48 +0200507 async def accept_coro(future, conn):
Victor Stinner7de26462014-01-11 00:03:21 +0100508 # Coroutine closing the accept socket if the future is cancelled
509 try:
Andrew Svetlov5f841b52017-12-09 00:23:48 +0200510 await future
Andrew Svetlov0baa72f2018-09-11 10:13:04 -0700511 except exceptions.CancelledError:
Victor Stinner7de26462014-01-11 00:03:21 +0100512 conn.close()
513 raise
514
515 future = self._register(ov, listener, finish_accept)
516 coro = accept_coro(future, conn)
Yury Selivanov59eb9a42015-05-11 14:48:38 -0400517 tasks.ensure_future(coro, loop=self._loop)
Victor Stinner7de26462014-01-11 00:03:21 +0100518 return future
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700519
520 def connect(self, conn, address):
521 self._register_with_iocp(conn)
522 # The socket needs to be locally bound before we call ConnectEx().
523 try:
524 _overlapped.BindLocal(conn.fileno(), conn.family)
525 except OSError as e:
526 if e.winerror != errno.WSAEINVAL:
527 raise
528 # Probably already locally bound; check using getsockname().
529 if conn.getsockname()[1] == 0:
530 raise
531 ov = _overlapped.Overlapped(NULL)
532 ov.ConnectEx(conn.fileno(), address)
Guido van Rossuma8d630a2013-11-01 14:20:55 -0700533
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700534 def finish_connect(trans, key, ov):
535 ov.getresult()
536 # Use SO_UPDATE_CONNECT_CONTEXT so getsockname() etc work.
537 conn.setsockopt(socket.SOL_SOCKET,
538 _overlapped.SO_UPDATE_CONNECT_CONTEXT, 0)
539 return conn
Guido van Rossuma8d630a2013-11-01 14:20:55 -0700540
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700541 return self._register(ov, conn, finish_connect)
542
Andrew Svetlova19fb3c2018-02-25 19:32:14 +0300543 def sendfile(self, sock, file, offset, count):
544 self._register_with_iocp(sock)
545 ov = _overlapped.Overlapped(NULL)
546 offset_low = offset & 0xffff_ffff
547 offset_high = (offset >> 32) & 0xffff_ffff
548 ov.TransmitFile(sock.fileno(),
549 msvcrt.get_osfhandle(file.fileno()),
550 offset_low, offset_high,
551 count, 0, 0)
552
553 def finish_sendfile(trans, key, ov):
554 try:
555 return ov.getresult()
556 except OSError as exc:
557 if exc.winerror in (_overlapped.ERROR_NETNAME_DELETED,
558 _overlapped.ERROR_OPERATION_ABORTED):
559 raise ConnectionResetError(*exc.args)
560 else:
561 raise
562 return self._register(ov, sock, finish_sendfile)
563
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700564 def accept_pipe(self, pipe):
565 self._register_with_iocp(pipe)
566 ov = _overlapped.Overlapped(NULL)
Victor Stinner2b77c542015-01-22 23:50:03 +0100567 connected = ov.ConnectNamedPipe(pipe.fileno())
568
569 if connected:
570 # ConnectNamePipe() failed with ERROR_PIPE_CONNECTED which means
571 # that the pipe is connected. There is no need to wait for the
572 # completion of the connection.
Victor Stinner41063d22015-01-26 22:30:49 +0100573 return self._result(pipe)
Guido van Rossuma8d630a2013-11-01 14:20:55 -0700574
Victor Stinnerc89c8a72014-02-26 17:35:30 +0100575 def finish_accept_pipe(trans, key, ov):
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700576 ov.getresult()
577 return pipe
Guido van Rossuma8d630a2013-11-01 14:20:55 -0700578
Victor Stinner2b77c542015-01-22 23:50:03 +0100579 return self._register(ov, pipe, finish_accept_pipe)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700580
Andrew Svetlov5f841b52017-12-09 00:23:48 +0200581 async def connect_pipe(self, address):
Victor Stinnere0fd1572015-01-26 15:04:03 +0100582 delay = CONNECT_PIPE_INIT_DELAY
583 while True:
Yury Selivanov6370f342017-12-10 18:36:12 -0500584 # Unfortunately there is no way to do an overlapped connect to
585 # a pipe. Call CreateFile() in a loop until it doesn't fail with
586 # ERROR_PIPE_BUSY.
Victor Stinnere0fd1572015-01-26 15:04:03 +0100587 try:
588 handle = _overlapped.ConnectPipe(address)
589 break
590 except OSError as exc:
591 if exc.winerror != _overlapped.ERROR_PIPE_BUSY:
592 raise
593
594 # ConnectPipe() failed with ERROR_PIPE_BUSY: retry later
595 delay = min(delay * 2, CONNECT_PIPE_MAX_DELAY)
Andrew Svetlov5f841b52017-12-09 00:23:48 +0200596 await tasks.sleep(delay, loop=self._loop)
Victor Stinnere0fd1572015-01-26 15:04:03 +0100597
598 return windows_utils.PipeHandle(handle)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700599
Guido van Rossum90fb9142013-10-30 14:44:05 -0700600 def wait_for_handle(self, handle, timeout=None):
Victor Stinner4d825b42014-12-19 17:10:44 +0100601 """Wait for a handle.
602
603 Return a Future object. The result of the future is True if the wait
604 completed, or False if the wait did not complete (on timeout).
605 """
Victor Stinnerd0a28de2015-01-21 23:39:51 +0100606 return self._wait_for_handle(handle, timeout, False)
607
608 def _wait_cancel(self, event, done_callback):
609 fut = self._wait_for_handle(event, None, True)
610 # add_done_callback() cannot be used because the wait may only complete
611 # in IocpProactor.close(), while the event loop is not running.
612 fut._done_callback = done_callback
613 return fut
614
615 def _wait_for_handle(self, handle, timeout, _is_cancel):
Guido van Rossum90fb9142013-10-30 14:44:05 -0700616 if timeout is None:
617 ms = _winapi.INFINITE
618 else:
Victor Stinnerf2e17682014-01-31 16:25:24 +0100619 # RegisterWaitForSingleObject() has a resolution of 1 millisecond,
620 # round away from zero to wait *at least* timeout seconds.
621 ms = math.ceil(timeout * 1e3)
Guido van Rossum90fb9142013-10-30 14:44:05 -0700622
623 # We only create ov so we can use ov.address as a key for the cache.
624 ov = _overlapped.Overlapped(NULL)
Victor Stinnerd0a28de2015-01-21 23:39:51 +0100625 wait_handle = _overlapped.RegisterWaitWithQueue(
Guido van Rossum90fb9142013-10-30 14:44:05 -0700626 handle, self._iocp, ov.address, ms)
Victor Stinnerd0a28de2015-01-21 23:39:51 +0100627 if _is_cancel:
628 f = _WaitCancelFuture(ov, handle, wait_handle, loop=self._loop)
629 else:
630 f = _WaitHandleFuture(ov, handle, wait_handle, self,
631 loop=self._loop)
Victor Stinner313a9802014-07-29 12:58:23 +0200632 if f._source_traceback:
633 del f._source_traceback[-1]
Guido van Rossum90fb9142013-10-30 14:44:05 -0700634
Victor Stinnerc89c8a72014-02-26 17:35:30 +0100635 def finish_wait_for_handle(trans, key, ov):
Richard Oudkerk71196e72013-11-24 17:50:40 +0000636 # Note that this second wait means that we should only use
637 # this with handles types where a successful wait has no
638 # effect. So events or processes are all right, but locks
639 # or semaphores are not. Also note if the handle is
640 # signalled and then quickly reset, then we may return
641 # False even though we have not timed out.
Victor Stinner313a9802014-07-29 12:58:23 +0200642 return f._poll()
Guido van Rossum90fb9142013-10-30 14:44:05 -0700643
Victor Stinner313a9802014-07-29 12:58:23 +0200644 self._cache[ov.address] = (f, ov, 0, finish_wait_for_handle)
Guido van Rossum90fb9142013-10-30 14:44:05 -0700645 return f
646
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700647 def _register_with_iocp(self, obj):
648 # To get notifications of finished ops on this objects sent to the
649 # completion port, were must register the handle.
650 if obj not in self._registered:
651 self._registered.add(obj)
652 _overlapped.CreateIoCompletionPort(obj.fileno(), self._iocp, 0, 0)
653 # XXX We could also use SetFileCompletionNotificationModes()
654 # to avoid sending notifications to completion port of ops
655 # that succeed immediately.
656
Victor Stinner2b77c542015-01-22 23:50:03 +0100657 def _register(self, ov, obj, callback):
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700658 # Return a future which will be set with the result of the
659 # operation when it completes. The future's value is actually
660 # the value returned by callback().
661 f = _OverlappedFuture(ov, loop=self._loop)
Victor Stinner313a9802014-07-29 12:58:23 +0200662 if f._source_traceback:
663 del f._source_traceback[-1]
Victor Stinner2b77c542015-01-22 23:50:03 +0100664 if not ov.pending:
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700665 # The operation has completed, so no need to postpone the
666 # work. We cannot take this short cut if we need the
667 # NumberOfBytes, CompletionKey values returned by
668 # PostQueuedCompletionStatus().
669 try:
670 value = callback(None, None, ov)
671 except OSError as e:
672 f.set_exception(e)
673 else:
674 f.set_result(value)
Victor Stinner42d3bde2014-07-28 00:18:43 +0200675 # Even if GetOverlappedResult() was called, we have to wait for the
676 # notification of the completion in GetQueuedCompletionStatus().
677 # Register the overlapped operation to keep a reference to the
678 # OVERLAPPED object, otherwise the memory is freed and Windows may
679 # read uninitialized memory.
Victor Stinner2b77c542015-01-22 23:50:03 +0100680
681 # Register the overlapped operation for later. Note that
682 # we only store obj to prevent it from being garbage
683 # collected too early.
684 self._cache[ov.address] = (f, ov, obj, callback)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700685 return f
686
Victor Stinnerd0a28de2015-01-21 23:39:51 +0100687 def _unregister(self, ov):
688 """Unregister an overlapped object.
689
690 Call this method when its future has been cancelled. The event can
691 already be signalled (pending in the proactor event queue). It is also
692 safe if the event is never signalled (because it was cancelled).
693 """
694 self._unregistered.append(ov)
695
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700696 def _get_accept_socket(self, family):
697 s = socket.socket(family)
698 s.settimeout(0)
699 return s
700
701 def _poll(self, timeout=None):
702 if timeout is None:
703 ms = INFINITE
704 elif timeout < 0:
705 raise ValueError("negative timeout")
706 else:
Victor Stinnerf2e17682014-01-31 16:25:24 +0100707 # GetQueuedCompletionStatus() has a resolution of 1 millisecond,
708 # round away from zero to wait *at least* timeout seconds.
709 ms = math.ceil(timeout * 1e3)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700710 if ms >= INFINITE:
711 raise ValueError("timeout too big")
Victor Stinner313a9802014-07-29 12:58:23 +0200712
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700713 while True:
714 status = _overlapped.GetQueuedCompletionStatus(self._iocp, ms)
715 if status is None:
Victor Stinnerd0a28de2015-01-21 23:39:51 +0100716 break
Victor Stinner313a9802014-07-29 12:58:23 +0200717 ms = 0
718
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700719 err, transferred, key, address = status
720 try:
721 f, ov, obj, callback = self._cache.pop(address)
722 except KeyError:
Victor Stinner42d3bde2014-07-28 00:18:43 +0200723 if self._loop.get_debug():
724 self._loop.call_exception_handler({
725 'message': ('GetQueuedCompletionStatus() returned an '
726 'unexpected event'),
727 'status': ('err=%s transferred=%s key=%#x address=%#x'
728 % (err, transferred, key, address)),
729 })
730
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700731 # key is either zero, or it is used to return a pipe
732 # handle which should be closed to avoid a leak.
733 if key not in (0, _overlapped.INVALID_HANDLE_VALUE):
734 _winapi.CloseHandle(key)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700735 continue
Victor Stinner51e44ea2014-07-26 00:58:34 +0200736
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700737 if obj in self._stopped_serving:
738 f.cancel()
Victor Stinner42d3bde2014-07-28 00:18:43 +0200739 # Don't call the callback if _register() already read the result or
740 # if the overlapped has been cancelled
741 elif not f.done():
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700742 try:
743 value = callback(transferred, key, ov)
744 except OSError as e:
745 f.set_exception(e)
746 self._results.append(f)
747 else:
748 f.set_result(value)
749 self._results.append(f)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700750
Andrew Svetlov7a6706b2017-12-13 17:50:16 +0200751 # Remove unregistered futures
Victor Stinnerd0a28de2015-01-21 23:39:51 +0100752 for ov in self._unregistered:
753 self._cache.pop(ov.address, None)
754 self._unregistered.clear()
755
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700756 def _stop_serving(self, obj):
757 # obj is a socket or pipe handle. It will be closed in
758 # BaseProactorEventLoop._stop_serving() which will make any
759 # pending operations fail quickly.
760 self._stopped_serving.add(obj)
761
762 def close(self):
763 # Cancel remaining registered operations.
Victor Stinnerfea6a102014-07-25 00:54:53 +0200764 for address, (fut, ov, obj, callback) in list(self._cache.items()):
Victor Stinner3d2256f2015-01-26 11:02:59 +0100765 if fut.cancelled():
Victor Stinner752aba72015-01-22 22:47:13 +0100766 # Nothing to do with cancelled futures
767 pass
Victor Stinner1ca93922015-01-22 00:17:54 +0100768 elif isinstance(fut, _WaitCancelFuture):
769 # _WaitCancelFuture must not be cancelled
770 pass
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700771 else:
772 try:
Victor Stinnerfea6a102014-07-25 00:54:53 +0200773 fut.cancel()
774 except OSError as exc:
775 if self._loop is not None:
776 context = {
777 'message': 'Cancelling a future failed',
778 'exception': exc,
779 'future': fut,
780 }
781 if fut._source_traceback:
782 context['source_traceback'] = fut._source_traceback
783 self._loop.call_exception_handler(context)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700784
785 while self._cache:
786 if not self._poll(1):
Guido van Rossumfc29e0f2013-10-17 15:39:45 -0700787 logger.debug('taking long time to close proactor')
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700788
789 self._results = []
790 if self._iocp is not None:
791 _winapi.CloseHandle(self._iocp)
792 self._iocp = None
Guido van Rossum59691282013-10-30 14:52:03 -0700793
Victor Stinnerfea6a102014-07-25 00:54:53 +0200794 def __del__(self):
795 self.close()
796
Guido van Rossum59691282013-10-30 14:52:03 -0700797
798class _WindowsSubprocessTransport(base_subprocess.BaseSubprocessTransport):
799
800 def _start(self, args, shell, stdin, stdout, stderr, bufsize, **kwargs):
801 self._proc = windows_utils.Popen(
802 args, shell=shell, stdin=stdin, stdout=stdout, stderr=stderr,
803 bufsize=bufsize, **kwargs)
Guido van Rossuma8d630a2013-11-01 14:20:55 -0700804
Guido van Rossum59691282013-10-30 14:52:03 -0700805 def callback(f):
806 returncode = self._proc.poll()
807 self._process_exited(returncode)
Guido van Rossuma8d630a2013-11-01 14:20:55 -0700808
Guido van Rossum59691282013-10-30 14:52:03 -0700809 f = self._loop._proactor.wait_for_handle(int(self._proc._handle))
810 f.add_done_callback(callback)
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800811
812
813SelectorEventLoop = _WindowsSelectorEventLoop
814
815
Yury Selivanov8f404292018-06-07 20:44:57 -0400816class WindowsSelectorEventLoopPolicy(events.BaseDefaultEventLoopPolicy):
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800817 _loop_factory = SelectorEventLoop
818
819
Yury Selivanov8f404292018-06-07 20:44:57 -0400820class WindowsProactorEventLoopPolicy(events.BaseDefaultEventLoopPolicy):
821 _loop_factory = ProactorEventLoop
822
823
Victor Stinner6ea29c52018-09-25 08:27:08 -0700824DefaultEventLoopPolicy = WindowsProactorEventLoopPolicy