# Source blob: 6c7e0580422b0e573a7d1de9eff38be462deb885
"""Selector and proactor event loops for Windows."""

import _winapi
import errno
import math
import socket
import struct
import weakref

from . import events
from . import base_subprocess
from . import futures
from . import proactor_events
from . import selector_events
from . import tasks
from . import windows_utils
from . import _overlapped
from .coroutines import coroutine
from .log import logger


# Public names of this module.  Kept as a list so that it can be
# concatenated with the ``__all__`` lists of sibling modules.
__all__ = [
    'SelectorEventLoop',
    'ProactorEventLoop',
    'IocpProactor',
    'DefaultEventLoopPolicy',
]


# Null pointer/handle value passed to the _overlapped API.
NULL = 0
# Timeout value (in milliseconds) meaning "wait forever".
INFINITE = 0xffffffff
# Windows system error codes.
ERROR_CONNECTION_REFUSED = 1225
ERROR_CONNECTION_ABORTED = 1236

# Initial delay in seconds for connect_pipe() before retrying to connect
CONNECT_PIPE_INIT_DELAY = 0.001

# Maximum delay in seconds for connect_pipe() before retrying to connect
CONNECT_PIPE_MAX_DELAY = 0.100


class _OverlappedFuture(futures.Future):
    """Subclass of Future which represents an overlapped operation.

    Cancelling it will immediately cancel the overlapped operation.
    """

    def __init__(self, ov, *, loop=None):
        """Wrap the overlapped operation *ov* in a future bound to *loop*."""
        super().__init__(loop=loop)
        if self._source_traceback:
            # Drop the innermost recorded frame (presumably this
            # constructor's own frame -- confirm against futures.Future).
            del self._source_traceback[-1]
        self._ov = ov

    def _repr_info(self):
        """Return the repr fields, adding the overlapped state if any."""
        info = super()._repr_info()
        if self._ov is not None:
            state = 'pending' if self._ov.pending else 'completed'
            info.insert(1, 'overlapped=<%s, %#x>' % (state, self._ov.address))
        return info

    def _cancel_overlapped(self):
        """Cancel the overlapped operation (if still referenced) and drop it.

        An OSError raised by the cancellation is reported to the loop's
        exception handler instead of being propagated.
        """
        if self._ov is None:
            return
        try:
            self._ov.cancel()
        except OSError as exc:
            context = {
                'message': 'Cancelling an overlapped future failed',
                'exception': exc,
                'future': self,
            }
            if self._source_traceback:
                context['source_traceback'] = self._source_traceback
            self._loop.call_exception_handler(context)
        self._ov = None

    def cancel(self):
        """Cancel the overlapped operation, then the future itself."""
        self._cancel_overlapped()
        return super().cancel()

    def set_exception(self, exception):
        """Store *exception*, then cancel the overlapped operation."""
        super().set_exception(exception)
        self._cancel_overlapped()

    def set_result(self, result):
        """Store *result* and release the reference to the overlapped."""
        super().set_result(result)
        self._ov = None


