blob: 668fe1451b65ac034021e08671402f789393c911 [file] [log] [blame]
Yury Selivanovb0b0e622014-02-18 22:27:48 -05001"""Selector and proactor event loops for Windows."""
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07002
Victor Stinnerf2e17682014-01-31 16:25:24 +01003import _winapi
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07004import errno
Victor Stinnerf2e17682014-01-31 16:25:24 +01005import math
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07006import socket
Victor Stinnerf2e17682014-01-31 16:25:24 +01007import struct
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07008import weakref
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07009
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -080010from . import events
Guido van Rossum59691282013-10-30 14:52:03 -070011from . import base_subprocess
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070012from . import futures
13from . import proactor_events
14from . import selector_events
15from . import tasks
16from . import windows_utils
Guido van Rossum59691282013-10-30 14:52:03 -070017from . import _overlapped
Victor Stinnerf951d282014-06-29 00:46:45 +020018from .coroutines import coroutine
19from .log import logger
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070020
21
# Public names of this module.
__all__ = ['SelectorEventLoop', 'ProactorEventLoop', 'IocpProactor',
           'DefaultEventLoopPolicy',
           ]


# Value passed where the _overlapped calls in this module take a "null"
# argument (e.g. Overlapped(NULL), CreateIoCompletionPort(..., NULL, ...)).
NULL = 0
# Timeout in milliseconds used by IocpProactor._poll() to wait without limit;
# finite timeouts must stay strictly below this value.
INFINITE = 0xffffffff
# NOTE(review): presumably Windows error codes; they are not referenced in
# the visible part of this module -- confirm before removing.
ERROR_CONNECTION_REFUSED = 1225
ERROR_CONNECTION_ABORTED = 1236

# Initial delay in seconds for connect_pipe() before retrying to connect
CONNECT_PIPE_INIT_DELAY = 0.001

# Maximum delay in seconds for connect_pipe() before retrying to connect
CONNECT_PIPE_MAX_DELAY = 0.100
37
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070038
class _OverlappedFuture(futures.Future):
    """Future tied to a single overlapped operation.

    Cancelling the future cancels the underlying overlapped operation
    right away.
    """

    def __init__(self, ov, *, loop=None):
        super().__init__(loop=loop)
        # Hide this constructor's frame from the recorded creation traceback.
        if self._source_traceback:
            del self._source_traceback[-1]
        self._ov = ov

    def _repr_info(self):
        parts = super()._repr_info()
        overlapped = self._ov
        if overlapped is None:
            return parts
        if overlapped.pending:
            state = 'pending'
        else:
            state = 'completed'
        parts.insert(1, 'overlapped=<%s, %#x>' % (state, overlapped.address))
        return parts

    def _cancel_overlapped(self):
        """Cancel the overlapped operation, if any, and drop the reference.

        An OSError raised by the cancellation is reported to the loop's
        exception handler instead of being propagated.
        """
        overlapped = self._ov
        if overlapped is None:
            return
        try:
            overlapped.cancel()
        except OSError as exc:
            details = {
                'message': 'Cancelling an overlapped future failed',
                'exception': exc,
                'future': self,
            }
            creation_tb = self._source_traceback
            if creation_tb:
                details['source_traceback'] = creation_tb
            self._loop.call_exception_handler(details)
        self._ov = None

    def cancel(self):
        # The overlapped operation is cancelled before the future itself.
        self._cancel_overlapped()
        return super().cancel()

    def set_exception(self, exception):
        # Store the exception first, then cancel the overlapped operation.
        super().set_exception(exception)
        self._cancel_overlapped()

    def set_result(self, result):
        # The operation completed: only the reference has to be released.
        super().set_result(result)
        self._ov = None
