blob: 890fce8b4050c566c5467153464e814544849fac [file] [log] [blame]
Yury Selivanovb0b0e622014-02-18 22:27:48 -05001"""Selector and proactor event loops for Windows."""
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07002
Victor Stinner4271dfd2017-11-28 15:19:56 +01003import _overlapped
Victor Stinnerf2e17682014-01-31 16:25:24 +01004import _winapi
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07005import errno
Victor Stinnerf2e17682014-01-31 16:25:24 +01006import math
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07007import socket
Victor Stinnerf2e17682014-01-31 16:25:24 +01008import struct
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07009import weakref
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070010
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -080011from . import events
Guido van Rossum59691282013-10-30 14:52:03 -070012from . import base_subprocess
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070013from . import futures
14from . import proactor_events
15from . import selector_events
16from . import tasks
17from . import windows_utils
Victor Stinnerf951d282014-06-29 00:46:45 +020018from .log import logger
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070019
20
Yury Selivanov6370f342017-12-10 18:36:12 -050021__all__ = (
22 'SelectorEventLoop', 'ProactorEventLoop', 'IocpProactor',
23 'DefaultEventLoopPolicy',
24)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070025
26
27NULL = 0
28INFINITE = 0xffffffff
29ERROR_CONNECTION_REFUSED = 1225
30ERROR_CONNECTION_ABORTED = 1236
31
Victor Stinner7ffa2c52015-01-22 22:55:08 +010032# Initial delay in seconds for connect_pipe() before retrying to connect
33CONNECT_PIPE_INIT_DELAY = 0.001
34
35# Maximum delay in seconds for connect_pipe() before retrying to connect
36CONNECT_PIPE_MAX_DELAY = 0.100
37
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070038
39class _OverlappedFuture(futures.Future):
40 """Subclass of Future which represents an overlapped operation.
41
42 Cancelling it will immediately cancel the overlapped operation.
43 """
44
45 def __init__(self, ov, *, loop=None):
46 super().__init__(loop=loop)
Victor Stinnerfea6a102014-07-25 00:54:53 +020047 if self._source_traceback:
48 del self._source_traceback[-1]
Victor Stinner18a28dc2014-07-25 13:05:20 +020049 self._ov = ov
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070050
Victor Stinner313a9802014-07-29 12:58:23 +020051 def _repr_info(self):
52 info = super()._repr_info()
Victor Stinner18a28dc2014-07-25 13:05:20 +020053 if self._ov is not None:
54 state = 'pending' if self._ov.pending else 'completed'
Yury Selivanov6370f342017-12-10 18:36:12 -050055 info.insert(1, f'overlapped=<{state}, {self._ov.address:#x}>')
Victor Stinner313a9802014-07-29 12:58:23 +020056 return info
Victor Stinnere912e652014-07-12 03:11:53 +020057
Victor Stinner18a28dc2014-07-25 13:05:20 +020058 def _cancel_overlapped(self):
59 if self._ov is None:
60 return
61 try:
62 self._ov.cancel()
63 except OSError as exc:
64 context = {
65 'message': 'Cancelling an overlapped future failed',
66 'exception': exc,
67 'future': self,
68 }
69 if self._source_traceback:
70 context['source_traceback'] = self._source_traceback
71 self._loop.call_exception_handler(context)
72 self._ov = None
73
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070074 def cancel(self):
Victor Stinner18a28dc2014-07-25 13:05:20 +020075 self._cancel_overlapped()
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070076 return super().cancel()
77
Victor Stinner18a28dc2014-07-25 13:05:20 +020078 def set_exception(self, exception):
79 super().set_exception(exception)
80 self._cancel_overlapped()
81
Victor Stinner51e44ea2014-07-26 00:58:34 +020082 def set_result(self, result):
83 super().set_result(result)
84 self._ov = None
85
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070086
Victor Stinnerd0a28de2015-01-21 23:39:51 +010087class _BaseWaitHandleFuture(futures.Future):
Guido van Rossum90fb9142013-10-30 14:44:05 -070088 """Subclass of Future which represents a wait handle."""
89
Victor Stinnerd0a28de2015-01-21 23:39:51 +010090 def __init__(self, ov, handle, wait_handle, *, loop=None):
Guido van Rossum90fb9142013-10-30 14:44:05 -070091 super().__init__(loop=loop)
Victor Stinner313a9802014-07-29 12:58:23 +020092 if self._source_traceback:
93 del self._source_traceback[-1]
Victor Stinnerd0a28de2015-01-21 23:39:51 +010094 # Keep a reference to the Overlapped object to keep it alive until the
95 # wait is unregistered
Victor Stinner313a9802014-07-29 12:58:23 +020096 self._ov = ov
Victor Stinner18a28dc2014-07-25 13:05:20 +020097 self._handle = handle
Guido van Rossum90fb9142013-10-30 14:44:05 -070098 self._wait_handle = wait_handle
99
Victor Stinnerd0a28de2015-01-21 23:39:51 +0100100 # Should we call UnregisterWaitEx() if the wait completes
101 # or is cancelled?
102 self._registered = True
103
Victor Stinner18a28dc2014-07-25 13:05:20 +0200104 def _poll(self):
105 # non-blocking wait: use a timeout of 0 millisecond
106 return (_winapi.WaitForSingleObject(self._handle, 0) ==
107 _winapi.WAIT_OBJECT_0)
108
Victor Stinner313a9802014-07-29 12:58:23 +0200109 def _repr_info(self):
110 info = super()._repr_info()
Yury Selivanov6370f342017-12-10 18:36:12 -0500111 info.append(f'handle={self._handle:#x}')
Victor Stinnerd0a28de2015-01-21 23:39:51 +0100112 if self._handle is not None:
Victor Stinner313a9802014-07-29 12:58:23 +0200113 state = 'signaled' if self._poll() else 'waiting'
Victor Stinnerd0a28de2015-01-21 23:39:51 +0100114 info.append(state)
115 if self._wait_handle is not None:
Yury Selivanov6370f342017-12-10 18:36:12 -0500116 info.append(f'wait_handle={self._wait_handle:#x}')
Victor Stinner313a9802014-07-29 12:58:23 +0200117 return info
Victor Stinner18a28dc2014-07-25 13:05:20 +0200118
Victor Stinnerd0a28de2015-01-21 23:39:51 +0100119 def _unregister_wait_cb(self, fut):
120 # The wait was unregistered: it's not safe to destroy the Overlapped
121 # object
122 self._ov = None
123
Victor Stinner313a9802014-07-29 12:58:23 +0200124 def _unregister_wait(self):
Victor Stinnerd0a28de2015-01-21 23:39:51 +0100125 if not self._registered:
Victor Stinnerfea6a102014-07-25 00:54:53 +0200126 return
Victor Stinnerd0a28de2015-01-21 23:39:51 +0100127 self._registered = False
128
Victor Stinner24dfa3c2015-01-26 22:30:28 +0100129 wait_handle = self._wait_handle
130 self._wait_handle = None
Guido van Rossum90fb9142013-10-30 14:44:05 -0700131 try:
Victor Stinner24dfa3c2015-01-26 22:30:28 +0100132 _overlapped.UnregisterWait(wait_handle)
Victor Stinnerb2614752014-08-25 23:20:52 +0200133 except OSError as exc:
Victor Stinner24dfa3c2015-01-26 22:30:28 +0100134 if exc.winerror != _overlapped.ERROR_IO_PENDING:
Victor Stinnerb2614752014-08-25 23:20:52 +0200135 context = {
136 'message': 'Failed to unregister the wait handle',
137 'exception': exc,
138 'future': self,
139 }
140 if self._source_traceback:
141 context['source_traceback'] = self._source_traceback
142 self._loop.call_exception_handler(context)
Victor Stinner24dfa3c2015-01-26 22:30:28 +0100143 return
144 # ERROR_IO_PENDING means that the unregister is pending
145
146 self._unregister_wait_cb(None)
Victor Stinnerfea6a102014-07-25 00:54:53 +0200147
148 def cancel(self):
Victor Stinner313a9802014-07-29 12:58:23 +0200149 self._unregister_wait()
Victor Stinnerd0a28de2015-01-21 23:39:51 +0100150 return super().cancel()
Victor Stinner313a9802014-07-29 12:58:23 +0200151
152 def set_exception(self, exception):
Victor Stinner313a9802014-07-29 12:58:23 +0200153 self._unregister_wait()
Victor Stinnerd0a28de2015-01-21 23:39:51 +0100154 super().set_exception(exception)
Victor Stinner313a9802014-07-29 12:58:23 +0200155
156 def set_result(self, result):
Victor Stinner313a9802014-07-29 12:58:23 +0200157 self._unregister_wait()
Victor Stinnerd0a28de2015-01-21 23:39:51 +0100158 super().set_result(result)
159
160
161class _WaitCancelFuture(_BaseWaitHandleFuture):
162 """Subclass of Future which represents a wait for the cancellation of a
163 _WaitHandleFuture using an event.
164 """
165
166 def __init__(self, ov, event, wait_handle, *, loop=None):
167 super().__init__(ov, event, wait_handle, loop=loop)
168
169 self._done_callback = None
170
Victor Stinner1ca93922015-01-22 00:17:54 +0100171 def cancel(self):
172 raise RuntimeError("_WaitCancelFuture must not be cancelled")
173
INADA Naokia8363622016-10-21 12:30:15 +0900174 def set_result(self, result):
175 super().set_result(result)
176 if self._done_callback is not None:
177 self._done_callback(self)
178
179 def set_exception(self, exception):
180 super().set_exception(exception)
Victor Stinnerd0a28de2015-01-21 23:39:51 +0100181 if self._done_callback is not None:
182 self._done_callback(self)
183
184
185class _WaitHandleFuture(_BaseWaitHandleFuture):
186 def __init__(self, ov, handle, wait_handle, proactor, *, loop=None):
187 super().__init__(ov, handle, wait_handle, loop=loop)
188 self._proactor = proactor
189 self._unregister_proactor = True
190 self._event = _overlapped.CreateEvent(None, True, False, None)
191 self._event_fut = None
192
193 def _unregister_wait_cb(self, fut):
194 if self._event is not None:
195 _winapi.CloseHandle(self._event)
196 self._event = None
197 self._event_fut = None
198
199 # If the wait was cancelled, the wait may never be signalled, so
200 # it's required to unregister it. Otherwise, IocpProactor.close() will
201 # wait forever for an event which will never come.
202 #
203 # If the IocpProactor already received the event, it's safe to call
204 # _unregister() because we kept a reference to the Overlapped object
Martin Panter6245cb32016-04-15 02:14:19 +0000205 # which is used as a unique key.
Victor Stinnerd0a28de2015-01-21 23:39:51 +0100206 self._proactor._unregister(self._ov)
207 self._proactor = None
208
209 super()._unregister_wait_cb(fut)
210
211 def _unregister_wait(self):
212 if not self._registered:
213 return
214 self._registered = False
215
Victor Stinner24dfa3c2015-01-26 22:30:28 +0100216 wait_handle = self._wait_handle
217 self._wait_handle = None
Victor Stinnerd0a28de2015-01-21 23:39:51 +0100218 try:
Victor Stinner24dfa3c2015-01-26 22:30:28 +0100219 _overlapped.UnregisterWaitEx(wait_handle, self._event)
Victor Stinnerd0a28de2015-01-21 23:39:51 +0100220 except OSError as exc:
Victor Stinner24dfa3c2015-01-26 22:30:28 +0100221 if exc.winerror != _overlapped.ERROR_IO_PENDING:
Victor Stinnerd0a28de2015-01-21 23:39:51 +0100222 context = {
223 'message': 'Failed to unregister the wait handle',
224 'exception': exc,
225 'future': self,
226 }
227 if self._source_traceback:
228 context['source_traceback'] = self._source_traceback
229 self._loop.call_exception_handler(context)
Victor Stinner24dfa3c2015-01-26 22:30:28 +0100230 return
231 # ERROR_IO_PENDING is not an error, the wait was unregistered
232
233 self._event_fut = self._proactor._wait_cancel(self._event,
234 self._unregister_wait_cb)
Guido van Rossum90fb9142013-10-30 14:44:05 -0700235
236
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700237class PipeServer(object):
238 """Class representing a pipe server.
239
240 This is much like a bound, listening socket.
241 """
242 def __init__(self, address):
243 self._address = address
244 self._free_instances = weakref.WeakSet()
Victor Stinnerb2614752014-08-25 23:20:52 +0200245 # initialize the pipe attribute before calling _server_pipe_handle()
246 # because this function can raise an exception and the destructor calls
247 # the close() method
248 self._pipe = None
249 self._accept_pipe_future = None
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700250 self._pipe = self._server_pipe_handle(True)
251
252 def _get_unconnected_pipe(self):
253 # Create new instance and return previous one. This ensures
254 # that (until the server is closed) there is always at least
255 # one pipe handle for address. Therefore if a client attempt
256 # to connect it will not fail with FileNotFoundError.
257 tmp, self._pipe = self._pipe, self._server_pipe_handle(False)
258 return tmp
259
260 def _server_pipe_handle(self, first):
261 # Return a wrapper for a new pipe handle.
Victor Stinnera19b7b32015-01-26 15:03:20 +0100262 if self.closed():
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700263 return None
264 flags = _winapi.PIPE_ACCESS_DUPLEX | _winapi.FILE_FLAG_OVERLAPPED
265 if first:
266 flags |= _winapi.FILE_FLAG_FIRST_PIPE_INSTANCE
267 h = _winapi.CreateNamedPipe(
268 self._address, flags,
269 _winapi.PIPE_TYPE_MESSAGE | _winapi.PIPE_READMODE_MESSAGE |
270 _winapi.PIPE_WAIT,
271 _winapi.PIPE_UNLIMITED_INSTANCES,
272 windows_utils.BUFSIZE, windows_utils.BUFSIZE,
273 _winapi.NMPWAIT_WAIT_FOREVER, _winapi.NULL)
274 pipe = windows_utils.PipeHandle(h)
275 self._free_instances.add(pipe)
276 return pipe
277
Victor Stinnera19b7b32015-01-26 15:03:20 +0100278 def closed(self):
279 return (self._address is None)
280
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700281 def close(self):
Victor Stinnerb2614752014-08-25 23:20:52 +0200282 if self._accept_pipe_future is not None:
283 self._accept_pipe_future.cancel()
284 self._accept_pipe_future = None
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700285 # Close all instances which have not been connected to by a client.
286 if self._address is not None:
287 for pipe in self._free_instances:
288 pipe.close()
289 self._pipe = None
290 self._address = None
291 self._free_instances.clear()
292
293 __del__ = close
294
295
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800296class _WindowsSelectorEventLoop(selector_events.BaseSelectorEventLoop):
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700297 """Windows version of selector event loop."""
298
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700299
300class ProactorEventLoop(proactor_events.BaseProactorEventLoop):
301 """Windows version of proactor event loop using IOCP."""
302
303 def __init__(self, proactor=None):
304 if proactor is None:
305 proactor = IocpProactor()
306 super().__init__(proactor)
307
Andrew Svetlov5f841b52017-12-09 00:23:48 +0200308 async def create_pipe_connection(self, protocol_factory, address):
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700309 f = self._proactor.connect_pipe(address)
Andrew Svetlov5f841b52017-12-09 00:23:48 +0200310 pipe = await f
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700311 protocol = protocol_factory()
312 trans = self._make_duplex_pipe_transport(pipe, protocol,
313 extra={'addr': address})
314 return trans, protocol
315
Andrew Svetlov5f841b52017-12-09 00:23:48 +0200316 async def start_serving_pipe(self, protocol_factory, address):
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700317 server = PipeServer(address)
Guido van Rossuma8d630a2013-11-01 14:20:55 -0700318
Victor Stinnerb2614752014-08-25 23:20:52 +0200319 def loop_accept_pipe(f=None):
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700320 pipe = None
321 try:
322 if f:
323 pipe = f.result()
324 server._free_instances.discard(pipe)
Victor Stinnera19b7b32015-01-26 15:03:20 +0100325
326 if server.closed():
327 # A client connected before the server was closed:
328 # drop the client (close the pipe) and exit
329 pipe.close()
330 return
331
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700332 protocol = protocol_factory()
333 self._make_duplex_pipe_transport(
334 pipe, protocol, extra={'addr': address})
Victor Stinnera19b7b32015-01-26 15:03:20 +0100335
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700336 pipe = server._get_unconnected_pipe()
337 if pipe is None:
338 return
Victor Stinnera19b7b32015-01-26 15:03:20 +0100339
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700340 f = self._proactor.accept_pipe(pipe)
Yury Selivanovff827f02014-02-18 18:02:19 -0500341 except OSError as exc:
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700342 if pipe and pipe.fileno() != -1:
Yury Selivanovff827f02014-02-18 18:02:19 -0500343 self.call_exception_handler({
344 'message': 'Pipe accept failed',
345 'exception': exc,
346 'pipe': pipe,
347 })
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700348 pipe.close()
Victor Stinnerb2614752014-08-25 23:20:52 +0200349 elif self._debug:
350 logger.warning("Accept pipe failed on pipe %r",
351 pipe, exc_info=True)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700352 except futures.CancelledError:
353 if pipe:
354 pipe.close()
355 else:
Victor Stinnerb2614752014-08-25 23:20:52 +0200356 server._accept_pipe_future = f
357 f.add_done_callback(loop_accept_pipe)
Guido van Rossuma8d630a2013-11-01 14:20:55 -0700358
Victor Stinnerb2614752014-08-25 23:20:52 +0200359 self.call_soon(loop_accept_pipe)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700360 return [server]
361
Andrew Svetlov5f841b52017-12-09 00:23:48 +0200362 async def _make_subprocess_transport(self, protocol, args, shell,
363 stdin, stdout, stderr, bufsize,
364 extra=None, **kwargs):
Yury Selivanov7661db62016-05-16 15:38:39 -0400365 waiter = self.create_future()
Guido van Rossum59691282013-10-30 14:52:03 -0700366 transp = _WindowsSubprocessTransport(self, protocol, args, shell,
367 stdin, stdout, stderr, bufsize,
Victor Stinner47cd10d2015-01-30 00:05:19 +0100368 waiter=waiter, extra=extra,
369 **kwargs)
Victor Stinner4bf22e02015-01-15 14:24:22 +0100370 try:
Andrew Svetlov5f841b52017-12-09 00:23:48 +0200371 await waiter
372 except Exception:
Victor Stinner4bf22e02015-01-15 14:24:22 +0100373 transp.close()
Andrew Svetlov5f841b52017-12-09 00:23:48 +0200374 await transp._wait()
375 raise
Victor Stinner4bf22e02015-01-15 14:24:22 +0100376
Guido van Rossum59691282013-10-30 14:52:03 -0700377 return transp
378
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700379
380class IocpProactor:
381 """Proactor implementation using IOCP."""
382
383 def __init__(self, concurrency=0xffffffff):
384 self._loop = None
385 self._results = []
386 self._iocp = _overlapped.CreateIoCompletionPort(
387 _overlapped.INVALID_HANDLE_VALUE, NULL, 0, concurrency)
388 self._cache = {}
389 self._registered = weakref.WeakSet()
Victor Stinnerd0a28de2015-01-21 23:39:51 +0100390 self._unregistered = []
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700391 self._stopped_serving = weakref.WeakSet()
392
Victor Stinnerfea6a102014-07-25 00:54:53 +0200393 def __repr__(self):
394 return ('<%s overlapped#=%s result#=%s>'
395 % (self.__class__.__name__, len(self._cache),
396 len(self._results)))
397
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700398 def set_loop(self, loop):
399 self._loop = loop
400
401 def select(self, timeout=None):
402 if not self._results:
403 self._poll(timeout)
404 tmp = self._results
405 self._results = []
406 return tmp
407
Victor Stinner41063d22015-01-26 22:30:49 +0100408 def _result(self, value):
Yury Selivanov7661db62016-05-16 15:38:39 -0400409 fut = self._loop.create_future()
Victor Stinner41063d22015-01-26 22:30:49 +0100410 fut.set_result(value)
411 return fut
412
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700413 def recv(self, conn, nbytes, flags=0):
414 self._register_with_iocp(conn)
415 ov = _overlapped.Overlapped(NULL)
Victor Stinner41063d22015-01-26 22:30:49 +0100416 try:
417 if isinstance(conn, socket.socket):
418 ov.WSARecv(conn.fileno(), nbytes, flags)
419 else:
420 ov.ReadFile(conn.fileno(), nbytes)
421 except BrokenPipeError:
422 return self._result(b'')
Guido van Rossuma8d630a2013-11-01 14:20:55 -0700423
Victor Stinnerc89c8a72014-02-26 17:35:30 +0100424 def finish_recv(trans, key, ov):
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700425 try:
426 return ov.getresult()
427 except OSError as exc:
428 if exc.winerror == _overlapped.ERROR_NETNAME_DELETED:
429 raise ConnectionResetError(*exc.args)
430 else:
431 raise
Guido van Rossuma8d630a2013-11-01 14:20:55 -0700432
Victor Stinnerc89c8a72014-02-26 17:35:30 +0100433 return self._register(ov, conn, finish_recv)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700434
Antoine Pitrou525f40d2017-10-19 21:46:40 +0200435 def recv_into(self, conn, buf, flags=0):
436 self._register_with_iocp(conn)
437 ov = _overlapped.Overlapped(NULL)
438 try:
439 if isinstance(conn, socket.socket):
440 ov.WSARecvInto(conn.fileno(), buf, flags)
441 else:
442 ov.ReadFileInto(conn.fileno(), buf)
443 except BrokenPipeError:
444 return self._result(b'')
445
446 def finish_recv(trans, key, ov):
447 try:
448 return ov.getresult()
449 except OSError as exc:
450 if exc.winerror == _overlapped.ERROR_NETNAME_DELETED:
451 raise ConnectionResetError(*exc.args)
452 else:
453 raise
454
455 return self._register(ov, conn, finish_recv)
456
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700457 def send(self, conn, buf, flags=0):
458 self._register_with_iocp(conn)
459 ov = _overlapped.Overlapped(NULL)
460 if isinstance(conn, socket.socket):
461 ov.WSASend(conn.fileno(), buf, flags)
462 else:
463 ov.WriteFile(conn.fileno(), buf)
Guido van Rossuma8d630a2013-11-01 14:20:55 -0700464
Victor Stinnerc89c8a72014-02-26 17:35:30 +0100465 def finish_send(trans, key, ov):
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700466 try:
467 return ov.getresult()
468 except OSError as exc:
469 if exc.winerror == _overlapped.ERROR_NETNAME_DELETED:
470 raise ConnectionResetError(*exc.args)
471 else:
472 raise
Guido van Rossuma8d630a2013-11-01 14:20:55 -0700473
Victor Stinnerc89c8a72014-02-26 17:35:30 +0100474 return self._register(ov, conn, finish_send)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700475
476 def accept(self, listener):
477 self._register_with_iocp(listener)
478 conn = self._get_accept_socket(listener.family)
479 ov = _overlapped.Overlapped(NULL)
480 ov.AcceptEx(listener.fileno(), conn.fileno())
Guido van Rossuma8d630a2013-11-01 14:20:55 -0700481
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700482 def finish_accept(trans, key, ov):
483 ov.getresult()
484 # Use SO_UPDATE_ACCEPT_CONTEXT so getsockname() etc work.
485 buf = struct.pack('@P', listener.fileno())
486 conn.setsockopt(socket.SOL_SOCKET,
487 _overlapped.SO_UPDATE_ACCEPT_CONTEXT, buf)
488 conn.settimeout(listener.gettimeout())
489 return conn, conn.getpeername()
Guido van Rossuma8d630a2013-11-01 14:20:55 -0700490
Andrew Svetlov5f841b52017-12-09 00:23:48 +0200491 async def accept_coro(future, conn):
Victor Stinner7de26462014-01-11 00:03:21 +0100492 # Coroutine closing the accept socket if the future is cancelled
493 try:
Andrew Svetlov5f841b52017-12-09 00:23:48 +0200494 await future
Victor Stinner7de26462014-01-11 00:03:21 +0100495 except futures.CancelledError:
496 conn.close()
497 raise
498
499 future = self._register(ov, listener, finish_accept)
500 coro = accept_coro(future, conn)
Yury Selivanov59eb9a42015-05-11 14:48:38 -0400501 tasks.ensure_future(coro, loop=self._loop)
Victor Stinner7de26462014-01-11 00:03:21 +0100502 return future
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700503
504 def connect(self, conn, address):
505 self._register_with_iocp(conn)
506 # The socket needs to be locally bound before we call ConnectEx().
507 try:
508 _overlapped.BindLocal(conn.fileno(), conn.family)
509 except OSError as e:
510 if e.winerror != errno.WSAEINVAL:
511 raise
512 # Probably already locally bound; check using getsockname().
513 if conn.getsockname()[1] == 0:
514 raise
515 ov = _overlapped.Overlapped(NULL)
516 ov.ConnectEx(conn.fileno(), address)
Guido van Rossuma8d630a2013-11-01 14:20:55 -0700517
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700518 def finish_connect(trans, key, ov):
519 ov.getresult()
520 # Use SO_UPDATE_CONNECT_CONTEXT so getsockname() etc work.
521 conn.setsockopt(socket.SOL_SOCKET,
522 _overlapped.SO_UPDATE_CONNECT_CONTEXT, 0)
523 return conn
Guido van Rossuma8d630a2013-11-01 14:20:55 -0700524
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700525 return self._register(ov, conn, finish_connect)
526
527 def accept_pipe(self, pipe):
528 self._register_with_iocp(pipe)
529 ov = _overlapped.Overlapped(NULL)
Victor Stinner2b77c542015-01-22 23:50:03 +0100530 connected = ov.ConnectNamedPipe(pipe.fileno())
531
532 if connected:
533 # ConnectNamePipe() failed with ERROR_PIPE_CONNECTED which means
534 # that the pipe is connected. There is no need to wait for the
535 # completion of the connection.
Victor Stinner41063d22015-01-26 22:30:49 +0100536 return self._result(pipe)
Guido van Rossuma8d630a2013-11-01 14:20:55 -0700537
Victor Stinnerc89c8a72014-02-26 17:35:30 +0100538 def finish_accept_pipe(trans, key, ov):
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700539 ov.getresult()
540 return pipe
Guido van Rossuma8d630a2013-11-01 14:20:55 -0700541
Victor Stinner2b77c542015-01-22 23:50:03 +0100542 return self._register(ov, pipe, finish_accept_pipe)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700543
Andrew Svetlov5f841b52017-12-09 00:23:48 +0200544 async def connect_pipe(self, address):
Victor Stinnere0fd1572015-01-26 15:04:03 +0100545 delay = CONNECT_PIPE_INIT_DELAY
546 while True:
Yury Selivanov6370f342017-12-10 18:36:12 -0500547 # Unfortunately there is no way to do an overlapped connect to
548 # a pipe. Call CreateFile() in a loop until it doesn't fail with
549 # ERROR_PIPE_BUSY.
Victor Stinnere0fd1572015-01-26 15:04:03 +0100550 try:
551 handle = _overlapped.ConnectPipe(address)
552 break
553 except OSError as exc:
554 if exc.winerror != _overlapped.ERROR_PIPE_BUSY:
555 raise
556
557 # ConnectPipe() failed with ERROR_PIPE_BUSY: retry later
558 delay = min(delay * 2, CONNECT_PIPE_MAX_DELAY)
Andrew Svetlov5f841b52017-12-09 00:23:48 +0200559 await tasks.sleep(delay, loop=self._loop)
Victor Stinnere0fd1572015-01-26 15:04:03 +0100560
561 return windows_utils.PipeHandle(handle)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700562
Guido van Rossum90fb9142013-10-30 14:44:05 -0700563 def wait_for_handle(self, handle, timeout=None):
Victor Stinner4d825b42014-12-19 17:10:44 +0100564 """Wait for a handle.
565
566 Return a Future object. The result of the future is True if the wait
567 completed, or False if the wait did not complete (on timeout).
568 """
Victor Stinnerd0a28de2015-01-21 23:39:51 +0100569 return self._wait_for_handle(handle, timeout, False)
570
571 def _wait_cancel(self, event, done_callback):
572 fut = self._wait_for_handle(event, None, True)
573 # add_done_callback() cannot be used because the wait may only complete
574 # in IocpProactor.close(), while the event loop is not running.
575 fut._done_callback = done_callback
576 return fut
577
578 def _wait_for_handle(self, handle, timeout, _is_cancel):
Guido van Rossum90fb9142013-10-30 14:44:05 -0700579 if timeout is None:
580 ms = _winapi.INFINITE
581 else:
Victor Stinnerf2e17682014-01-31 16:25:24 +0100582 # RegisterWaitForSingleObject() has a resolution of 1 millisecond,
583 # round away from zero to wait *at least* timeout seconds.
584 ms = math.ceil(timeout * 1e3)
Guido van Rossum90fb9142013-10-30 14:44:05 -0700585
586 # We only create ov so we can use ov.address as a key for the cache.
587 ov = _overlapped.Overlapped(NULL)
Victor Stinnerd0a28de2015-01-21 23:39:51 +0100588 wait_handle = _overlapped.RegisterWaitWithQueue(
Guido van Rossum90fb9142013-10-30 14:44:05 -0700589 handle, self._iocp, ov.address, ms)
Victor Stinnerd0a28de2015-01-21 23:39:51 +0100590 if _is_cancel:
591 f = _WaitCancelFuture(ov, handle, wait_handle, loop=self._loop)
592 else:
593 f = _WaitHandleFuture(ov, handle, wait_handle, self,
594 loop=self._loop)
Victor Stinner313a9802014-07-29 12:58:23 +0200595 if f._source_traceback:
596 del f._source_traceback[-1]
Guido van Rossum90fb9142013-10-30 14:44:05 -0700597
Victor Stinnerc89c8a72014-02-26 17:35:30 +0100598 def finish_wait_for_handle(trans, key, ov):
Richard Oudkerk71196e72013-11-24 17:50:40 +0000599 # Note that this second wait means that we should only use
600 # this with handles types where a successful wait has no
601 # effect. So events or processes are all right, but locks
602 # or semaphores are not. Also note if the handle is
603 # signalled and then quickly reset, then we may return
604 # False even though we have not timed out.
Victor Stinner313a9802014-07-29 12:58:23 +0200605 return f._poll()
Guido van Rossum90fb9142013-10-30 14:44:05 -0700606
Victor Stinner313a9802014-07-29 12:58:23 +0200607 self._cache[ov.address] = (f, ov, 0, finish_wait_for_handle)
Guido van Rossum90fb9142013-10-30 14:44:05 -0700608 return f
609
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700610 def _register_with_iocp(self, obj):
611 # To get notifications of finished ops on this objects sent to the
612 # completion port, were must register the handle.
613 if obj not in self._registered:
614 self._registered.add(obj)
615 _overlapped.CreateIoCompletionPort(obj.fileno(), self._iocp, 0, 0)
616 # XXX We could also use SetFileCompletionNotificationModes()
617 # to avoid sending notifications to completion port of ops
618 # that succeed immediately.
619
Victor Stinner2b77c542015-01-22 23:50:03 +0100620 def _register(self, ov, obj, callback):
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700621 # Return a future which will be set with the result of the
622 # operation when it completes. The future's value is actually
623 # the value returned by callback().
624 f = _OverlappedFuture(ov, loop=self._loop)
Victor Stinner313a9802014-07-29 12:58:23 +0200625 if f._source_traceback:
626 del f._source_traceback[-1]
Victor Stinner2b77c542015-01-22 23:50:03 +0100627 if not ov.pending:
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700628 # The operation has completed, so no need to postpone the
629 # work. We cannot take this short cut if we need the
630 # NumberOfBytes, CompletionKey values returned by
631 # PostQueuedCompletionStatus().
632 try:
633 value = callback(None, None, ov)
634 except OSError as e:
635 f.set_exception(e)
636 else:
637 f.set_result(value)
Victor Stinner42d3bde2014-07-28 00:18:43 +0200638 # Even if GetOverlappedResult() was called, we have to wait for the
639 # notification of the completion in GetQueuedCompletionStatus().
640 # Register the overlapped operation to keep a reference to the
641 # OVERLAPPED object, otherwise the memory is freed and Windows may
642 # read uninitialized memory.
Victor Stinner2b77c542015-01-22 23:50:03 +0100643
644 # Register the overlapped operation for later. Note that
645 # we only store obj to prevent it from being garbage
646 # collected too early.
647 self._cache[ov.address] = (f, ov, obj, callback)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700648 return f
649
Victor Stinnerd0a28de2015-01-21 23:39:51 +0100650 def _unregister(self, ov):
651 """Unregister an overlapped object.
652
653 Call this method when its future has been cancelled. The event can
654 already be signalled (pending in the proactor event queue). It is also
655 safe if the event is never signalled (because it was cancelled).
656 """
657 self._unregistered.append(ov)
658
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700659 def _get_accept_socket(self, family):
660 s = socket.socket(family)
661 s.settimeout(0)
662 return s
663
664 def _poll(self, timeout=None):
665 if timeout is None:
666 ms = INFINITE
667 elif timeout < 0:
668 raise ValueError("negative timeout")
669 else:
Victor Stinnerf2e17682014-01-31 16:25:24 +0100670 # GetQueuedCompletionStatus() has a resolution of 1 millisecond,
671 # round away from zero to wait *at least* timeout seconds.
672 ms = math.ceil(timeout * 1e3)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700673 if ms >= INFINITE:
674 raise ValueError("timeout too big")
Victor Stinner313a9802014-07-29 12:58:23 +0200675
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700676 while True:
677 status = _overlapped.GetQueuedCompletionStatus(self._iocp, ms)
678 if status is None:
Victor Stinnerd0a28de2015-01-21 23:39:51 +0100679 break
Victor Stinner313a9802014-07-29 12:58:23 +0200680 ms = 0
681
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700682 err, transferred, key, address = status
683 try:
684 f, ov, obj, callback = self._cache.pop(address)
685 except KeyError:
Victor Stinner42d3bde2014-07-28 00:18:43 +0200686 if self._loop.get_debug():
687 self._loop.call_exception_handler({
688 'message': ('GetQueuedCompletionStatus() returned an '
689 'unexpected event'),
690 'status': ('err=%s transferred=%s key=%#x address=%#x'
691 % (err, transferred, key, address)),
692 })
693
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700694 # key is either zero, or it is used to return a pipe
695 # handle which should be closed to avoid a leak.
696 if key not in (0, _overlapped.INVALID_HANDLE_VALUE):
697 _winapi.CloseHandle(key)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700698 continue
Victor Stinner51e44ea2014-07-26 00:58:34 +0200699
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700700 if obj in self._stopped_serving:
701 f.cancel()
Victor Stinner42d3bde2014-07-28 00:18:43 +0200702 # Don't call the callback if _register() already read the result or
703 # if the overlapped has been cancelled
704 elif not f.done():
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700705 try:
706 value = callback(transferred, key, ov)
707 except OSError as e:
708 f.set_exception(e)
709 self._results.append(f)
710 else:
711 f.set_result(value)
712 self._results.append(f)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700713
Andrew Svetlov7a6706b2017-12-13 17:50:16 +0200714 # Remove unregistered futures
Victor Stinnerd0a28de2015-01-21 23:39:51 +0100715 for ov in self._unregistered:
716 self._cache.pop(ov.address, None)
717 self._unregistered.clear()
718
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700719 def _stop_serving(self, obj):
720 # obj is a socket or pipe handle. It will be closed in
721 # BaseProactorEventLoop._stop_serving() which will make any
722 # pending operations fail quickly.
723 self._stopped_serving.add(obj)
724
725 def close(self):
726 # Cancel remaining registered operations.
Victor Stinnerfea6a102014-07-25 00:54:53 +0200727 for address, (fut, ov, obj, callback) in list(self._cache.items()):
Victor Stinner3d2256f2015-01-26 11:02:59 +0100728 if fut.cancelled():
Victor Stinner752aba72015-01-22 22:47:13 +0100729 # Nothing to do with cancelled futures
730 pass
Victor Stinner1ca93922015-01-22 00:17:54 +0100731 elif isinstance(fut, _WaitCancelFuture):
732 # _WaitCancelFuture must not be cancelled
733 pass
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700734 else:
735 try:
Victor Stinnerfea6a102014-07-25 00:54:53 +0200736 fut.cancel()
737 except OSError as exc:
738 if self._loop is not None:
739 context = {
740 'message': 'Cancelling a future failed',
741 'exception': exc,
742 'future': fut,
743 }
744 if fut._source_traceback:
745 context['source_traceback'] = fut._source_traceback
746 self._loop.call_exception_handler(context)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700747
748 while self._cache:
749 if not self._poll(1):
Guido van Rossumfc29e0f2013-10-17 15:39:45 -0700750 logger.debug('taking long time to close proactor')
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700751
752 self._results = []
753 if self._iocp is not None:
754 _winapi.CloseHandle(self._iocp)
755 self._iocp = None
Guido van Rossum59691282013-10-30 14:52:03 -0700756
Victor Stinnerfea6a102014-07-25 00:54:53 +0200757 def __del__(self):
758 self.close()
759
Guido van Rossum59691282013-10-30 14:52:03 -0700760
761class _WindowsSubprocessTransport(base_subprocess.BaseSubprocessTransport):
762
763 def _start(self, args, shell, stdin, stdout, stderr, bufsize, **kwargs):
764 self._proc = windows_utils.Popen(
765 args, shell=shell, stdin=stdin, stdout=stdout, stderr=stderr,
766 bufsize=bufsize, **kwargs)
Guido van Rossuma8d630a2013-11-01 14:20:55 -0700767
Guido van Rossum59691282013-10-30 14:52:03 -0700768 def callback(f):
769 returncode = self._proc.poll()
770 self._process_exited(returncode)
Guido van Rossuma8d630a2013-11-01 14:20:55 -0700771
Guido van Rossum59691282013-10-30 14:52:03 -0700772 f = self._loop._proactor.wait_for_handle(int(self._proc._handle))
773 f.add_done_callback(callback)
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800774
775
776SelectorEventLoop = _WindowsSelectorEventLoop
777
778
779class _WindowsDefaultEventLoopPolicy(events.BaseDefaultEventLoopPolicy):
780 _loop_factory = SelectorEventLoop
781
782
783DefaultEventLoopPolicy = _WindowsDefaultEventLoopPolicy