class _BaseWaitHandleFuture(futures.Future):
    """Subclass of Future which represents a wait handle."""

    def __init__(self, ov, handle, wait_handle, *, loop=None):
        """Track a registered wait on *handle*.

        *ov* is the Overlapped object associated with the wait and
        *wait_handle* is the value later passed to UnregisterWait().
        """
        super().__init__(loop=loop)
        if self._source_traceback:
            del self._source_traceback[-1]
        # Keep a reference to the Overlapped object to keep it alive until the
        # wait is unregistered
        self._ov = ov
        self._handle = handle
        self._wait_handle = wait_handle

        # Should we call UnregisterWaitEx() if the wait completes
        # or is cancelled?
        self._registered = True

    def _poll(self):
        """Return True if the handle is currently signaled."""
        # non-blocking wait: use a timeout of 0 millisecond
        return (_winapi.WaitForSingleObject(self._handle, 0) ==
                _winapi.WAIT_OBJECT_0)

    def _repr_info(self):
        """Return the repr fields, adding handle and wait-handle details."""
        info = super()._repr_info()
        if self._handle is not None:
            # Format the handle only when it is set: '%#x' % None would raise
            # TypeError and break repr().
            info.append('handle=%#x' % self._handle)
            state = 'signaled' if self._poll() else 'waiting'
            info.append(state)
        if self._wait_handle is not None:
            info.append('wait_handle=%#x' % self._wait_handle)
        return info

    def _unregister_wait_cb(self, fut):
        """Release the Overlapped object once the wait is unregistered."""
        # The wait was unregistered: it's now safe to destroy the Overlapped
        # object
        self._ov = None

    def _unregister_wait(self):
        """Unregister the wait at most once.

        ERROR_IO_PENDING is treated as success; any other OSError is
        reported to the loop's exception handler and the Overlapped
        reference is kept.
        """
        if not self._registered:
            return
        self._registered = False

        try:
            _overlapped.UnregisterWait(self._wait_handle)
        except OSError as exc:
            self._wait_handle = None
            if exc.winerror == _overlapped.ERROR_IO_PENDING:
                # ERROR_IO_PENDING is not an error, the wait was unregistered
                self._unregister_wait_cb(None)
            else:
                context = {
                    'message': 'Failed to unregister the wait handle',
                    'exception': exc,
                    'future': self,
                }
                if self._source_traceback:
                    context['source_traceback'] = self._source_traceback
                self._loop.call_exception_handler(context)
        else:
            self._wait_handle = None
            self._unregister_wait_cb(None)

    def cancel(self):
        """Unregister the wait, then cancel the future."""
        self._unregister_wait()
        return super().cancel()

    def set_exception(self, exception):
        """Unregister the wait, then store *exception*."""
        self._unregister_wait()
        super().set_exception(exception)

    def set_result(self, result):
        """Unregister the wait, then store *result*."""
        self._unregister_wait()
        super().set_result(result)


class _WaitCancelFuture(_BaseWaitHandleFuture):
    """Subclass of Future which represents a wait for the cancellation of a
    _WaitHandleFuture using an event.
    """

    def __init__(self, ov, event, wait_handle, *, loop=None):
        """Wait on *event*; see _BaseWaitHandleFuture for the other args."""
        super().__init__(ov, event, wait_handle, loop=loop)

        # Optional callable invoked synchronously with this future from
        # _schedule_callbacks().
        self._done_callback = None

    def cancel(self):
        """Always raise RuntimeError: this future cannot be cancelled."""
        raise RuntimeError("_WaitCancelFuture must not be cancelled")

    def _schedule_callbacks(self):
        """Schedule the regular callbacks, then call _done_callback directly."""
        super()._schedule_callbacks()
        if self._done_callback is not None:
            self._done_callback(self)


class _WaitHandleFuture(_BaseWaitHandleFuture):
    """Wait-handle future registered with an IocpProactor.

    An extra event is created and passed to UnregisterWaitEx(); the
    proactor is asked to wait on that event before the wait is considered
    fully unregistered.
    """

    def __init__(self, ov, handle, wait_handle, proactor, *, loop=None):
        super().__init__(ov, handle, wait_handle, loop=loop)
        self._proactor = proactor
        # NOTE(review): not read anywhere in this module's visible code.
        self._unregister_proactor = True
        self._event = _overlapped.CreateEvent(None, True, False, None)
        self._event_fut = None

    def _unregister_wait_cb(self, fut):
        """Close the event, unregister from the proactor, release the ov."""
        if self._event is not None:
            _winapi.CloseHandle(self._event)
            self._event = None
            self._event_fut = None

        # If the wait was cancelled, the wait may never be signalled, so
        # it's required to unregister it. Otherwise, IocpProactor.close() will
        # wait forever for an event which will never come.
        #
        # If the IocpProactor already received the event, it's safe to call
        # _unregister() because we kept a reference to the Overlapped object
        # which is used as an unique key.
        self._proactor._unregister(self._ov)
        self._proactor = None

        super()._unregister_wait_cb(fut)

    def _unregister_wait(self):
        """Unregister the wait at most once.

        On success the cleanup is deferred until the proactor reports that
        the event was signalled.  ERROR_IO_PENDING triggers the cleanup
        immediately; any other OSError is reported to the loop's exception
        handler.
        """
        if not self._registered:
            return
        self._registered = False

        try:
            _overlapped.UnregisterWaitEx(self._wait_handle, self._event)
        except OSError as exc:
            self._wait_handle = None
            if exc.winerror == _overlapped.ERROR_IO_PENDING:
                # ERROR_IO_PENDING is not an error, the wait was unregistered
                self._unregister_wait_cb(None)
            else:
                context = {
                    'message': 'Failed to unregister the wait handle',
                    'exception': exc,
                    'future': self,
                }
                if self._source_traceback:
                    context['source_traceback'] = self._source_traceback
                self._loop.call_exception_handler(context)
        else:
            self._wait_handle = None
            self._event_fut = self._proactor._wait_cancel(
                self._event,
                self._unregister_wait_cb)