85
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070086
class _BaseWaitHandleFuture(futures.Future):
    """Subclass of Future which represents a wait handle.

    ov is the Overlapped object kept alive for the duration of the wait,
    handle is the waited handle and wait_handle is the registered wait that
    _unregister_wait() passes to UnregisterWait().
    """

    def __init__(self, ov, handle, wait_handle, *, loop=None):
        super().__init__(loop=loop)
        # Hide this constructor's frame from the recorded creation traceback.
        if self._source_traceback:
            del self._source_traceback[-1]
        # Keep a reference to the Overlapped object to keep it alive until the
        # wait is unregistered
        self._ov = ov
        self._handle = handle
        self._wait_handle = wait_handle

        # Should we unregister the wait if it completes or is cancelled?
        self._registered = True

    def _poll(self):
        """Return True if the handle is signaled, without blocking."""
        # non-blocking wait: use a timeout of 0 millisecond
        return (_winapi.WaitForSingleObject(self._handle, 0) ==
                _winapi.WAIT_OBJECT_0)

    def _repr_info(self):
        info = super()._repr_info()
        # Format the handle only when there is one: '%#x' % None raises
        # TypeError, which would make repr() of the future fail.
        if self._handle is not None:
            info.append('handle=%#x' % self._handle)
            state = 'signaled' if self._poll() else 'waiting'
            info.append(state)
        if self._wait_handle is not None:
            info.append('wait_handle=%#x' % self._wait_handle)
        return info

    def _unregister_wait_cb(self, fut):
        # The wait was unregistered: it's now safe to destroy the Overlapped
        # object
        self._ov = None

    def _unregister_wait(self):
        """Unregister the wait once; later calls are no-ops.

        An OSError other than ERROR_IO_PENDING is reported to the loop's
        exception handler and the Overlapped reference is kept.
        """
        if not self._registered:
            return
        self._registered = False

        wait_handle = self._wait_handle
        self._wait_handle = None
        try:
            _overlapped.UnregisterWait(wait_handle)
        except OSError as exc:
            if exc.winerror != _overlapped.ERROR_IO_PENDING:
                context = {
                    'message': 'Failed to unregister the wait handle',
                    'exception': exc,
                    'future': self,
                }
                if self._source_traceback:
                    context['source_traceback'] = self._source_traceback
                self._loop.call_exception_handler(context)
                return
            # ERROR_IO_PENDING means that the unregister is pending

        self._unregister_wait_cb(None)

    def cancel(self):
        self._unregister_wait()
        return super().cancel()

    def set_exception(self, exception):
        self._unregister_wait()
        super().set_exception(exception)

    def set_result(self, result):
        self._unregister_wait()
        super().set_result(result)
159
160
class _WaitCancelFuture(_BaseWaitHandleFuture):
    """Future waiting, through an event, for the cancellation of a
    _WaitHandleFuture to finish.
    """

    def __init__(self, ov, event, wait_handle, *, loop=None):
        super().__init__(ov, event, wait_handle, loop=loop)
        # Optional callable invoked with this future from
        # _schedule_callbacks().
        self._done_callback = None

    def cancel(self):
        raise RuntimeError("_WaitCancelFuture must not be cancelled")

    def _schedule_callbacks(self):
        super()._schedule_callbacks()
        notify = self._done_callback
        if notify is None:
            return
        # Called directly rather than scheduled on the event loop.
        notify(self)
178
179
class _WaitHandleFuture(_BaseWaitHandleFuture):
    """Wait-handle future whose unregistration is tracked through an event
    and a _WaitCancelFuture created by the proactor.
    """

    def __init__(self, ov, handle, wait_handle, proactor, *, loop=None):
        super().__init__(ov, handle, wait_handle, loop=loop)
        self._proactor = proactor
        self._unregister_proactor = True
        # Event handed to UnregisterWaitEx() in _unregister_wait().
        self._event = _overlapped.CreateEvent(None, True, False, None)
        self._event_fut = None

    def _unregister_wait_cb(self, fut):
        event = self._event
        if event is not None:
            _winapi.CloseHandle(event)
            self._event = self._event_fut = None

        # A cancelled wait may never be signalled, so it has to be
        # unregistered explicitly; otherwise IocpProactor.close() would wait
        # forever for an event which never comes.
        #
        # When the IocpProactor already received the event, calling
        # _unregister() is still safe: the Overlapped object, used as a
        # unique key, has been kept alive until now.
        self._proactor._unregister(self._ov)
        self._proactor = None

        super()._unregister_wait_cb(fut)

    def _unregister_wait(self):
        if not self._registered:
            return
        self._registered = False

        wait_handle, self._wait_handle = self._wait_handle, None
        try:
            _overlapped.UnregisterWaitEx(wait_handle, self._event)
        except OSError as exc:
            # ERROR_IO_PENDING is not an error, the wait was unregistered
            unregister_pending = (
                exc.winerror == _overlapped.ERROR_IO_PENDING)
            if not unregister_pending:
                details = {
                    'message': 'Failed to unregister the wait handle',
                    'exception': exc,
                    'future': self,
                }
                creation_tb = self._source_traceback
                if creation_tb:
                    details['source_traceback'] = creation_tb
                self._loop.call_exception_handler(details)
                return

        # _unregister_wait_cb() runs once the wait on the event completes.
        self._event_fut = self._proactor._wait_cancel(
            self._event, self._unregister_wait_cb)
