blob: e3778688ab5be48d3d4b8a06b5e128e644666529 [file] [log] [blame]
Yury Selivanovb0b0e622014-02-18 22:27:48 -05001"""Selector and proactor event loops for Windows."""
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07002
Victor Stinner4271dfd2017-11-28 15:19:56 +01003import _overlapped
Victor Stinnerf2e17682014-01-31 16:25:24 +01004import _winapi
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07005import errno
Victor Stinnerf2e17682014-01-31 16:25:24 +01006import math
Miss Islington (bot)632c1cb2018-02-25 09:10:58 -08007import msvcrt
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07008import socket
Victor Stinnerf2e17682014-01-31 16:25:24 +01009import struct
Victor Stinnerd5a6adf2019-01-15 13:05:28 +010010import time
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070011import weakref
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070012
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -080013from . import events
Guido van Rossum59691282013-10-30 14:52:03 -070014from . import base_subprocess
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070015from . import futures
16from . import proactor_events
17from . import selector_events
18from . import tasks
19from . import windows_utils
Victor Stinnerf951d282014-06-29 00:46:45 +020020from .log import logger
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070021
22
Yury Selivanov6370f342017-12-10 18:36:12 -050023__all__ = (
24 'SelectorEventLoop', 'ProactorEventLoop', 'IocpProactor',
Miss Islington (bot)07384432018-06-07 18:31:50 -070025 'DefaultEventLoopPolicy', 'WindowsSelectorEventLoopPolicy',
26 'WindowsProactorEventLoopPolicy',
Yury Selivanov6370f342017-12-10 18:36:12 -050027)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070028
29
30NULL = 0
31INFINITE = 0xffffffff
32ERROR_CONNECTION_REFUSED = 1225
33ERROR_CONNECTION_ABORTED = 1236
34
Victor Stinner7ffa2c52015-01-22 22:55:08 +010035# Initial delay in seconds for connect_pipe() before retrying to connect
36CONNECT_PIPE_INIT_DELAY = 0.001
37
38# Maximum delay in seconds for connect_pipe() before retrying to connect
39CONNECT_PIPE_MAX_DELAY = 0.100
40
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070041
42class _OverlappedFuture(futures.Future):
43 """Subclass of Future which represents an overlapped operation.
44
45 Cancelling it will immediately cancel the overlapped operation.
46 """
47
48 def __init__(self, ov, *, loop=None):
49 super().__init__(loop=loop)
Victor Stinnerfea6a102014-07-25 00:54:53 +020050 if self._source_traceback:
51 del self._source_traceback[-1]
Victor Stinner18a28dc2014-07-25 13:05:20 +020052 self._ov = ov
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070053
Victor Stinner313a9802014-07-29 12:58:23 +020054 def _repr_info(self):
55 info = super()._repr_info()
Victor Stinner18a28dc2014-07-25 13:05:20 +020056 if self._ov is not None:
57 state = 'pending' if self._ov.pending else 'completed'
Yury Selivanov6370f342017-12-10 18:36:12 -050058 info.insert(1, f'overlapped=<{state}, {self._ov.address:#x}>')
Victor Stinner313a9802014-07-29 12:58:23 +020059 return info
Victor Stinnere912e652014-07-12 03:11:53 +020060
Victor Stinner18a28dc2014-07-25 13:05:20 +020061 def _cancel_overlapped(self):
62 if self._ov is None:
63 return
64 try:
65 self._ov.cancel()
66 except OSError as exc:
67 context = {
68 'message': 'Cancelling an overlapped future failed',
69 'exception': exc,
70 'future': self,
71 }
72 if self._source_traceback:
73 context['source_traceback'] = self._source_traceback
74 self._loop.call_exception_handler(context)
75 self._ov = None
76
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070077 def cancel(self):
Victor Stinner18a28dc2014-07-25 13:05:20 +020078 self._cancel_overlapped()
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070079 return super().cancel()
80
Victor Stinner18a28dc2014-07-25 13:05:20 +020081 def set_exception(self, exception):
82 super().set_exception(exception)
83 self._cancel_overlapped()
84
Victor Stinner51e44ea2014-07-26 00:58:34 +020085 def set_result(self, result):
86 super().set_result(result)
87 self._ov = None
88
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070089
Victor Stinnerd0a28de2015-01-21 23:39:51 +010090class _BaseWaitHandleFuture(futures.Future):
Guido van Rossum90fb9142013-10-30 14:44:05 -070091 """Subclass of Future which represents a wait handle."""
92
Victor Stinnerd0a28de2015-01-21 23:39:51 +010093 def __init__(self, ov, handle, wait_handle, *, loop=None):
Guido van Rossum90fb9142013-10-30 14:44:05 -070094 super().__init__(loop=loop)
Victor Stinner313a9802014-07-29 12:58:23 +020095 if self._source_traceback:
96 del self._source_traceback[-1]
Victor Stinnerd0a28de2015-01-21 23:39:51 +010097 # Keep a reference to the Overlapped object to keep it alive until the
98 # wait is unregistered
Victor Stinner313a9802014-07-29 12:58:23 +020099 self._ov = ov
Victor Stinner18a28dc2014-07-25 13:05:20 +0200100 self._handle = handle
Guido van Rossum90fb9142013-10-30 14:44:05 -0700101 self._wait_handle = wait_handle
102
Victor Stinnerd0a28de2015-01-21 23:39:51 +0100103 # Should we call UnregisterWaitEx() if the wait completes
104 # or is cancelled?
105 self._registered = True
106
Victor Stinner18a28dc2014-07-25 13:05:20 +0200107 def _poll(self):
108 # non-blocking wait: use a timeout of 0 millisecond
109 return (_winapi.WaitForSingleObject(self._handle, 0) ==
110 _winapi.WAIT_OBJECT_0)
111
Victor Stinner313a9802014-07-29 12:58:23 +0200112 def _repr_info(self):
113 info = super()._repr_info()
Yury Selivanov6370f342017-12-10 18:36:12 -0500114 info.append(f'handle={self._handle:#x}')
Victor Stinnerd0a28de2015-01-21 23:39:51 +0100115 if self._handle is not None:
Victor Stinner313a9802014-07-29 12:58:23 +0200116 state = 'signaled' if self._poll() else 'waiting'
Victor Stinnerd0a28de2015-01-21 23:39:51 +0100117 info.append(state)
118 if self._wait_handle is not None:
Yury Selivanov6370f342017-12-10 18:36:12 -0500119 info.append(f'wait_handle={self._wait_handle:#x}')
Victor Stinner313a9802014-07-29 12:58:23 +0200120 return info
Victor Stinner18a28dc2014-07-25 13:05:20 +0200121
Victor Stinnerd0a28de2015-01-21 23:39:51 +0100122 def _unregister_wait_cb(self, fut):
123 # The wait was unregistered: it's not safe to destroy the Overlapped
124 # object
125 self._ov = None
126
Victor Stinner313a9802014-07-29 12:58:23 +0200127 def _unregister_wait(self):
Victor Stinnerd0a28de2015-01-21 23:39:51 +0100128 if not self._registered:
Victor Stinnerfea6a102014-07-25 00:54:53 +0200129 return
Victor Stinnerd0a28de2015-01-21 23:39:51 +0100130 self._registered = False
131
Victor Stinner24dfa3c2015-01-26 22:30:28 +0100132 wait_handle = self._wait_handle
133 self._wait_handle = None
Guido van Rossum90fb9142013-10-30 14:44:05 -0700134 try:
Victor Stinner24dfa3c2015-01-26 22:30:28 +0100135 _overlapped.UnregisterWait(wait_handle)
Victor Stinnerb2614752014-08-25 23:20:52 +0200136 except OSError as exc:
Victor Stinner24dfa3c2015-01-26 22:30:28 +0100137 if exc.winerror != _overlapped.ERROR_IO_PENDING:
Victor Stinnerb2614752014-08-25 23:20:52 +0200138 context = {
139 'message': 'Failed to unregister the wait handle',
140 'exception': exc,
141 'future': self,
142 }
143 if self._source_traceback:
144 context['source_traceback'] = self._source_traceback
145 self._loop.call_exception_handler(context)
Victor Stinner24dfa3c2015-01-26 22:30:28 +0100146 return
147 # ERROR_IO_PENDING means that the unregister is pending
148
149 self._unregister_wait_cb(None)
Victor Stinnerfea6a102014-07-25 00:54:53 +0200150
151 def cancel(self):
Victor Stinner313a9802014-07-29 12:58:23 +0200152 self._unregister_wait()
Victor Stinnerd0a28de2015-01-21 23:39:51 +0100153 return super().cancel()
Victor Stinner313a9802014-07-29 12:58:23 +0200154
155 def set_exception(self, exception):
Victor Stinner313a9802014-07-29 12:58:23 +0200156 self._unregister_wait()
Victor Stinnerd0a28de2015-01-21 23:39:51 +0100157 super().set_exception(exception)
Victor Stinner313a9802014-07-29 12:58:23 +0200158
159 def set_result(self, result):
Victor Stinner313a9802014-07-29 12:58:23 +0200160 self._unregister_wait()
Victor Stinnerd0a28de2015-01-21 23:39:51 +0100161 super().set_result(result)
162
163
164class _WaitCancelFuture(_BaseWaitHandleFuture):
165 """Subclass of Future which represents a wait for the cancellation of a
166 _WaitHandleFuture using an event.
167 """
168
169 def __init__(self, ov, event, wait_handle, *, loop=None):
170 super().__init__(ov, event, wait_handle, loop=loop)
171
172 self._done_callback = None
173
Victor Stinner1ca93922015-01-22 00:17:54 +0100174 def cancel(self):
175 raise RuntimeError("_WaitCancelFuture must not be cancelled")
176
INADA Naokia8363622016-10-21 12:30:15 +0900177 def set_result(self, result):
178 super().set_result(result)
179 if self._done_callback is not None:
180 self._done_callback(self)
181
182 def set_exception(self, exception):
183 super().set_exception(exception)
Victor Stinnerd0a28de2015-01-21 23:39:51 +0100184 if self._done_callback is not None:
185 self._done_callback(self)
186
187
188class _WaitHandleFuture(_BaseWaitHandleFuture):
189 def __init__(self, ov, handle, wait_handle, proactor, *, loop=None):
190 super().__init__(ov, handle, wait_handle, loop=loop)
191 self._proactor = proactor
192 self._unregister_proactor = True
193 self._event = _overlapped.CreateEvent(None, True, False, None)
194 self._event_fut = None
195
196 def _unregister_wait_cb(self, fut):
197 if self._event is not None:
198 _winapi.CloseHandle(self._event)
199 self._event = None
200 self._event_fut = None
201
202 # If the wait was cancelled, the wait may never be signalled, so
203 # it's required to unregister it. Otherwise, IocpProactor.close() will
204 # wait forever for an event which will never come.
205 #
206 # If the IocpProactor already received the event, it's safe to call
207 # _unregister() because we kept a reference to the Overlapped object
Martin Panter6245cb32016-04-15 02:14:19 +0000208 # which is used as a unique key.
Victor Stinnerd0a28de2015-01-21 23:39:51 +0100209 self._proactor._unregister(self._ov)
210 self._proactor = None
211
212 super()._unregister_wait_cb(fut)
213
214 def _unregister_wait(self):
215 if not self._registered:
216 return
217 self._registered = False
218
Victor Stinner24dfa3c2015-01-26 22:30:28 +0100219 wait_handle = self._wait_handle
220 self._wait_handle = None
Victor Stinnerd0a28de2015-01-21 23:39:51 +0100221 try:
Victor Stinner24dfa3c2015-01-26 22:30:28 +0100222 _overlapped.UnregisterWaitEx(wait_handle, self._event)
Victor Stinnerd0a28de2015-01-21 23:39:51 +0100223 except OSError as exc:
Victor Stinner24dfa3c2015-01-26 22:30:28 +0100224 if exc.winerror != _overlapped.ERROR_IO_PENDING:
Victor Stinnerd0a28de2015-01-21 23:39:51 +0100225 context = {
226 'message': 'Failed to unregister the wait handle',
227 'exception': exc,
228 'future': self,
229 }
230 if self._source_traceback:
231 context['source_traceback'] = self._source_traceback
232 self._loop.call_exception_handler(context)
Victor Stinner24dfa3c2015-01-26 22:30:28 +0100233 return
234 # ERROR_IO_PENDING is not an error, the wait was unregistered
235
236 self._event_fut = self._proactor._wait_cancel(self._event,
237 self._unregister_wait_cb)
Guido van Rossum90fb9142013-10-30 14:44:05 -0700238
239
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700240class PipeServer(object):
241 """Class representing a pipe server.
242
243 This is much like a bound, listening socket.
244 """
245 def __init__(self, address):
246 self._address = address
247 self._free_instances = weakref.WeakSet()
Victor Stinnerb2614752014-08-25 23:20:52 +0200248 # initialize the pipe attribute before calling _server_pipe_handle()
249 # because this function can raise an exception and the destructor calls
250 # the close() method
251 self._pipe = None
252 self._accept_pipe_future = None
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700253 self._pipe = self._server_pipe_handle(True)
254
255 def _get_unconnected_pipe(self):
256 # Create new instance and return previous one. This ensures
257 # that (until the server is closed) there is always at least
258 # one pipe handle for address. Therefore if a client attempt
259 # to connect it will not fail with FileNotFoundError.
260 tmp, self._pipe = self._pipe, self._server_pipe_handle(False)
261 return tmp
262
263 def _server_pipe_handle(self, first):
264 # Return a wrapper for a new pipe handle.
Victor Stinnera19b7b32015-01-26 15:03:20 +0100265 if self.closed():
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700266 return None
267 flags = _winapi.PIPE_ACCESS_DUPLEX | _winapi.FILE_FLAG_OVERLAPPED
268 if first:
269 flags |= _winapi.FILE_FLAG_FIRST_PIPE_INSTANCE
270 h = _winapi.CreateNamedPipe(
271 self._address, flags,
272 _winapi.PIPE_TYPE_MESSAGE | _winapi.PIPE_READMODE_MESSAGE |
273 _winapi.PIPE_WAIT,
274 _winapi.PIPE_UNLIMITED_INSTANCES,
275 windows_utils.BUFSIZE, windows_utils.BUFSIZE,
276 _winapi.NMPWAIT_WAIT_FOREVER, _winapi.NULL)
277 pipe = windows_utils.PipeHandle(h)
278 self._free_instances.add(pipe)
279 return pipe
280
Victor Stinnera19b7b32015-01-26 15:03:20 +0100281 def closed(self):
282 return (self._address is None)
283
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700284 def close(self):
Victor Stinnerb2614752014-08-25 23:20:52 +0200285 if self._accept_pipe_future is not None:
286 self._accept_pipe_future.cancel()
287 self._accept_pipe_future = None
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700288 # Close all instances which have not been connected to by a client.
289 if self._address is not None:
290 for pipe in self._free_instances:
291 pipe.close()
292 self._pipe = None
293 self._address = None
294 self._free_instances.clear()
295
296 __del__ = close
297
298
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800299class _WindowsSelectorEventLoop(selector_events.BaseSelectorEventLoop):
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700300 """Windows version of selector event loop."""
301
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700302
303class ProactorEventLoop(proactor_events.BaseProactorEventLoop):
304 """Windows version of proactor event loop using IOCP."""
305
306 def __init__(self, proactor=None):
307 if proactor is None:
308 proactor = IocpProactor()
309 super().__init__(proactor)
310
Andrew Svetlov5f841b52017-12-09 00:23:48 +0200311 async def create_pipe_connection(self, protocol_factory, address):
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700312 f = self._proactor.connect_pipe(address)
Andrew Svetlov5f841b52017-12-09 00:23:48 +0200313 pipe = await f
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700314 protocol = protocol_factory()
315 trans = self._make_duplex_pipe_transport(pipe, protocol,
316 extra={'addr': address})
317 return trans, protocol
318
Andrew Svetlov5f841b52017-12-09 00:23:48 +0200319 async def start_serving_pipe(self, protocol_factory, address):
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700320 server = PipeServer(address)
Guido van Rossuma8d630a2013-11-01 14:20:55 -0700321
Victor Stinnerb2614752014-08-25 23:20:52 +0200322 def loop_accept_pipe(f=None):
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700323 pipe = None
324 try:
325 if f:
326 pipe = f.result()
327 server._free_instances.discard(pipe)
Victor Stinnera19b7b32015-01-26 15:03:20 +0100328
329 if server.closed():
330 # A client connected before the server was closed:
331 # drop the client (close the pipe) and exit
332 pipe.close()
333 return
334
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700335 protocol = protocol_factory()
336 self._make_duplex_pipe_transport(
337 pipe, protocol, extra={'addr': address})
Victor Stinnera19b7b32015-01-26 15:03:20 +0100338
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700339 pipe = server._get_unconnected_pipe()
340 if pipe is None:
341 return
Victor Stinnera19b7b32015-01-26 15:03:20 +0100342
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700343 f = self._proactor.accept_pipe(pipe)
Yury Selivanovff827f02014-02-18 18:02:19 -0500344 except OSError as exc:
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700345 if pipe and pipe.fileno() != -1:
Yury Selivanovff827f02014-02-18 18:02:19 -0500346 self.call_exception_handler({
347 'message': 'Pipe accept failed',
348 'exception': exc,
349 'pipe': pipe,
350 })
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700351 pipe.close()
Victor Stinnerb2614752014-08-25 23:20:52 +0200352 elif self._debug:
353 logger.warning("Accept pipe failed on pipe %r",
354 pipe, exc_info=True)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700355 except futures.CancelledError:
356 if pipe:
357 pipe.close()
358 else:
Victor Stinnerb2614752014-08-25 23:20:52 +0200359 server._accept_pipe_future = f
360 f.add_done_callback(loop_accept_pipe)
Guido van Rossuma8d630a2013-11-01 14:20:55 -0700361
Victor Stinnerb2614752014-08-25 23:20:52 +0200362 self.call_soon(loop_accept_pipe)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700363 return [server]
364
Andrew Svetlov5f841b52017-12-09 00:23:48 +0200365 async def _make_subprocess_transport(self, protocol, args, shell,
366 stdin, stdout, stderr, bufsize,
367 extra=None, **kwargs):
Yury Selivanov7661db62016-05-16 15:38:39 -0400368 waiter = self.create_future()
Guido van Rossum59691282013-10-30 14:52:03 -0700369 transp = _WindowsSubprocessTransport(self, protocol, args, shell,
370 stdin, stdout, stderr, bufsize,
Victor Stinner47cd10d2015-01-30 00:05:19 +0100371 waiter=waiter, extra=extra,
372 **kwargs)
Victor Stinner4bf22e02015-01-15 14:24:22 +0100373 try:
Andrew Svetlov5f841b52017-12-09 00:23:48 +0200374 await waiter
375 except Exception:
Victor Stinner4bf22e02015-01-15 14:24:22 +0100376 transp.close()
Andrew Svetlov5f841b52017-12-09 00:23:48 +0200377 await transp._wait()
378 raise
Victor Stinner4bf22e02015-01-15 14:24:22 +0100379
Guido van Rossum59691282013-10-30 14:52:03 -0700380 return transp
381
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700382
383class IocpProactor:
384 """Proactor implementation using IOCP."""
385
386 def __init__(self, concurrency=0xffffffff):
387 self._loop = None
388 self._results = []
389 self._iocp = _overlapped.CreateIoCompletionPort(
390 _overlapped.INVALID_HANDLE_VALUE, NULL, 0, concurrency)
391 self._cache = {}
392 self._registered = weakref.WeakSet()
Victor Stinnerd0a28de2015-01-21 23:39:51 +0100393 self._unregistered = []
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700394 self._stopped_serving = weakref.WeakSet()
395
Victor Stinnerd5a6adf2019-01-15 13:05:28 +0100396 def _check_closed(self):
397 if self._iocp is None:
398 raise RuntimeError('IocpProactor is closed')
399
Victor Stinnerfea6a102014-07-25 00:54:53 +0200400 def __repr__(self):
Victor Stinnerd5a6adf2019-01-15 13:05:28 +0100401 info = ['overlapped#=%s' % len(self._cache),
402 'result#=%s' % len(self._results)]
403 if self._iocp is None:
404 info.append('closed')
405 return '<%s %s>' % (self.__class__.__name__, " ".join(info))
Victor Stinnerfea6a102014-07-25 00:54:53 +0200406
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700407 def set_loop(self, loop):
408 self._loop = loop
409
410 def select(self, timeout=None):
411 if not self._results:
412 self._poll(timeout)
413 tmp = self._results
414 self._results = []
415 return tmp
416
Victor Stinner41063d22015-01-26 22:30:49 +0100417 def _result(self, value):
Yury Selivanov7661db62016-05-16 15:38:39 -0400418 fut = self._loop.create_future()
Victor Stinner41063d22015-01-26 22:30:49 +0100419 fut.set_result(value)
420 return fut
421
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700422 def recv(self, conn, nbytes, flags=0):
423 self._register_with_iocp(conn)
424 ov = _overlapped.Overlapped(NULL)
Victor Stinner41063d22015-01-26 22:30:49 +0100425 try:
426 if isinstance(conn, socket.socket):
427 ov.WSARecv(conn.fileno(), nbytes, flags)
428 else:
429 ov.ReadFile(conn.fileno(), nbytes)
430 except BrokenPipeError:
431 return self._result(b'')
Guido van Rossuma8d630a2013-11-01 14:20:55 -0700432
Victor Stinnerc89c8a72014-02-26 17:35:30 +0100433 def finish_recv(trans, key, ov):
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700434 try:
435 return ov.getresult()
436 except OSError as exc:
Andrew Svetlov7c684072018-01-27 21:22:47 +0200437 if exc.winerror in (_overlapped.ERROR_NETNAME_DELETED,
438 _overlapped.ERROR_OPERATION_ABORTED):
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700439 raise ConnectionResetError(*exc.args)
440 else:
441 raise
Guido van Rossuma8d630a2013-11-01 14:20:55 -0700442
Victor Stinnerc89c8a72014-02-26 17:35:30 +0100443 return self._register(ov, conn, finish_recv)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700444
Antoine Pitrou525f40d2017-10-19 21:46:40 +0200445 def recv_into(self, conn, buf, flags=0):
446 self._register_with_iocp(conn)
447 ov = _overlapped.Overlapped(NULL)
448 try:
449 if isinstance(conn, socket.socket):
450 ov.WSARecvInto(conn.fileno(), buf, flags)
451 else:
452 ov.ReadFileInto(conn.fileno(), buf)
453 except BrokenPipeError:
454 return self._result(b'')
455
456 def finish_recv(trans, key, ov):
457 try:
458 return ov.getresult()
459 except OSError as exc:
Andrew Svetlov7c684072018-01-27 21:22:47 +0200460 if exc.winerror in (_overlapped.ERROR_NETNAME_DELETED,
461 _overlapped.ERROR_OPERATION_ABORTED):
Antoine Pitrou525f40d2017-10-19 21:46:40 +0200462 raise ConnectionResetError(*exc.args)
463 else:
464 raise
465
466 return self._register(ov, conn, finish_recv)
467
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700468 def send(self, conn, buf, flags=0):
469 self._register_with_iocp(conn)
470 ov = _overlapped.Overlapped(NULL)
471 if isinstance(conn, socket.socket):
472 ov.WSASend(conn.fileno(), buf, flags)
473 else:
474 ov.WriteFile(conn.fileno(), buf)
Guido van Rossuma8d630a2013-11-01 14:20:55 -0700475
Victor Stinnerc89c8a72014-02-26 17:35:30 +0100476 def finish_send(trans, key, ov):
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700477 try:
478 return ov.getresult()
479 except OSError as exc:
Andrew Svetlov7c684072018-01-27 21:22:47 +0200480 if exc.winerror in (_overlapped.ERROR_NETNAME_DELETED,
481 _overlapped.ERROR_OPERATION_ABORTED):
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700482 raise ConnectionResetError(*exc.args)
483 else:
484 raise
Guido van Rossuma8d630a2013-11-01 14:20:55 -0700485
Victor Stinnerc89c8a72014-02-26 17:35:30 +0100486 return self._register(ov, conn, finish_send)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700487
488 def accept(self, listener):
489 self._register_with_iocp(listener)
490 conn = self._get_accept_socket(listener.family)
491 ov = _overlapped.Overlapped(NULL)
492 ov.AcceptEx(listener.fileno(), conn.fileno())
Guido van Rossuma8d630a2013-11-01 14:20:55 -0700493
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700494 def finish_accept(trans, key, ov):
495 ov.getresult()
496 # Use SO_UPDATE_ACCEPT_CONTEXT so getsockname() etc work.
497 buf = struct.pack('@P', listener.fileno())
498 conn.setsockopt(socket.SOL_SOCKET,
499 _overlapped.SO_UPDATE_ACCEPT_CONTEXT, buf)
500 conn.settimeout(listener.gettimeout())
501 return conn, conn.getpeername()
Guido van Rossuma8d630a2013-11-01 14:20:55 -0700502
Andrew Svetlov5f841b52017-12-09 00:23:48 +0200503 async def accept_coro(future, conn):
Victor Stinner7de26462014-01-11 00:03:21 +0100504 # Coroutine closing the accept socket if the future is cancelled
505 try:
Andrew Svetlov5f841b52017-12-09 00:23:48 +0200506 await future
Victor Stinner7de26462014-01-11 00:03:21 +0100507 except futures.CancelledError:
508 conn.close()
509 raise
510
511 future = self._register(ov, listener, finish_accept)
512 coro = accept_coro(future, conn)
Yury Selivanov59eb9a42015-05-11 14:48:38 -0400513 tasks.ensure_future(coro, loop=self._loop)
Victor Stinner7de26462014-01-11 00:03:21 +0100514 return future
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700515
516 def connect(self, conn, address):
517 self._register_with_iocp(conn)
518 # The socket needs to be locally bound before we call ConnectEx().
519 try:
520 _overlapped.BindLocal(conn.fileno(), conn.family)
521 except OSError as e:
522 if e.winerror != errno.WSAEINVAL:
523 raise
524 # Probably already locally bound; check using getsockname().
525 if conn.getsockname()[1] == 0:
526 raise
527 ov = _overlapped.Overlapped(NULL)
528 ov.ConnectEx(conn.fileno(), address)
Guido van Rossuma8d630a2013-11-01 14:20:55 -0700529
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700530 def finish_connect(trans, key, ov):
531 ov.getresult()
532 # Use SO_UPDATE_CONNECT_CONTEXT so getsockname() etc work.
533 conn.setsockopt(socket.SOL_SOCKET,
534 _overlapped.SO_UPDATE_CONNECT_CONTEXT, 0)
535 return conn
Guido van Rossuma8d630a2013-11-01 14:20:55 -0700536
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700537 return self._register(ov, conn, finish_connect)
538
Miss Islington (bot)632c1cb2018-02-25 09:10:58 -0800539 def sendfile(self, sock, file, offset, count):
540 self._register_with_iocp(sock)
541 ov = _overlapped.Overlapped(NULL)
542 offset_low = offset & 0xffff_ffff
543 offset_high = (offset >> 32) & 0xffff_ffff
544 ov.TransmitFile(sock.fileno(),
545 msvcrt.get_osfhandle(file.fileno()),
546 offset_low, offset_high,
547 count, 0, 0)
548
549 def finish_sendfile(trans, key, ov):
550 try:
551 return ov.getresult()
552 except OSError as exc:
553 if exc.winerror in (_overlapped.ERROR_NETNAME_DELETED,
554 _overlapped.ERROR_OPERATION_ABORTED):
555 raise ConnectionResetError(*exc.args)
556 else:
557 raise
558 return self._register(ov, sock, finish_sendfile)
559
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700560 def accept_pipe(self, pipe):
561 self._register_with_iocp(pipe)
562 ov = _overlapped.Overlapped(NULL)
Victor Stinner2b77c542015-01-22 23:50:03 +0100563 connected = ov.ConnectNamedPipe(pipe.fileno())
564
565 if connected:
566 # ConnectNamePipe() failed with ERROR_PIPE_CONNECTED which means
567 # that the pipe is connected. There is no need to wait for the
568 # completion of the connection.
Victor Stinner41063d22015-01-26 22:30:49 +0100569 return self._result(pipe)
Guido van Rossuma8d630a2013-11-01 14:20:55 -0700570
Victor Stinnerc89c8a72014-02-26 17:35:30 +0100571 def finish_accept_pipe(trans, key, ov):
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700572 ov.getresult()
573 return pipe
Guido van Rossuma8d630a2013-11-01 14:20:55 -0700574
Victor Stinner2b77c542015-01-22 23:50:03 +0100575 return self._register(ov, pipe, finish_accept_pipe)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700576
Andrew Svetlov5f841b52017-12-09 00:23:48 +0200577 async def connect_pipe(self, address):
Victor Stinnere0fd1572015-01-26 15:04:03 +0100578 delay = CONNECT_PIPE_INIT_DELAY
579 while True:
Yury Selivanov6370f342017-12-10 18:36:12 -0500580 # Unfortunately there is no way to do an overlapped connect to
581 # a pipe. Call CreateFile() in a loop until it doesn't fail with
582 # ERROR_PIPE_BUSY.
Victor Stinnere0fd1572015-01-26 15:04:03 +0100583 try:
584 handle = _overlapped.ConnectPipe(address)
585 break
586 except OSError as exc:
587 if exc.winerror != _overlapped.ERROR_PIPE_BUSY:
588 raise
589
590 # ConnectPipe() failed with ERROR_PIPE_BUSY: retry later
591 delay = min(delay * 2, CONNECT_PIPE_MAX_DELAY)
Andrew Svetlov5f841b52017-12-09 00:23:48 +0200592 await tasks.sleep(delay, loop=self._loop)
Victor Stinnere0fd1572015-01-26 15:04:03 +0100593
594 return windows_utils.PipeHandle(handle)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700595
Guido van Rossum90fb9142013-10-30 14:44:05 -0700596 def wait_for_handle(self, handle, timeout=None):
Victor Stinner4d825b42014-12-19 17:10:44 +0100597 """Wait for a handle.
598
599 Return a Future object. The result of the future is True if the wait
600 completed, or False if the wait did not complete (on timeout).
601 """
Victor Stinnerd0a28de2015-01-21 23:39:51 +0100602 return self._wait_for_handle(handle, timeout, False)
603
604 def _wait_cancel(self, event, done_callback):
605 fut = self._wait_for_handle(event, None, True)
606 # add_done_callback() cannot be used because the wait may only complete
607 # in IocpProactor.close(), while the event loop is not running.
608 fut._done_callback = done_callback
609 return fut
610
611 def _wait_for_handle(self, handle, timeout, _is_cancel):
Victor Stinnerd5a6adf2019-01-15 13:05:28 +0100612 self._check_closed()
613
Guido van Rossum90fb9142013-10-30 14:44:05 -0700614 if timeout is None:
615 ms = _winapi.INFINITE
616 else:
Victor Stinnerf2e17682014-01-31 16:25:24 +0100617 # RegisterWaitForSingleObject() has a resolution of 1 millisecond,
618 # round away from zero to wait *at least* timeout seconds.
619 ms = math.ceil(timeout * 1e3)
Guido van Rossum90fb9142013-10-30 14:44:05 -0700620
621 # We only create ov so we can use ov.address as a key for the cache.
622 ov = _overlapped.Overlapped(NULL)
Victor Stinnerd0a28de2015-01-21 23:39:51 +0100623 wait_handle = _overlapped.RegisterWaitWithQueue(
Guido van Rossum90fb9142013-10-30 14:44:05 -0700624 handle, self._iocp, ov.address, ms)
Victor Stinnerd0a28de2015-01-21 23:39:51 +0100625 if _is_cancel:
626 f = _WaitCancelFuture(ov, handle, wait_handle, loop=self._loop)
627 else:
628 f = _WaitHandleFuture(ov, handle, wait_handle, self,
629 loop=self._loop)
Victor Stinner313a9802014-07-29 12:58:23 +0200630 if f._source_traceback:
631 del f._source_traceback[-1]
Guido van Rossum90fb9142013-10-30 14:44:05 -0700632
Victor Stinnerc89c8a72014-02-26 17:35:30 +0100633 def finish_wait_for_handle(trans, key, ov):
Richard Oudkerk71196e72013-11-24 17:50:40 +0000634 # Note that this second wait means that we should only use
635 # this with handles types where a successful wait has no
636 # effect. So events or processes are all right, but locks
637 # or semaphores are not. Also note if the handle is
638 # signalled and then quickly reset, then we may return
639 # False even though we have not timed out.
Victor Stinner313a9802014-07-29 12:58:23 +0200640 return f._poll()
Guido van Rossum90fb9142013-10-30 14:44:05 -0700641
Victor Stinner313a9802014-07-29 12:58:23 +0200642 self._cache[ov.address] = (f, ov, 0, finish_wait_for_handle)
Guido van Rossum90fb9142013-10-30 14:44:05 -0700643 return f
644
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700645 def _register_with_iocp(self, obj):
646 # To get notifications of finished ops on this objects sent to the
647 # completion port, were must register the handle.
648 if obj not in self._registered:
649 self._registered.add(obj)
650 _overlapped.CreateIoCompletionPort(obj.fileno(), self._iocp, 0, 0)
651 # XXX We could also use SetFileCompletionNotificationModes()
652 # to avoid sending notifications to completion port of ops
653 # that succeed immediately.
654
Victor Stinner2b77c542015-01-22 23:50:03 +0100655 def _register(self, ov, obj, callback):
Victor Stinnerd5a6adf2019-01-15 13:05:28 +0100656 self._check_closed()
657
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700658 # Return a future which will be set with the result of the
659 # operation when it completes. The future's value is actually
660 # the value returned by callback().
661 f = _OverlappedFuture(ov, loop=self._loop)
Victor Stinner313a9802014-07-29 12:58:23 +0200662 if f._source_traceback:
663 del f._source_traceback[-1]
Victor Stinner2b77c542015-01-22 23:50:03 +0100664 if not ov.pending:
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700665 # The operation has completed, so no need to postpone the
666 # work. We cannot take this short cut if we need the
667 # NumberOfBytes, CompletionKey values returned by
668 # PostQueuedCompletionStatus().
669 try:
670 value = callback(None, None, ov)
671 except OSError as e:
672 f.set_exception(e)
673 else:
674 f.set_result(value)
Victor Stinner42d3bde2014-07-28 00:18:43 +0200675 # Even if GetOverlappedResult() was called, we have to wait for the
676 # notification of the completion in GetQueuedCompletionStatus().
677 # Register the overlapped operation to keep a reference to the
678 # OVERLAPPED object, otherwise the memory is freed and Windows may
679 # read uninitialized memory.
Victor Stinner2b77c542015-01-22 23:50:03 +0100680
681 # Register the overlapped operation for later. Note that
682 # we only store obj to prevent it from being garbage
683 # collected too early.
684 self._cache[ov.address] = (f, ov, obj, callback)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700685 return f
686
Victor Stinnerd0a28de2015-01-21 23:39:51 +0100687 def _unregister(self, ov):
688 """Unregister an overlapped object.
689
690 Call this method when its future has been cancelled. The event can
691 already be signalled (pending in the proactor event queue). It is also
692 safe if the event is never signalled (because it was cancelled).
693 """
Victor Stinnerd5a6adf2019-01-15 13:05:28 +0100694 self._check_closed()
Victor Stinnerd0a28de2015-01-21 23:39:51 +0100695 self._unregistered.append(ov)
696
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700697 def _get_accept_socket(self, family):
698 s = socket.socket(family)
699 s.settimeout(0)
700 return s
701
702 def _poll(self, timeout=None):
703 if timeout is None:
704 ms = INFINITE
705 elif timeout < 0:
706 raise ValueError("negative timeout")
707 else:
Victor Stinnerf2e17682014-01-31 16:25:24 +0100708 # GetQueuedCompletionStatus() has a resolution of 1 millisecond,
709 # round away from zero to wait *at least* timeout seconds.
710 ms = math.ceil(timeout * 1e3)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700711 if ms >= INFINITE:
712 raise ValueError("timeout too big")
Victor Stinner313a9802014-07-29 12:58:23 +0200713
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700714 while True:
715 status = _overlapped.GetQueuedCompletionStatus(self._iocp, ms)
716 if status is None:
Victor Stinnerd0a28de2015-01-21 23:39:51 +0100717 break
Victor Stinner313a9802014-07-29 12:58:23 +0200718 ms = 0
719
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700720 err, transferred, key, address = status
721 try:
722 f, ov, obj, callback = self._cache.pop(address)
723 except KeyError:
Victor Stinner42d3bde2014-07-28 00:18:43 +0200724 if self._loop.get_debug():
725 self._loop.call_exception_handler({
726 'message': ('GetQueuedCompletionStatus() returned an '
727 'unexpected event'),
728 'status': ('err=%s transferred=%s key=%#x address=%#x'
729 % (err, transferred, key, address)),
730 })
731
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700732 # key is either zero, or it is used to return a pipe
733 # handle which should be closed to avoid a leak.
734 if key not in (0, _overlapped.INVALID_HANDLE_VALUE):
735 _winapi.CloseHandle(key)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700736 continue
Victor Stinner51e44ea2014-07-26 00:58:34 +0200737
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700738 if obj in self._stopped_serving:
739 f.cancel()
Victor Stinner42d3bde2014-07-28 00:18:43 +0200740 # Don't call the callback if _register() already read the result or
741 # if the overlapped has been cancelled
742 elif not f.done():
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700743 try:
744 value = callback(transferred, key, ov)
745 except OSError as e:
746 f.set_exception(e)
747 self._results.append(f)
748 else:
749 f.set_result(value)
750 self._results.append(f)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700751
Andrew Svetlov7a6706b2017-12-13 17:50:16 +0200752 # Remove unregistered futures
Victor Stinnerd0a28de2015-01-21 23:39:51 +0100753 for ov in self._unregistered:
754 self._cache.pop(ov.address, None)
755 self._unregistered.clear()
756
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700757 def _stop_serving(self, obj):
758 # obj is a socket or pipe handle. It will be closed in
759 # BaseProactorEventLoop._stop_serving() which will make any
760 # pending operations fail quickly.
761 self._stopped_serving.add(obj)
762
763 def close(self):
Victor Stinnerd5a6adf2019-01-15 13:05:28 +0100764 if self._iocp is None:
765 # already closed
766 return
767
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700768 # Cancel remaining registered operations.
Victor Stinnerfea6a102014-07-25 00:54:53 +0200769 for address, (fut, ov, obj, callback) in list(self._cache.items()):
Victor Stinner3d2256f2015-01-26 11:02:59 +0100770 if fut.cancelled():
Victor Stinner752aba72015-01-22 22:47:13 +0100771 # Nothing to do with cancelled futures
772 pass
Victor Stinner1ca93922015-01-22 00:17:54 +0100773 elif isinstance(fut, _WaitCancelFuture):
774 # _WaitCancelFuture must not be cancelled
775 pass
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700776 else:
777 try:
Victor Stinnerfea6a102014-07-25 00:54:53 +0200778 fut.cancel()
779 except OSError as exc:
780 if self._loop is not None:
781 context = {
782 'message': 'Cancelling a future failed',
783 'exception': exc,
784 'future': fut,
785 }
786 if fut._source_traceback:
787 context['source_traceback'] = fut._source_traceback
788 self._loop.call_exception_handler(context)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700789
Victor Stinnerd5a6adf2019-01-15 13:05:28 +0100790 # Wait until all cancelled overlapped complete: don't exit with running
791 # overlapped to prevent a crash. Display progress every second if the
792 # loop is still running.
793 msg_update = 1.0
794 start_time = time.monotonic()
795 next_msg = start_time + msg_update
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700796 while self._cache:
Victor Stinnerd5a6adf2019-01-15 13:05:28 +0100797 if next_msg <= time.monotonic():
798 logger.debug('%r is running after closing for %.1f seconds',
799 self, time.monotonic() - start_time)
800 next_msg = time.monotonic() + msg_update
801
802 # handle a few events, or timeout
803 self._poll(msg_update)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700804
805 self._results = []
Victor Stinnerd5a6adf2019-01-15 13:05:28 +0100806
807 _winapi.CloseHandle(self._iocp)
808 self._iocp = None
Guido van Rossum59691282013-10-30 14:52:03 -0700809
Victor Stinnerfea6a102014-07-25 00:54:53 +0200810 def __del__(self):
811 self.close()
812
Guido van Rossum59691282013-10-30 14:52:03 -0700813
814class _WindowsSubprocessTransport(base_subprocess.BaseSubprocessTransport):
815
816 def _start(self, args, shell, stdin, stdout, stderr, bufsize, **kwargs):
817 self._proc = windows_utils.Popen(
818 args, shell=shell, stdin=stdin, stdout=stdout, stderr=stderr,
819 bufsize=bufsize, **kwargs)
Guido van Rossuma8d630a2013-11-01 14:20:55 -0700820
Guido van Rossum59691282013-10-30 14:52:03 -0700821 def callback(f):
822 returncode = self._proc.poll()
823 self._process_exited(returncode)
Guido van Rossuma8d630a2013-11-01 14:20:55 -0700824
Guido van Rossum59691282013-10-30 14:52:03 -0700825 f = self._loop._proactor.wait_for_handle(int(self._proc._handle))
826 f.add_done_callback(callback)
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800827
828
829SelectorEventLoop = _WindowsSelectorEventLoop
830
831
Miss Islington (bot)07384432018-06-07 18:31:50 -0700832class WindowsSelectorEventLoopPolicy(events.BaseDefaultEventLoopPolicy):
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800833 _loop_factory = SelectorEventLoop
834
835
Miss Islington (bot)07384432018-06-07 18:31:50 -0700836class WindowsProactorEventLoopPolicy(events.BaseDefaultEventLoopPolicy):
837 _loop_factory = ProactorEventLoop
838
839
840DefaultEventLoopPolicy = WindowsSelectorEventLoopPolicy