blob: 41be8da2a043a5856245a4838c3d29b01ddce70b [file] [log] [blame]
Yury Selivanovb0b0e622014-02-18 22:27:48 -05001"""Selector and proactor event loops for Windows."""
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07002
Victor Stinnerf2e17682014-01-31 16:25:24 +01003import _winapi
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07004import errno
Victor Stinnerf2e17682014-01-31 16:25:24 +01005import math
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07006import socket
Victor Stinnerf2e17682014-01-31 16:25:24 +01007import struct
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07008import weakref
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07009
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -080010from . import events
Guido van Rossum59691282013-10-30 14:52:03 -070011from . import base_subprocess
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070012from . import futures
13from . import proactor_events
14from . import selector_events
15from . import tasks
16from . import windows_utils
Guido van Rossum59691282013-10-30 14:52:03 -070017from . import _overlapped
Victor Stinnerf951d282014-06-29 00:46:45 +020018from .coroutines import coroutine
19from .log import logger
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070020
21
# Names exported by a star-import of this module.
__all__ = [
    'SelectorEventLoop',
    'ProactorEventLoop',
    'IocpProactor',
    'DefaultEventLoopPolicy',
]
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070025
26
# Win32 values used by the proactor code in this module.
NULL = 0                          # passed where no handle/event is supplied
INFINITE = 0xFFFFFFFF             # millisecond timeout meaning "wait forever"
ERROR_CONNECTION_REFUSED = 1225   # Win32 error code
ERROR_CONNECTION_ABORTED = 1236   # Win32 error code
31
32
class _OverlappedFuture(futures.Future):
    """Future tied to one overlapped (asynchronous Windows I/O) operation.

    Cancelling the future immediately cancels the overlapped operation.
    The reference to the overlapped object is dropped as soon as the future
    gets a result, an exception, or is cancelled.
    """

    def __init__(self, ov, *, loop=None):
        super().__init__(loop=loop)
        # Hide this constructor's own frame from the recorded creation
        # traceback (only present when the base class captured one).
        if self._source_traceback:
            del self._source_traceback[-1]
        self._ov = ov

    def __repr__(self):
        parts = [self._state.lower()]
        overlapped = self._ov
        if overlapped is not None:
            if overlapped.pending:
                ov_state = 'pending'
            else:
                ov_state = 'completed'
            parts.append('overlapped=<%s, %#x>'
                         % (ov_state, overlapped.address))
        if self._state == futures._FINISHED:
            parts.append(self._format_result())
        if self._callbacks:
            parts.append(self._format_callbacks())
        return '<%s %s>' % (self.__class__.__name__, ' '.join(parts))

    def _cancel_overlapped(self):
        """Cancel the overlapped operation (if still referenced) and forget it.

        An OSError raised by the cancellation is reported to the loop's
        exception handler instead of being propagated.
        """
        overlapped = self._ov
        if overlapped is not None:
            try:
                overlapped.cancel()
            except OSError as exc:
                context = {
                    'message': 'Cancelling an overlapped future failed',
                    'exception': exc,
                    'future': self,
                }
                if self._source_traceback:
                    context['source_traceback'] = self._source_traceback
                self._loop.call_exception_handler(context)
            self._ov = None

    def cancel(self):
        # Stop the I/O first, then let the base class mark us cancelled.
        self._cancel_overlapped()
        return super().cancel()

    def set_exception(self, exception):
        super().set_exception(exception)
        self._cancel_overlapped()

    def set_result(self, result):
        super().set_result(result)
        # The operation is finished: nothing left to cancel.
        self._ov = None