Guido van Rossum90fb9142013-10-30 14:44:05 -0700230
231
class PipeServer(object):
    """Server side of a named pipe.

    Plays a role similar to a bound, listening socket.
    """
    def __init__(self, address):
        self._address = address
        self._free_instances = weakref.WeakSet()
        # Both attributes must exist before _server_pipe_handle() is called:
        # it can raise, and the destructor then runs close().
        self._pipe = None
        self._accept_pipe_future = None
        self._pipe = self._server_pipe_handle(True)

    def _get_unconnected_pipe(self):
        # Hand out the current instance and create its replacement, so that
        # (until the server is closed) at least one pipe handle always exists
        # for the address and a connecting client does not fail with
        # FileNotFoundError.
        previous = self._pipe
        self._pipe = self._server_pipe_handle(False)
        return previous

    def _server_pipe_handle(self, first):
        # Build a wrapper around a brand new pipe handle, or return None when
        # the server is closed.
        if self.closed():
            return None
        open_mode = _winapi.PIPE_ACCESS_DUPLEX | _winapi.FILE_FLAG_OVERLAPPED
        if first:
            open_mode |= _winapi.FILE_FLAG_FIRST_PIPE_INSTANCE
        pipe_mode = (_winapi.PIPE_TYPE_MESSAGE |
                     _winapi.PIPE_READMODE_MESSAGE |
                     _winapi.PIPE_WAIT)
        raw_handle = _winapi.CreateNamedPipe(
            self._address, open_mode, pipe_mode,
            _winapi.PIPE_UNLIMITED_INSTANCES,
            windows_utils.BUFSIZE, windows_utils.BUFSIZE,
            _winapi.NMPWAIT_WAIT_FOREVER, _winapi.NULL)
        wrapper = windows_utils.PipeHandle(raw_handle)
        self._free_instances.add(wrapper)
        return wrapper

    def closed(self):
        return (self._address is None)

    def close(self):
        pending_accept = self._accept_pipe_future
        if pending_accept is not None:
            pending_accept.cancel()
            self._accept_pipe_future = None
        if self._address is None:
            return
        # Close every instance which no client has connected to.
        for instance in self._free_instances:
            instance.close()
        self._pipe = self._address = None
        self._free_instances.clear()

    __del__ = close
289
290
class _WindowsSelectorEventLoop(selector_events.BaseSelectorEventLoop):
    """Windows version of selector event loop."""

    def _socketpair(self):
        # Socket pair creation is delegated to the windows_utils helper.
        return windows_utils.socketpair()