class PipeServer(object):
    """Class representing a pipe server.

    This is much like a bound, listening socket.
    """

    def __init__(self, address):
        """Create the first pipe instance for *address*.

        When *address* is None no pipe handle is created.
        """
        self._address = address
        self._free_instances = weakref.WeakSet()
        # initialize the pipe attribute before calling _server_pipe_handle()
        # because this function can raise an exception and the destructor calls
        # the close() method
        self._pipe = None
        self._accept_pipe_future = None
        self._pipe = self._server_pipe_handle(True)

    def _get_unconnected_pipe(self):
        """Return the current pipe instance and replace it with a new one."""
        # Create new instance and return previous one.  This ensures
        # that (until the server is closed) there is always at least
        # one pipe handle for address.  Therefore if a client attempt
        # to connect it will not fail with FileNotFoundError.
        tmp, self._pipe = self._pipe, self._server_pipe_handle(False)
        return tmp

    def _server_pipe_handle(self, first):
        """Return a wrapper for a new pipe handle, or None once closed.

        *first* adds FILE_FLAG_FIRST_PIPE_INSTANCE to the creation flags.
        """
        if self._address is None:
            return None
        flags = _winapi.PIPE_ACCESS_DUPLEX | _winapi.FILE_FLAG_OVERLAPPED
        if first:
            flags |= _winapi.FILE_FLAG_FIRST_PIPE_INSTANCE
        h = _winapi.CreateNamedPipe(
            self._address, flags,
            _winapi.PIPE_TYPE_MESSAGE | _winapi.PIPE_READMODE_MESSAGE |
            _winapi.PIPE_WAIT,
            _winapi.PIPE_UNLIMITED_INSTANCES,
            windows_utils.BUFSIZE, windows_utils.BUFSIZE,
            _winapi.NMPWAIT_WAIT_FOREVER, _winapi.NULL)
        pipe = windows_utils.PipeHandle(h)
        # Tracked so that close() can close instances no client connected to.
        self._free_instances.add(pipe)
        return pipe

    def close(self):
        """Cancel the pending accept and close every unconnected instance.

        Safe to call more than once.
        """
        if self._accept_pipe_future is not None:
            self._accept_pipe_future.cancel()
            self._accept_pipe_future = None
        # Close all instances which have not been connected to by a client.
        if self._address is not None:
            for pipe in self._free_instances:
                pipe.close()
            self._pipe = None
            self._address = None
            self._free_instances.clear()

    __del__ = close


class _WindowsSelectorEventLoop(selector_events.BaseSelectorEventLoop):
    """Windows version of selector event loop."""

    def _socketpair(self):
        """Return the socket pair created by windows_utils.socketpair()."""
        return windows_utils.socketpair()