83
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070084
class _WaitHandleFuture(futures.Future):
    """Subclass of Future which represents a wait handle.

    ``handle`` is the handle being waited on; ``wait_handle`` is the wait
    registration that ``_unregister()`` hands to
    ``_overlapped.UnregisterWait()``.
    """

    def __init__(self, handle, wait_handle, *, loop=None):
        super().__init__(loop=loop)
        self._handle = handle
        self._wait_handle = wait_handle

    def _poll(self):
        """Return True if the waited-on handle is signalled right now."""
        # non-blocking wait: use a timeout of 0 millisecond
        return (_winapi.WaitForSingleObject(self._handle, 0) ==
                _winapi.WAIT_OBJECT_0)

    def __repr__(self):
        info = [self._state.lower()]
        # Same "still registered" test as _unregister().
        if self._wait_handle is not None:
            # _poll() is true once the handle is signalled, i.e. when the
            # wait is over -- so a true poll means 'completed', not 'pending'.
            state = 'completed' if self._poll() else 'pending'
            info.append('wait_handle=<%s, %#x>' % (state, self._wait_handle))
        info.append('handle=<%#x>' % self._handle)
        if self._state == futures._FINISHED:
            info.append(self._format_result())
        if self._callbacks:
            info.append(self._format_callbacks())
        return '<%s %s>' % (self.__class__.__name__, ' '.join(info))

    def _unregister(self):
        """Drop the wait registration; does nothing if already dropped.

        Raises OSError if UnregisterWait() fails with anything other than
        ERROR_IO_PENDING (in which case the registration is kept).
        """
        if self._wait_handle is None:
            return
        try:
            _overlapped.UnregisterWait(self._wait_handle)
        except OSError as e:
            if e.winerror != _overlapped.ERROR_IO_PENDING:
                raise
            # ERROR_IO_PENDING is not an error, the wait was unregistered
        self._wait_handle = None

    def cancel(self):
        self._unregister()
        return super().cancel()
Guido van Rossum90fb9142013-10-30 14:44:05 -0700124
125
class PipeServer:
    """Server side of a named pipe.

    Plays the role a bound, listening socket plays for TCP: it always keeps
    one not-yet-connected pipe instance available for the address until it
    is closed.
    """

    def __init__(self, address):
        self._address = address
        # Instances created here that no client has connected to yet.
        self._free_instances = weakref.WeakSet()
        self._pipe = self._server_pipe_handle(True)

    def _get_unconnected_pipe(self):
        """Return the current spare instance and create its replacement.

        Because a fresh instance is created each time the previous one is
        handed out, there is always at least one pipe handle for the address
        (until the server is closed), so a connecting client does not fail
        with FileNotFoundError.
        """
        previous = self._pipe
        self._pipe = self._server_pipe_handle(False)
        return previous

    def _server_pipe_handle(self, first):
        """Create a new pipe instance and return its wrapper.

        Returns None once the server has been closed (no address).  *first*
        requests FILE_FLAG_FIRST_PIPE_INSTANCE for the new instance.
        """
        if self._address is None:
            return None
        open_mode = _winapi.PIPE_ACCESS_DUPLEX | _winapi.FILE_FLAG_OVERLAPPED
        if first:
            open_mode |= _winapi.FILE_FLAG_FIRST_PIPE_INSTANCE
        pipe_mode = (_winapi.PIPE_TYPE_MESSAGE |
                     _winapi.PIPE_READMODE_MESSAGE |
                     _winapi.PIPE_WAIT)
        raw_handle = _winapi.CreateNamedPipe(
            self._address,
            open_mode,
            pipe_mode,
            _winapi.PIPE_UNLIMITED_INSTANCES,
            windows_utils.BUFSIZE,
            windows_utils.BUFSIZE,
            _winapi.NMPWAIT_WAIT_FOREVER,
            _winapi.NULL)
        wrapper = windows_utils.PipeHandle(raw_handle)
        self._free_instances.add(wrapper)
        return wrapper

    def close(self):
        """Close every instance no client connected to; safe to call twice."""
        if self._address is None:
            return
        for instance in self._free_instances:
            instance.close()
        self._pipe = None
        self._address = None
        self._free_instances.clear()

    __del__ = close
172
173
class _WindowsSelectorEventLoop(selector_events.BaseSelectorEventLoop):
    """Selector-based event loop for Windows."""

    def _socketpair(self):
        # Delegate to the helper in windows_utils.
        pair = windows_utils.socketpair()
        return pair