296
297
class ProactorEventLoop(proactor_events.BaseProactorEventLoop):
    """Proactor event loop for Windows, built on IOCP."""

    def __init__(self, proactor=None):
        # A fresh IocpProactor is created when the caller supplies none.
        super().__init__(IocpProactor() if proactor is None else proactor)

    def _socketpair(self):
        return windows_utils.socketpair()

    @coroutine
    def create_pipe_connection(self, protocol_factory, address):
        """Connect to the pipe at *address*.

        Return a (transport, protocol) pair, the protocol being created by
        protocol_factory() once the pipe is connected.
        """
        pipe = yield from self._proactor.connect_pipe(address)
        protocol = protocol_factory()
        transport = self._make_duplex_pipe_transport(
            pipe, protocol, extra={'addr': address})
        return transport, protocol

    @coroutine
    def start_serving_pipe(self, protocol_factory, address):
        """Start accepting clients on the pipe at *address*.

        Return a list holding the PipeServer; the first accept is scheduled
        with call_soon().
        """
        server = PipeServer(address)

        def loop_accept_pipe(accepted=None):
            # First call: accepted is None and only a new accept is started.
            # Later calls: accepted is the finished accept future.
            pipe = None
            try:
                if accepted:
                    pipe = accepted.result()
                    server._free_instances.discard(pipe)

                    if server.closed():
                        # The client connected before the server was closed:
                        # drop it (close the pipe) and stop accepting.
                        pipe.close()
                        return

                    client_protocol = protocol_factory()
                    self._make_duplex_pipe_transport(
                        pipe, client_protocol, extra={'addr': address})

                pipe = server._get_unconnected_pipe()
                if pipe is None:
                    return

                next_accept = self._proactor.accept_pipe(pipe)
            except OSError as exc:
                if pipe and pipe.fileno() != -1:
                    self.call_exception_handler({
                        'message': 'Pipe accept failed',
                        'exception': exc,
                        'pipe': pipe,
                    })
                    pipe.close()
                elif self._debug:
                    logger.warning("Accept pipe failed on pipe %r",
                                   pipe, exc_info=True)
            except futures.CancelledError:
                if pipe:
                    pipe.close()
            else:
                # Remember the pending accept on the server and re-enter this
                # function when it completes.
                server._accept_pipe_future = next_accept
                next_accept.add_done_callback(loop_accept_pipe)

        self.call_soon(loop_accept_pipe)
        return [server]

    @coroutine
    def _make_subprocess_transport(self, protocol, args, shell,
                                   stdin, stdout, stderr, bufsize,
                                   extra=None, **kwargs):
        waiter = self.create_future()
        transport = _WindowsSubprocessTransport(
            self, protocol, args, shell, stdin, stdout, stderr, bufsize,
            waiter=waiter, extra=extra, **kwargs)

        # Workaround CPython bug #23353: using yield/yield-from in an
        # except block of a generator doesn't clear properly sys.exc_info(),
        # so the failure is only recorded here and handled after the block.
        failure = None
        try:
            yield from waiter
        except Exception as exc:
            failure = exc

        if failure is None:
            return transport

        transport.close()
        yield from transport._wait()
        raise failure