class ProactorEventLoop(proactor_events.BaseProactorEventLoop):
    """Windows version of proactor event loop using IOCP."""

    def __init__(self, proactor=None):
        """Use *proactor*, or a new IocpProactor when none is given."""
        if proactor is None:
            proactor = IocpProactor()
        super().__init__(proactor)

    def _socketpair(self):
        """Return the socket pair created by windows_utils.socketpair()."""
        return windows_utils.socketpair()

    @coroutine
    def create_pipe_connection(self, protocol_factory, address):
        """Connect to the pipe *address*.

        Return a (transport, protocol) pair; the protocol is created by
        calling *protocol_factory* once the pipe is connected.
        """
        f = self._proactor.connect_pipe(address)
        pipe = yield from f
        protocol = protocol_factory()
        trans = self._make_duplex_pipe_transport(pipe, protocol,
                                                 extra={'addr': address})
        return trans, protocol

    @coroutine
    def start_serving_pipe(self, protocol_factory, address):
        """Start accepting clients on the pipe *address*.

        Return a list containing the PipeServer.  For every accepted
        client a protocol is created with *protocol_factory* and attached
        to a duplex pipe transport.
        """
        server = PipeServer(address)

        def loop_accept_pipe(f=None):
            # Called once without argument to start accepting, then as the
            # done callback of each accept future.
            pipe = None
            try:
                if f is not None:
                    pipe = f.result()
                    # The instance is now connected: close() must not
                    # close it any more.
                    server._free_instances.discard(pipe)
                    protocol = protocol_factory()
                    self._make_duplex_pipe_transport(
                        pipe, protocol, extra={'addr': address})
                pipe = server._get_unconnected_pipe()
                if pipe is None:
                    # The server was closed.
                    return
                f = self._proactor.accept_pipe(pipe)
            except OSError as exc:
                if pipe and pipe.fileno() != -1:
                    self.call_exception_handler({
                        'message': 'Pipe accept failed',
                        'exception': exc,
                        'pipe': pipe,
                    })
                    pipe.close()
                elif self._debug:
                    logger.warning("Accept pipe failed on pipe %r",
                                   pipe, exc_info=True)
            except futures.CancelledError:
                if pipe:
                    pipe.close()
            else:
                server._accept_pipe_future = f
                f.add_done_callback(loop_accept_pipe)

        self.call_soon(loop_accept_pipe)
        return [server]

    @coroutine
    def _make_subprocess_transport(self, protocol, args, shell,
                                   stdin, stdout, stderr, bufsize,
                                   extra=None, **kwargs):
        """Create a subprocess transport and wait for its initialisation.

        The transport is closed before re-raising if the initialisation
        fails for any reason.
        """
        transp = _WindowsSubprocessTransport(self, protocol, args, shell,
                                             stdin, stdout, stderr, bufsize,
                                             extra=extra, **kwargs)
        try:
            yield from transp._post_init()
        except BaseException:
            # Catch everything (same as a bare except), clean up, re-raise.
            transp.close()
            raise

        return transp