179
180
class ProactorEventLoop(proactor_events.BaseProactorEventLoop):
    """Proactor event loop for Windows, driven by an I/O completion port."""

    def __init__(self, proactor=None):
        # Fall back to a fresh IocpProactor when the caller supplies none.
        super().__init__(IocpProactor() if proactor is None else proactor)

    def _socketpair(self):
        pair = windows_utils.socketpair()
        return pair

    @coroutine
    def create_pipe_connection(self, protocol_factory, address):
        """Connect to the named pipe *address* as a client.

        Returns a ``(transport, protocol)`` pair, the protocol being created
        by calling *protocol_factory* once the pipe is connected.
        """
        pipe = yield from self._proactor.connect_pipe(address)
        protocol = protocol_factory()
        transport = self._make_duplex_pipe_transport(
            pipe, protocol, extra={'addr': address})
        return transport, protocol

    @coroutine
    def start_serving_pipe(self, protocol_factory, address):
        """Start accepting clients on the named pipe *address*.

        Returns a one-element list holding the PipeServer.  Each accepted
        client gets its own protocol (from *protocol_factory*) and duplex
        pipe transport.
        """
        server = PipeServer(address)

        def accept_next(accepted=None):
            # Runs once via call_soon() (no argument) and afterwards as the
            # done-callback of each accept future.
            pipe = None
            try:
                if accepted:
                    # A client connected: hand the pipe over to a transport.
                    pipe = accepted.result()
                    server._free_instances.discard(pipe)
                    protocol = protocol_factory()
                    self._make_duplex_pipe_transport(
                        pipe, protocol, extra={'addr': address})
                pipe = server._get_unconnected_pipe()
                if pipe is None:
                    # The server was closed: stop accepting.
                    return
                waiter = self._proactor.accept_pipe(pipe)
            except OSError as exc:
                if pipe and pipe.fileno() != -1:
                    self.call_exception_handler({
                        'message': 'Pipe accept failed',
                        'exception': exc,
                        'pipe': pipe,
                    })
                    pipe.close()
            except futures.CancelledError:
                if pipe:
                    pipe.close()
            else:
                waiter.add_done_callback(accept_next)

        self.call_soon(accept_next)
        return [server]

    @coroutine
    def _make_subprocess_transport(self, protocol, args, shell,
                                   stdin, stdout, stderr, bufsize,
                                   extra=None, **kwargs):
        """Create the subprocess transport and wait for its _post_init()."""
        transport = _WindowsSubprocessTransport(
            self, protocol, args, shell,
            stdin, stdout, stderr, bufsize,
            extra=extra, **kwargs)
        yield from transport._post_init()
        return transport