389
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700390
class IocpProactor:
    """Proactor implementation using IOCP."""

    def __init__(self, concurrency=0xffffffff):
        # Define every attribute before creating the completion port: if
        # CreateIoCompletionPort() raises, __del__() still calls close(),
        # which reads self._cache and self._iocp.
        self._iocp = None
        self._loop = None
        # Futures completed by _poll() and not yet returned by select()
        self._results = []
        # Maps ov.address to a (future, ov, obj, callback) tuple
        self._cache = {}
        # Objects already associated with the completion port
        self._registered = weakref.WeakSet()
        # Overlapped objects to remove from the cache at the end of _poll()
        self._unregistered = []
        # Objects passed to _stop_serving()
        self._stopped_serving = weakref.WeakSet()
        self._iocp = _overlapped.CreateIoCompletionPort(
            _overlapped.INVALID_HANDLE_VALUE, NULL, 0, concurrency)

    def __repr__(self):
        return ('<%s overlapped#=%s result#=%s>'
                % (self.__class__.__name__, len(self._cache),
                   len(self._results)))

    def set_loop(self, loop):
        self._loop = loop

    def select(self, timeout=None):
        """Return the list of completed futures.

        The completion port is polled with *timeout* (in seconds) only when
        no result is already available.
        """
        if not self._results:
            self._poll(timeout)
        tmp = self._results
        self._results = []
        return tmp

    def _result(self, value):
        """Return a future of the loop already completed with *value*."""
        fut = self._loop.create_future()
        fut.set_result(value)
        return fut

    def recv(self, conn, nbytes, flags=0):
        """Start an overlapped receive on *conn* and return its future.

        If starting the operation raises BrokenPipeError, return a future
        already completed with b''.
        """
        self._register_with_iocp(conn)
        ov = _overlapped.Overlapped(NULL)
        try:
            if isinstance(conn, socket.socket):
                ov.WSARecv(conn.fileno(), nbytes, flags)
            else:
                ov.ReadFile(conn.fileno(), nbytes)
        except BrokenPipeError:
            return self._result(b'')

        def finish_recv(trans, key, ov):
            try:
                return ov.getresult()
            except OSError as exc:
                if exc.winerror == _overlapped.ERROR_NETNAME_DELETED:
                    raise ConnectionResetError(*exc.args)
                else:
                    raise

        return self._register(ov, conn, finish_recv)

    def send(self, conn, buf, flags=0):
        """Start an overlapped send of *buf* on *conn* and return its future."""
        self._register_with_iocp(conn)
        ov = _overlapped.Overlapped(NULL)
        if isinstance(conn, socket.socket):
            ov.WSASend(conn.fileno(), buf, flags)
        else:
            ov.WriteFile(conn.fileno(), buf)

        def finish_send(trans, key, ov):
            try:
                return ov.getresult()
            except OSError as exc:
                if exc.winerror == _overlapped.ERROR_NETNAME_DELETED:
                    raise ConnectionResetError(*exc.args)
                else:
                    raise

        return self._register(ov, conn, finish_send)

    def accept(self, listener):
        """Start an overlapped accept on *listener* and return its future.

        The result of the future is a (conn, peer address) tuple.
        """
        self._register_with_iocp(listener)
        conn = self._get_accept_socket(listener.family)
        ov = _overlapped.Overlapped(NULL)
        ov.AcceptEx(listener.fileno(), conn.fileno())

        def finish_accept(trans, key, ov):
            ov.getresult()
            # Use SO_UPDATE_ACCEPT_CONTEXT so getsockname() etc work.
            buf = struct.pack('@P', listener.fileno())
            conn.setsockopt(socket.SOL_SOCKET,
                            _overlapped.SO_UPDATE_ACCEPT_CONTEXT, buf)
            conn.settimeout(listener.gettimeout())
            return conn, conn.getpeername()

        @coroutine
        def accept_coro(future, conn):
            # Coroutine closing the accept socket if the future is cancelled
            try:
                yield from future
            except futures.CancelledError:
                conn.close()
                raise

        future = self._register(ov, listener, finish_accept)
        coro = accept_coro(future, conn)
        tasks.ensure_future(coro, loop=self._loop)
        return future

    def connect(self, conn, address):
        """Start an overlapped connect of *conn* to *address*.

        Return a future whose result is *conn*.
        """
        self._register_with_iocp(conn)
        # The socket needs to be locally bound before we call ConnectEx().
        try:
            _overlapped.BindLocal(conn.fileno(), conn.family)
        except OSError as e:
            if e.winerror != errno.WSAEINVAL:
                raise
            # Probably already locally bound; check using getsockname().
            if conn.getsockname()[1] == 0:
                raise
        ov = _overlapped.Overlapped(NULL)
        ov.ConnectEx(conn.fileno(), address)

        def finish_connect(trans, key, ov):
            ov.getresult()
            # Use SO_UPDATE_CONNECT_CONTEXT so getsockname() etc work.
            conn.setsockopt(socket.SOL_SOCKET,
                            _overlapped.SO_UPDATE_CONNECT_CONTEXT, 0)
            return conn

        return self._register(ov, conn, finish_connect)

    def accept_pipe(self, pipe):
        """Wait for a client to connect to *pipe*.

        Return a future whose result is *pipe*.
        """
        self._register_with_iocp(pipe)
        ov = _overlapped.Overlapped(NULL)
        connected = ov.ConnectNamedPipe(pipe.fileno())

        if connected:
            # ConnectNamedPipe() failed with ERROR_PIPE_CONNECTED which means
            # that the pipe is connected. There is no need to wait for the
            # completion of the connection.
            return self._result(pipe)

        def finish_accept_pipe(trans, key, ov):
            ov.getresult()
            return pipe

        return self._register(ov, pipe, finish_accept_pipe)

    @coroutine
    def connect_pipe(self, address):
        """Connect to the pipe at *address* and return a PipeHandle.

        While the pipe is busy the connection is retried after a delay which
        doubles on each attempt, up to CONNECT_PIPE_MAX_DELAY. Any other
        OSError is propagated.
        """
        delay = CONNECT_PIPE_INIT_DELAY
        while True:
            # Unfortunately there is no way to do an overlapped connect to a
            # pipe. Call CreateFile() in a loop until it doesn't fail with
            # ERROR_PIPE_BUSY
            try:
                handle = _overlapped.ConnectPipe(address)
                break
            except OSError as exc:
                if exc.winerror != _overlapped.ERROR_PIPE_BUSY:
                    raise

            # ConnectPipe() failed with ERROR_PIPE_BUSY: retry later
            delay = min(delay * 2, CONNECT_PIPE_MAX_DELAY)
            yield from tasks.sleep(delay, loop=self._loop)

        return windows_utils.PipeHandle(handle)

    def wait_for_handle(self, handle, timeout=None):
        """Wait for a handle.

        Return a Future object. The result of the future is True if the wait
        completed, or False if the wait did not complete (on timeout).
        """
        return self._wait_for_handle(handle, timeout, False)

    def _wait_cancel(self, event, done_callback):
        """Wait for *event* with a _WaitCancelFuture and return that future.

        done_callback is called with the future when the wait completes.
        """
        fut = self._wait_for_handle(event, None, True)
        # add_done_callback() cannot be used because the wait may only complete
        # in IocpProactor.close(), while the event loop is not running.
        fut._done_callback = done_callback
        return fut

    def _wait_for_handle(self, handle, timeout, _is_cancel):
        if timeout is None:
            ms = _winapi.INFINITE
        else:
            # RegisterWaitForSingleObject() has a resolution of 1 millisecond,
            # round away from zero to wait *at least* timeout seconds.
            ms = math.ceil(timeout * 1e3)

        # We only create ov so we can use ov.address as a key for the cache.
        ov = _overlapped.Overlapped(NULL)
        wait_handle = _overlapped.RegisterWaitWithQueue(
            handle, self._iocp, ov.address, ms)
        if _is_cancel:
            f = _WaitCancelFuture(ov, handle, wait_handle, loop=self._loop)
        else:
            f = _WaitHandleFuture(ov, handle, wait_handle, self,
                                  loop=self._loop)
        if f._source_traceback:
            del f._source_traceback[-1]

        def finish_wait_for_handle(trans, key, ov):
            # Note that this second wait means that we should only use
            # this with handles types where a successful wait has no
            # effect. So events or processes are all right, but locks
            # or semaphores are not. Also note if the handle is
            # signalled and then quickly reset, then we may return
            # False even though we have not timed out.
            return f._poll()

        self._cache[ov.address] = (f, ov, 0, finish_wait_for_handle)
        return f

    def _register_with_iocp(self, obj):
        # To get notifications of finished ops on this object sent to the
        # completion port, we must register the handle.
        if obj not in self._registered:
            self._registered.add(obj)
            _overlapped.CreateIoCompletionPort(obj.fileno(), self._iocp, 0, 0)
            # XXX We could also use SetFileCompletionNotificationModes()
            # to avoid sending notifications to completion port of ops
            # that succeed immediately.

    def _register(self, ov, obj, callback):
        # Return a future which will be set with the result of the
        # operation when it completes. The future's value is actually
        # the value returned by callback().
        f = _OverlappedFuture(ov, loop=self._loop)
        if f._source_traceback:
            del f._source_traceback[-1]
        if not ov.pending:
            # The operation has completed, so no need to postpone the
            # work. We cannot take this short cut if we need the
            # NumberOfBytes, CompletionKey values returned by
            # PostQueuedCompletionStatus().
            try:
                value = callback(None, None, ov)
            except OSError as e:
                f.set_exception(e)
            else:
                f.set_result(value)
            # Even if GetOverlappedResult() was called, we have to wait for the
            # notification of the completion in GetQueuedCompletionStatus().
            # Register the overlapped operation to keep a reference to the
            # OVERLAPPED object, otherwise the memory is freed and Windows may
            # read uninitialized memory.

        # Register the overlapped operation for later. Note that
        # we only store obj to prevent it from being garbage
        # collected too early.
        self._cache[ov.address] = (f, ov, obj, callback)
        return f

    def _unregister(self, ov):
        """Unregister an overlapped object.

        Call this method when its future has been cancelled. The event can
        already be signalled (pending in the proactor event queue). It is also
        safe if the event is never signalled (because it was cancelled).
        """
        self._unregistered.append(ov)

    def _get_accept_socket(self, family):
        s = socket.socket(family)
        s.settimeout(0)
        return s

    def _poll(self, timeout=None):
        """Process the completion statuses queued on the completion port.

        Wait at most *timeout* seconds (forever if None) for the first
        status, then drain the statuses which are immediately available.

        Return True if at least one completion status was dequeued, False if
        the wait timed out without any. Raise ValueError if *timeout* is
        negative or too big.
        """
        if timeout is None:
            ms = INFINITE
        elif timeout < 0:
            raise ValueError("negative timeout")
        else:
            # GetQueuedCompletionStatus() has a resolution of 1 millisecond,
            # round away from zero to wait *at least* timeout seconds.
            ms = math.ceil(timeout * 1e3)
            if ms >= INFINITE:
                raise ValueError("timeout too big")

        got_status = False
        while True:
            status = _overlapped.GetQueuedCompletionStatus(self._iocp, ms)
            if status is None:
                break
            got_status = True
            # Only the first wait may block; the next ones just drain.
            ms = 0

            err, transferred, key, address = status
            try:
                f, ov, obj, callback = self._cache.pop(address)
            except KeyError:
                # close() can run before set_loop() was called, so the loop
                # may be missing here.
                if self._loop is not None and self._loop.get_debug():
                    self._loop.call_exception_handler({
                        'message': ('GetQueuedCompletionStatus() returned an '
                                    'unexpected event'),
                        'status': ('err=%s transferred=%s key=%#x address=%#x'
                                   % (err, transferred, key, address)),
                    })

                # key is either zero, or it is used to return a pipe
                # handle which should be closed to avoid a leak.
                if key not in (0, _overlapped.INVALID_HANDLE_VALUE):
                    _winapi.CloseHandle(key)
                continue

            if obj in self._stopped_serving:
                f.cancel()
            # Don't call the callback if _register() already read the result or
            # if the overlapped has been cancelled
            elif not f.done():
                try:
                    value = callback(transferred, key, ov)
                except OSError as e:
                    f.set_exception(e)
                    self._results.append(f)
                else:
                    f.set_result(value)
                    self._results.append(f)

        # Remove unregistered futures
        for ov in self._unregistered:
            self._cache.pop(ov.address, None)
        self._unregistered.clear()

        return got_status

    def _stop_serving(self, obj):
        # obj is a socket or pipe handle. It will be closed in
        # BaseProactorEventLoop._stop_serving() which will make any
        # pending operations fail quickly.
        self._stopped_serving.add(obj)

    def close(self):
        """Cancel the registered operations and close the completion port.

        Block until every registered overlapped operation has been removed
        from the cache.
        """
        # Cancel remaining registered operations. Iterate on a copy:
        # cancelling a future can register a new wait in the cache.
        for fut, ov, obj, callback in list(self._cache.values()):
            if fut.cancelled():
                # Nothing to do with cancelled futures
                pass
            elif isinstance(fut, _WaitCancelFuture):
                # _WaitCancelFuture must not be cancelled
                pass
            else:
                try:
                    fut.cancel()
                except OSError as exc:
                    if self._loop is not None:
                        context = {
                            'message': 'Cancelling a future failed',
                            'exception': exc,
                            'future': fut,
                        }
                        if fut._source_traceback:
                            context['source_traceback'] = fut._source_traceback
                        self._loop.call_exception_handler(context)

        while self._cache:
            # _poll() returns False only when the one second wait timed out
            # without any completion status.
            if not self._poll(1):
                logger.debug('taking long time to close proactor')

        self._results = []
        if self._iocp is not None:
            _winapi.CloseHandle(self._iocp)
            self._iocp = None

    def __del__(self):
        self.close()