class IocpProactor:
    """Proactor implementation using IOCP."""

    def __init__(self, concurrency=0xffffffff):
        """Create the I/O completion port.

        *concurrency* is passed as the last argument of
        CreateIoCompletionPort().
        """
        # Initialise every attribute read by close() before creating the
        # completion port: the creation can raise and __del__() calls
        # close() even on a partially constructed instance.
        self._loop = None
        self._results = []
        self._iocp = None
        # Maps ov.address -> (future, ov, obj, callback)
        self._cache = {}
        self._registered = weakref.WeakSet()
        self._unregistered = []
        self._stopped_serving = weakref.WeakSet()
        self._iocp = _overlapped.CreateIoCompletionPort(
            _overlapped.INVALID_HANDLE_VALUE, NULL, 0, concurrency)

    def __repr__(self):
        return ('<%s overlapped#=%s result#=%s>'
                % (self.__class__.__name__, len(self._cache),
                   len(self._results)))

    def set_loop(self, loop):
        """Set the event loop used for futures and error reporting."""
        self._loop = loop

    def select(self, timeout=None):
        """Return the list of completed futures, polling if it is empty."""
        if not self._results:
            self._poll(timeout)
        tmp = self._results
        self._results = []
        return tmp

    @staticmethod
    def _finish_transfer(trans, key, ov):
        """Completion callback shared by recv() and send().

        Return the result of the overlapped operation, converting
        ERROR_NETNAME_DELETED into ConnectionResetError.
        """
        try:
            return ov.getresult()
        except OSError as exc:
            if exc.winerror == _overlapped.ERROR_NETNAME_DELETED:
                raise ConnectionResetError(*exc.args)
            raise

    def recv(self, conn, nbytes, flags=0):
        """Start an overlapped read of up to *nbytes* on *conn*.

        Return a future for the result of the operation.
        """
        self._register_with_iocp(conn)
        ov = _overlapped.Overlapped(NULL)
        if isinstance(conn, socket.socket):
            ov.WSARecv(conn.fileno(), nbytes, flags)
        else:
            ov.ReadFile(conn.fileno(), nbytes)

        return self._register(ov, conn, self._finish_transfer)

    def send(self, conn, buf, flags=0):
        """Start an overlapped write of *buf* on *conn*.

        Return a future for the result of the operation.
        """
        self._register_with_iocp(conn)
        ov = _overlapped.Overlapped(NULL)
        if isinstance(conn, socket.socket):
            ov.WSASend(conn.fileno(), buf, flags)
        else:
            ov.WriteFile(conn.fileno(), buf)

        return self._register(ov, conn, self._finish_transfer)

    def accept(self, listener):
        """Start an overlapped accept on the socket *listener*.

        Return a future whose result is a (connection, peer address) pair.
        """
        self._register_with_iocp(listener)
        conn = self._get_accept_socket(listener.family)
        ov = _overlapped.Overlapped(NULL)
        ov.AcceptEx(listener.fileno(), conn.fileno())

        def finish_accept(trans, key, ov):
            ov.getresult()
            # Use SO_UPDATE_ACCEPT_CONTEXT so getsockname() etc work.
            buf = struct.pack('@P', listener.fileno())
            conn.setsockopt(socket.SOL_SOCKET,
                            _overlapped.SO_UPDATE_ACCEPT_CONTEXT, buf)
            conn.settimeout(listener.gettimeout())
            return conn, conn.getpeername()

        @coroutine
        def accept_coro(future, conn):
            # Coroutine closing the accept socket if the future is cancelled
            try:
                yield from future
            except futures.CancelledError:
                conn.close()
                raise

        future = self._register(ov, listener, finish_accept)
        coro = accept_coro(future, conn)
        # ``async`` is a reserved keyword since Python 3.7, so the attribute
        # cannot be written ``tasks.async``.  Prefer ensure_future() and fall
        # back to the legacy name when it is the only one available.
        schedule = getattr(tasks, 'ensure_future', None)
        if schedule is None:
            schedule = getattr(tasks, 'async')
        schedule(coro, loop=self._loop)
        return future

    def connect(self, conn, address):
        """Start an overlapped connect of the socket *conn* to *address*.

        Return a future whose result is *conn*.
        """
        self._register_with_iocp(conn)
        # The socket needs to be locally bound before we call ConnectEx().
        try:
            _overlapped.BindLocal(conn.fileno(), conn.family)
        except OSError as e:
            if e.winerror != errno.WSAEINVAL:
                raise
            # Probably already locally bound; check using getsockname().
            if conn.getsockname()[1] == 0:
                raise
        ov = _overlapped.Overlapped(NULL)
        ov.ConnectEx(conn.fileno(), address)

        def finish_connect(trans, key, ov):
            ov.getresult()
            # Use SO_UPDATE_CONNECT_CONTEXT so getsockname() etc work.
            conn.setsockopt(socket.SOL_SOCKET,
                            _overlapped.SO_UPDATE_CONNECT_CONTEXT, 0)
            return conn

        return self._register(ov, conn, finish_connect)

    def accept_pipe(self, pipe):
        """Wait for a client to connect to *pipe*.

        Return a future whose result is *pipe*.
        """
        self._register_with_iocp(pipe)
        ov = _overlapped.Overlapped(NULL)
        connected = ov.ConnectNamedPipe(pipe.fileno())

        if connected:
            # ConnectNamedPipe() failed with ERROR_PIPE_CONNECTED which means
            # that the pipe is connected. There is no need to wait for the
            # completion of the connection.
            f = futures.Future(loop=self._loop)
            f.set_result(pipe)
            return f

        def finish_accept_pipe(trans, key, ov):
            ov.getresult()
            return pipe

        return self._register(ov, pipe, finish_accept_pipe)

    def _connect_pipe(self, fut, address, delay):
        """Try to connect to the pipe *address* and complete *fut*.

        While the pipe is busy the attempt is rescheduled with a delay
        that doubles up to CONNECT_PIPE_MAX_DELAY.
        """
        if fut.cancelled():
            # Nobody is waiting any more: stop polling instead of connecting
            # a pipe whose handle could not be delivered to the future.
            return
        # Unfortunately there is no way to do an overlapped connect to a pipe.
        # Call CreateFile() in a loop until it doesn't fail with
        # ERROR_PIPE_BUSY
        try:
            handle = _overlapped.ConnectPipe(address)
        except OSError as exc:
            if exc.winerror == _overlapped.ERROR_PIPE_BUSY:
                # Polling: retry later
                delay = min(delay * 2, CONNECT_PIPE_MAX_DELAY)
                self._loop.call_later(delay,
                                      self._connect_pipe, fut, address, delay)
            else:
                fut.set_exception(exc)
        else:
            pipe = windows_utils.PipeHandle(handle)
            fut.set_result(pipe)

    def connect_pipe(self, address):
        """Return a future whose result is a PipeHandle connected to *address*."""
        fut = futures.Future(loop=self._loop)
        self._connect_pipe(fut, address, CONNECT_PIPE_INIT_DELAY)
        return fut

    def wait_for_handle(self, handle, timeout=None):
        """Wait for a handle.

        Return a Future object. The result of the future is True if the wait
        completed, or False if the wait did not complete (on timeout).
        """
        return self._wait_for_handle(handle, timeout, False)

    def _wait_cancel(self, event, done_callback):
        """Wait for *event* and call *done_callback* with the future."""
        fut = self._wait_for_handle(event, None, True)
        # add_done_callback() cannot be used because the wait may only complete
        # in IocpProactor.close(), while the event loop is not running.
        fut._done_callback = done_callback
        return fut

    def _wait_for_handle(self, handle, timeout, _is_cancel):
        """Register a wait on *handle* and return the matching future.

        A _WaitCancelFuture is created when *_is_cancel* is true, a
        _WaitHandleFuture otherwise.  *timeout* is in seconds; None means
        no timeout.
        """
        if timeout is None:
            ms = _winapi.INFINITE
        else:
            # RegisterWaitForSingleObject() has a resolution of 1 millisecond,
            # round away from zero to wait *at least* timeout seconds.
            ms = math.ceil(timeout * 1e3)

        # We only create ov so we can use ov.address as a key for the cache.
        ov = _overlapped.Overlapped(NULL)
        wait_handle = _overlapped.RegisterWaitWithQueue(
            handle, self._iocp, ov.address, ms)
        if _is_cancel:
            f = _WaitCancelFuture(ov, handle, wait_handle, loop=self._loop)
        else:
            f = _WaitHandleFuture(ov, handle, wait_handle, self,
                                  loop=self._loop)
        if f._source_traceback:
            del f._source_traceback[-1]

        def finish_wait_for_handle(trans, key, ov):
            # Note that this second wait means that we should only use
            # this with handles types where a successful wait has no
            # effect.  So events or processes are all right, but locks
            # or semaphores are not.  Also note if the handle is
            # signalled and then quickly reset, then we may return
            # False even though we have not timed out.
            return f._poll()

        self._cache[ov.address] = (f, ov, 0, finish_wait_for_handle)
        return f

    def _register_with_iocp(self, obj):
        """Associate the handle of *obj* with the completion port once."""
        # To get notifications of finished ops on this objects sent to the
        # completion port, we must register the handle.
        if obj not in self._registered:
            self._registered.add(obj)
            _overlapped.CreateIoCompletionPort(obj.fileno(), self._iocp, 0, 0)
            # XXX We could also use SetFileCompletionNotificationModes()
            # to avoid sending notifications to completion port of ops
            # that succeed immediately.

    def _register(self, ov, obj, callback):
        """Return a future for the overlapped operation *ov* on *obj*."""
        # Return a future which will be set with the result of the
        # operation when it completes.  The future's value is actually
        # the value returned by callback().
        f = _OverlappedFuture(ov, loop=self._loop)
        if f._source_traceback:
            del f._source_traceback[-1]
        if not ov.pending:
            # The operation has completed, so no need to postpone the
            # work.  We cannot take this short cut if we need the
            # NumberOfBytes, CompletionKey values returned by
            # PostQueuedCompletionStatus().
            try:
                value = callback(None, None, ov)
            except OSError as e:
                f.set_exception(e)
            else:
                f.set_result(value)
            # Even if GetOverlappedResult() was called, we have to wait for the
            # notification of the completion in GetQueuedCompletionStatus().
            # Register the overlapped operation to keep a reference to the
            # OVERLAPPED object, otherwise the memory is freed and Windows may
            # read uninitialized memory.

        # Register the overlapped operation for later.  Note that
        # we only store obj to prevent it from being garbage
        # collected too early.
        self._cache[ov.address] = (f, ov, obj, callback)
        return f

    def _unregister(self, ov):
        """Unregister an overlapped object.

        Call this method when its future has been cancelled. The event can
        already be signalled (pending in the proactor event queue). It is also
        safe if the event is never signalled (because it was cancelled).
        """
        self._unregistered.append(ov)

    def _get_accept_socket(self, family):
        """Return a new non-blocking socket of the given *family*."""
        s = socket.socket(family)
        s.settimeout(0)
        return s

    def _poll(self, timeout=None):
        """Dequeue completion packets and complete the matching futures.

        Wait up to *timeout* seconds (forever if None) for the first
        packet, then drain the queue without blocking.  Return True if at
        least one packet was dequeued, False otherwise.

        Raise ValueError if *timeout* is negative or too big.
        """
        if timeout is None:
            ms = INFINITE
        elif timeout < 0:
            raise ValueError("negative timeout")
        else:
            # GetQueuedCompletionStatus() has a resolution of 1 millisecond,
            # round away from zero to wait *at least* timeout seconds.
            ms = math.ceil(timeout * 1e3)
            if ms >= INFINITE:
                raise ValueError("timeout too big")

        dequeued = False
        while True:
            status = _overlapped.GetQueuedCompletionStatus(self._iocp, ms)
            if status is None:
                break
            dequeued = True
            # Only the first call may block.
            ms = 0

            err, transferred, key, address = status
            try:
                f, ov, obj, callback = self._cache.pop(address)
            except KeyError:
                # The loop may not be set (e.g. close() called from
                # __del__() before set_loop()).
                if self._loop is not None and self._loop.get_debug():
                    self._loop.call_exception_handler({
                        'message': ('GetQueuedCompletionStatus() returned an '
                                    'unexpected event'),
                        'status': ('err=%s transferred=%s key=%#x address=%#x'
                                   % (err, transferred, key, address)),
                    })

                # key is either zero, or it is used to return a pipe
                # handle which should be closed to avoid a leak.
                if key not in (0, _overlapped.INVALID_HANDLE_VALUE):
                    _winapi.CloseHandle(key)
                continue

            if obj in self._stopped_serving:
                f.cancel()
            # Don't call the callback if _register() already read the result or
            # if the overlapped has been cancelled
            elif not f.done():
                try:
                    value = callback(transferred, key, ov)
                except OSError as e:
                    f.set_exception(e)
                    self._results.append(f)
                else:
                    f.set_result(value)
                    self._results.append(f)

        # Remove unregistered futures
        for ov in self._unregistered:
            self._cache.pop(ov.address, None)
        self._unregistered.clear()
        return dequeued

    def _stop_serving(self, obj):
        """Mark *obj* so that its pending operations get cancelled."""
        # obj is a socket or pipe handle.  It will be closed in
        # BaseProactorEventLoop._stop_serving() which will make any
        # pending operations fail quickly.
        self._stopped_serving.add(obj)

    def close(self):
        """Cancel pending operations, wait for them and close the port.

        Safe to call more than once.
        """
        # Cancel remaining registered operations.
        for fut, ov, obj, callback in list(self._cache.values()):
            if fut.cancelled():
                # Nothing to do with cancelled futures
                continue
            if isinstance(fut, _WaitCancelFuture):
                # _WaitCancelFuture must not be cancelled
                continue
            try:
                fut.cancel()
            except OSError as exc:
                if self._loop is not None:
                    context = {
                        'message': 'Cancelling a future failed',
                        'exception': exc,
                        'future': fut,
                    }
                    if fut._source_traceback:
                        context['source_traceback'] = fut._source_traceback
                    self._loop.call_exception_handler(context)

        while self._cache:
            # _poll() returns False when nothing completed within the second.
            if not self._poll(1):
                logger.debug('taking long time to close proactor')

        self._results = []
        if self._iocp is not None:
            _winapi.CloseHandle(self._iocp)
            self._iocp = None

    def __del__(self):
        self.close()


class _WindowsSubprocessTransport(base_subprocess.BaseSubprocessTransport):
    """Subprocess transport which detects process exit via the proactor."""

    def _start(self, args, shell, stdin, stdout, stderr, bufsize, **kwargs):
        """Spawn the process and watch its handle for termination."""
        self._proc = windows_utils.Popen(
            args, shell=shell, stdin=stdin, stdout=stdout, stderr=stderr,
            bufsize=bufsize, **kwargs)

        def callback(f):
            # The wait on the process handle completed: report the exit code.
            returncode = self._proc.poll()
            self._process_exited(returncode)

        f = self._loop._proactor.wait_for_handle(int(self._proc._handle))
        f.add_done_callback(callback)


# Public name of the selector based event loop.
SelectorEventLoop = _WindowsSelectorEventLoop


class _WindowsDefaultEventLoopPolicy(events.BaseDefaultEventLoopPolicy):
    """Default event loop policy on Windows."""

    # Presumably the class instantiated by the base policy to create new
    # event loops -- confirm against events.BaseDefaultEventLoopPolicy.
    _loop_factory = SelectorEventLoop


# Public name of the default event loop policy.
DefaultEventLoopPolicy = _WindowsDefaultEventLoopPolicy