244
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700245
class IocpProactor:
    """Proactor implementation using IOCP.

    Each operation is started through an ``_overlapped.Overlapped`` object
    and represented by a future.  Operations still in flight are kept in
    ``self._cache``, keyed by the address of their overlapped structure, as
    ``(future, overlapped, obj, callback)`` tuples.  ``_poll()`` pops an
    entry when the completion port reports it and resolves the future with
    ``callback(transferred, key, overlapped)``.
    """

    def __init__(self, concurrency=0xffffffff):
        # Set every attribute close() reads *before* creating the port, so
        # that __del__ -> close() stays harmless if port creation fails.
        self._loop = None
        self._results = []
        self._iocp = None
        self._cache = {}
        self._registered = weakref.WeakSet()
        self._stopped_serving = weakref.WeakSet()
        self._iocp = _overlapped.CreateIoCompletionPort(
            _overlapped.INVALID_HANDLE_VALUE, NULL, 0, concurrency)

    def __repr__(self):
        return ('<%s overlapped#=%s result#=%s>'
                % (self.__class__.__name__, len(self._cache),
                   len(self._results)))

    def set_loop(self, loop):
        self._loop = loop

    def select(self, timeout=None):
        """Return the list of futures completed so far.

        Polls the completion port (for up to *timeout* seconds, None meaning
        no limit) only when no completed future is already queued.
        """
        if not self._results:
            self._poll(timeout)
        tmp = self._results
        self._results = []
        return tmp

    @staticmethod
    def _finish_io(trans, key, ov):
        """Completion callback shared by recv() and send().

        Returns the overlapped result; ERROR_NETNAME_DELETED is reported as
        ConnectionResetError, any other OSError is re-raised unchanged.
        """
        try:
            return ov.getresult()
        except OSError as exc:
            if exc.winerror == _overlapped.ERROR_NETNAME_DELETED:
                raise ConnectionResetError(*exc.args)
            raise

    def recv(self, conn, nbytes, flags=0):
        """Start reading up to *nbytes* from *conn*; return a future.

        *flags* is only used when *conn* is a socket.
        """
        self._register_with_iocp(conn)
        ov = _overlapped.Overlapped(NULL)
        if isinstance(conn, socket.socket):
            ov.WSARecv(conn.fileno(), nbytes, flags)
        else:
            ov.ReadFile(conn.fileno(), nbytes)
        return self._register(ov, conn, self._finish_io)

    def send(self, conn, buf, flags=0):
        """Start writing *buf* to *conn*; return a future.

        *flags* is only used when *conn* is a socket.
        """
        self._register_with_iocp(conn)
        ov = _overlapped.Overlapped(NULL)
        if isinstance(conn, socket.socket):
            ov.WSASend(conn.fileno(), buf, flags)
        else:
            ov.WriteFile(conn.fileno(), buf)
        return self._register(ov, conn, self._finish_io)

    def accept(self, listener):
        """Start accepting a connection on the socket *listener*.

        Returns a future whose result is ``(conn, peer_address)``.
        """
        self._register_with_iocp(listener)
        conn = self._get_accept_socket(listener.family)
        ov = _overlapped.Overlapped(NULL)
        ov.AcceptEx(listener.fileno(), conn.fileno())

        def finish_accept(trans, key, ov):
            ov.getresult()
            # Use SO_UPDATE_ACCEPT_CONTEXT so getsockname() etc work.
            buf = struct.pack('@P', listener.fileno())
            conn.setsockopt(socket.SOL_SOCKET,
                            _overlapped.SO_UPDATE_ACCEPT_CONTEXT, buf)
            conn.settimeout(listener.gettimeout())
            return conn, conn.getpeername()

        @coroutine
        def accept_coro(future, conn):
            # Coroutine closing the accept socket if the future is cancelled
            try:
                yield from future
            except futures.CancelledError:
                conn.close()
                raise

        future = self._register(ov, listener, finish_accept)
        coro = accept_coro(future, conn)
        # ``async`` is a reserved word from Python 3.7 on, so the spelling
        # ``tasks.async`` cannot be parsed there.  Look the scheduling
        # function up by name instead, preferring ``ensure_future`` (its
        # replacement) when the tasks module provides it.
        schedule = getattr(tasks, 'ensure_future', None)
        if schedule is None:
            schedule = getattr(tasks, 'async')
        schedule(coro, loop=self._loop)
        return future

    def connect(self, conn, address):
        """Start connecting the socket *conn* to *address*.

        Returns a future whose result is *conn*.
        """
        self._register_with_iocp(conn)
        # The socket needs to be locally bound before we call ConnectEx().
        try:
            _overlapped.BindLocal(conn.fileno(), conn.family)
        except OSError as e:
            if e.winerror != errno.WSAEINVAL:
                raise
            # Probably already locally bound; check using getsockname().
            if conn.getsockname()[1] == 0:
                raise
        ov = _overlapped.Overlapped(NULL)
        ov.ConnectEx(conn.fileno(), address)

        def finish_connect(trans, key, ov):
            ov.getresult()
            # Use SO_UPDATE_CONNECT_CONTEXT so getsockname() etc work.
            conn.setsockopt(socket.SOL_SOCKET,
                            _overlapped.SO_UPDATE_CONNECT_CONTEXT, 0)
            return conn

        return self._register(ov, conn, finish_connect)

    def accept_pipe(self, pipe):
        """Start waiting for a client on *pipe*; the future yields *pipe*."""
        self._register_with_iocp(pipe)
        ov = _overlapped.Overlapped(NULL)
        ov.ConnectNamedPipe(pipe.fileno())

        def finish_accept_pipe(trans, key, ov):
            ov.getresult()
            return pipe

        # FIXME: Tulip issue 196: why do we need register=False?
        # See also the comment in the _register() method
        return self._register(ov, pipe, finish_accept_pipe,
                              register=False)

    def connect_pipe(self, address):
        """Start connecting to the named pipe *address*.

        Returns a future whose result is a ``windows_utils.PipeHandle``.
        """
        ov = _overlapped.Overlapped(NULL)
        ov.WaitNamedPipeAndConnect(address, self._iocp, ov.address)

        def finish_connect_pipe(err, handle, ov):
            # err, handle were arguments passed to PostQueuedCompletionStatus()
            # in a function run in a thread pool.
            if err == _overlapped.ERROR_SEM_TIMEOUT:
                # Connection did not succeed within time limit.
                msg = _overlapped.FormatMessage(err)
                raise ConnectionRefusedError(0, msg, None, err)
            elif err != 0:
                msg = _overlapped.FormatMessage(err)
                raise OSError(0, msg, None, err)
            else:
                return windows_utils.PipeHandle(handle)

        return self._register(ov, None, finish_connect_pipe,
                              wait_for_post=True)

    def wait_for_handle(self, handle, timeout=None):
        """Wait for *handle* for up to *timeout* seconds (None: no limit).

        Returns a future whose result is whether the handle was found
        signalled when the wait completed.
        """
        if timeout is None:
            ms = _winapi.INFINITE
        else:
            # RegisterWaitForSingleObject() has a resolution of 1 millisecond,
            # round away from zero to wait *at least* timeout seconds.
            ms = math.ceil(timeout * 1e3)

        # We only create ov so we can use ov.address as a key for the cache.
        ov = _overlapped.Overlapped(NULL)
        wh = _overlapped.RegisterWaitWithQueue(
            handle, self._iocp, ov.address, ms)
        f = _WaitHandleFuture(handle, wh, loop=self._loop)

        def finish_wait_for_handle(trans, key, ov):
            # Note that this second wait means that we should only use
            # this with handles types where a successful wait has no
            # effect.  So events or processes are all right, but locks
            # or semaphores are not.  Also note if the handle is
            # signalled and then quickly reset, then we may return
            # False even though we have not timed out.
            try:
                return f._poll()
            finally:
                f._unregister()

        self._cache[ov.address] = (f, ov, None, finish_wait_for_handle)
        return f

    def _register_with_iocp(self, obj):
        # To get notifications of finished ops on this object sent to the
        # completion port, we must register the handle.
        if obj in self._registered:
            return
        handle = obj.fileno()
        _overlapped.CreateIoCompletionPort(handle, self._iocp, 0, 0)
        # Only remember the object once the association succeeded, so a
        # failed attempt is retried instead of being silently skipped.
        self._registered.add(obj)
        # XXX We could also use SetFileCompletionNotificationModes()
        # to avoid sending notifications to completion port of ops
        # that succeed immediately.

    def _register(self, ov, obj, callback,
                  wait_for_post=False, register=True):
        # Return a future which will be set with the result of the
        # operation when it completes.  The future's value is actually
        # the value returned by callback().
        f = _OverlappedFuture(ov, loop=self._loop)
        if not ov.pending and not wait_for_post:
            # The operation has completed, so no need to postpone the
            # work.  We cannot take this short cut if we need the
            # NumberOfBytes, CompletionKey values returned by
            # PostQueuedCompletionStatus().
            try:
                value = callback(None, None, ov)
            except OSError as e:
                f.set_exception(e)
            else:
                f.set_result(value)
            # Even if GetOverlappedResult() was called, we have to wait for
            # the notification of the completion in
            # GetQueuedCompletionStatus().  Register the overlapped operation
            # to keep a reference to the OVERLAPPED object, otherwise the
            # memory is freed and Windows may read uninitialized memory.
            #
            # For an unknown reason, ConnectNamedPipe() behaves differently:
            # no completion is notified if we already called
            # GetOverlappedResult().  For this specific case, we don't
            # expect notification (register is set to False).
        else:
            # Still in flight (or completion arrives by post): always track.
            register = True
        if register:
            # Register the overlapped operation for later.  Note that
            # we only store obj to prevent it from being garbage
            # collected too early.
            self._cache[ov.address] = (f, ov, obj, callback)
        return f

    def _get_accept_socket(self, family):
        s = socket.socket(family)
        s.settimeout(0)
        return s

    def _poll(self, timeout=None):
        """Drain the completion port, resolving the matching futures.

        Waits up to *timeout* seconds (None: no limit) for the first event,
        then collects whatever else is immediately available.  Resolved
        futures are appended to ``self._results``.

        Raises ValueError for a negative or too large timeout.
        """
        if timeout is None:
            ms = INFINITE
        elif timeout < 0:
            raise ValueError("negative timeout")
        else:
            # GetQueuedCompletionStatus() has a resolution of 1 millisecond,
            # round away from zero to wait *at least* timeout seconds.
            ms = math.ceil(timeout * 1e3)
            if ms >= INFINITE:
                raise ValueError("timeout too big")
        while True:
            status = _overlapped.GetQueuedCompletionStatus(self._iocp, ms)
            if status is None:
                return
            err, transferred, key, address = status
            try:
                f, ov, obj, callback = self._cache.pop(address)
            except KeyError:
                # The loop may not be set (close() guards for this as well).
                if self._loop is not None and self._loop.get_debug():
                    self._loop.call_exception_handler({
                        'message': ('GetQueuedCompletionStatus() returned an '
                                    'unexpected event'),
                        'status': ('err=%s transferred=%s key=%#x address=%#x'
                                   % (err, transferred, key, address)),
                    })

                # key is either zero, or it is used to return a pipe
                # handle which should be closed to avoid a leak.
                if key not in (0, _overlapped.INVALID_HANDLE_VALUE):
                    _winapi.CloseHandle(key)
                ms = 0
                continue

            if obj in self._stopped_serving:
                f.cancel()
            # Don't call the callback if _register() already read the result
            # or if the overlapped has been cancelled
            elif not f.done():
                try:
                    value = callback(transferred, key, ov)
                except OSError as e:
                    f.set_exception(e)
                    self._results.append(f)
                else:
                    f.set_result(value)
                    self._results.append(f)
            # Only the first wait may block; afterwards just drain.
            ms = 0

    def _stop_serving(self, obj):
        # obj is a socket or pipe handle.  It will be closed in
        # BaseProactorEventLoop._stop_serving() which will make any
        # pending operations fail quickly.
        self._stopped_serving.add(obj)

    def close(self):
        """Cancel outstanding operations and close the completion port."""
        # Cancel remaining registered operations.
        for address, (fut, ov, obj, callback) in list(self._cache.items()):
            if obj is None:
                # The operation was started with connect_pipe() which
                # queues a task to Windows' thread pool.  This cannot
                # be cancelled, so just forget it.
                del self._cache[address]
            # FIXME: Tulip issue 196: remove this case, it should not happen
            elif fut.done() and not fut.cancelled():
                del self._cache[address]
            else:
                try:
                    fut.cancel()
                except OSError as exc:
                    if self._loop is not None:
                        context = {
                            'message': 'Cancelling a future failed',
                            'exception': exc,
                            'future': fut,
                        }
                        if fut._source_traceback:
                            context['source_traceback'] = fut._source_traceback
                        self._loop.call_exception_handler(context)

        while self._cache:
            # _poll() returns nothing, so measure progress by watching the
            # cache shrink; only complain when a full poll removed nothing.
            outstanding = len(self._cache)
            self._poll(1)
            if len(self._cache) >= outstanding:
                logger.debug('taking long time to close proactor')

        self._results = []
        if self._iocp is not None:
            _winapi.CloseHandle(self._iocp)
            self._iocp = None

    def __del__(self):
        self.close()