750
Guido van Rossum59691282013-10-30 14:52:03 -0700751
class _WindowsSubprocessTransport(base_subprocess.BaseSubprocessTransport):
    """Subprocess transport which detects process exit by waiting on the
    process handle through the loop's proactor.
    """

    def _start(self, args, shell, stdin, stdout, stderr, bufsize, **kwargs):
        process = windows_utils.Popen(
            args, shell=shell, stdin=stdin, stdout=stdout, stderr=stderr,
            bufsize=bufsize, **kwargs)
        self._proc = process

        def on_handle_signaled(fut):
            # Report the return code once the wait on the handle is done.
            self._process_exited(self._proc.poll())

        exit_waiter = self._loop._proactor.wait_for_handle(
            int(process._handle))
        exit_waiter.add_done_callback(on_handle_signaled)
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800765
766
# Public name of the Windows selector event loop (listed in __all__).
SelectorEventLoop = _WindowsSelectorEventLoop


class _WindowsDefaultEventLoopPolicy(events.BaseDefaultEventLoopPolicy):
    """Event loop policy whose loop factory is SelectorEventLoop."""
    # The selector loop is used here, not ProactorEventLoop.
    _loop_factory = SelectorEventLoop


# Public name of the policy class (listed in __all__).
DefaultEventLoopPolicy = _WindowsDefaultEventLoopPolicy