567
Guido van Rossum59691282013-10-30 14:52:03 -0700568
class _WindowsSubprocessTransport(base_subprocess.BaseSubprocessTransport):
    """Subprocess transport that learns of process exit via the proactor."""

    def _start(self, args, shell, stdin, stdout, stderr, bufsize, **kwargs):
        """Spawn the child process and arrange to be told when it exits."""
        self._proc = windows_utils.Popen(
            args,
            shell=shell,
            stdin=stdin,
            stdout=stdout,
            stderr=stderr,
            bufsize=bufsize,
            **kwargs)

        def on_exit(waiter):
            # The process handle was waited on: collect the exit status.
            self._process_exited(self._proc.poll())

        process_handle = int(self._proc._handle)
        exit_waiter = self._loop._proactor.wait_for_handle(process_handle)
        exit_waiter.add_done_callback(on_exit)
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800582
583
# Public name of the selector-based loop.
SelectorEventLoop = _WindowsSelectorEventLoop


class _WindowsDefaultEventLoopPolicy(events.BaseDefaultEventLoopPolicy):
    """Event loop policy whose loop factory is the selector event loop."""

    _loop_factory = _WindowsSelectorEventLoop


# Public name of the policy defined above.
DefaultEventLoopPolicy = _WindowsDefaultEventLoopPolicy