blob: ec427d5c7052be311d88761d8edb071594249dda [file] [log] [blame]
Yury Selivanovb0b0e622014-02-18 22:27:48 -05001"""Selector and proactor event loops for Windows."""
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07002
Victor Stinnerf2e17682014-01-31 16:25:24 +01003import _winapi
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07004import errno
Victor Stinnerf2e17682014-01-31 16:25:24 +01005import math
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07006import socket
Victor Stinnerf2e17682014-01-31 16:25:24 +01007import struct
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07008import weakref
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07009
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -080010from . import events
Guido van Rossum59691282013-10-30 14:52:03 -070011from . import base_subprocess
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070012from . import futures
13from . import proactor_events
14from . import selector_events
15from . import tasks
16from . import windows_utils
Guido van Rossum59691282013-10-30 14:52:03 -070017from . import _overlapped
Victor Stinnerf951d282014-06-29 00:46:45 +020018from .coroutines import coroutine
19from .log import logger
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070020
21
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -080022__all__ = ['SelectorEventLoop', 'ProactorEventLoop', 'IocpProactor',
23 'DefaultEventLoopPolicy',
24 ]
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070025
26
27NULL = 0
28INFINITE = 0xffffffff
29ERROR_CONNECTION_REFUSED = 1225
30ERROR_CONNECTION_ABORTED = 1236
31
32
33class _OverlappedFuture(futures.Future):
34 """Subclass of Future which represents an overlapped operation.
35
36 Cancelling it will immediately cancel the overlapped operation.
37 """
38
39 def __init__(self, ov, *, loop=None):
40 super().__init__(loop=loop)
Victor Stinnerfea6a102014-07-25 00:54:53 +020041 if self._source_traceback:
42 del self._source_traceback[-1]
Victor Stinner18a28dc2014-07-25 13:05:20 +020043 self._ov = ov
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070044
Victor Stinner313a9802014-07-29 12:58:23 +020045 def _repr_info(self):
46 info = super()._repr_info()
Victor Stinner18a28dc2014-07-25 13:05:20 +020047 if self._ov is not None:
48 state = 'pending' if self._ov.pending else 'completed'
Victor Stinner313a9802014-07-29 12:58:23 +020049 info.insert(1, 'overlapped=<%s, %#x>' % (state, self._ov.address))
50 return info
Victor Stinnere912e652014-07-12 03:11:53 +020051
Victor Stinner18a28dc2014-07-25 13:05:20 +020052 def _cancel_overlapped(self):
53 if self._ov is None:
54 return
55 try:
56 self._ov.cancel()
57 except OSError as exc:
58 context = {
59 'message': 'Cancelling an overlapped future failed',
60 'exception': exc,
61 'future': self,
62 }
63 if self._source_traceback:
64 context['source_traceback'] = self._source_traceback
65 self._loop.call_exception_handler(context)
66 self._ov = None
67
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070068 def cancel(self):
Victor Stinner18a28dc2014-07-25 13:05:20 +020069 self._cancel_overlapped()
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070070 return super().cancel()
71
Victor Stinner18a28dc2014-07-25 13:05:20 +020072 def set_exception(self, exception):
73 super().set_exception(exception)
74 self._cancel_overlapped()
75
Victor Stinner51e44ea2014-07-26 00:58:34 +020076 def set_result(self, result):
77 super().set_result(result)
78 self._ov = None
79
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070080
Guido van Rossum90fb9142013-10-30 14:44:05 -070081class _WaitHandleFuture(futures.Future):
82 """Subclass of Future which represents a wait handle."""
83
Victor Stinner313a9802014-07-29 12:58:23 +020084 def __init__(self, iocp, ov, handle, wait_handle, *, loop=None):
Guido van Rossum90fb9142013-10-30 14:44:05 -070085 super().__init__(loop=loop)
Victor Stinner313a9802014-07-29 12:58:23 +020086 if self._source_traceback:
87 del self._source_traceback[-1]
88 # iocp and ov are only used by cancel() to notify IocpProactor
89 # that the wait was cancelled
90 self._iocp = iocp
91 self._ov = ov
Victor Stinner18a28dc2014-07-25 13:05:20 +020092 self._handle = handle
Guido van Rossum90fb9142013-10-30 14:44:05 -070093 self._wait_handle = wait_handle
94
Victor Stinner18a28dc2014-07-25 13:05:20 +020095 def _poll(self):
96 # non-blocking wait: use a timeout of 0 millisecond
97 return (_winapi.WaitForSingleObject(self._handle, 0) ==
98 _winapi.WAIT_OBJECT_0)
99
Victor Stinner313a9802014-07-29 12:58:23 +0200100 def _repr_info(self):
101 info = super()._repr_info()
102 info.insert(1, 'handle=%#x' % self._handle)
Victor Stinner18a28dc2014-07-25 13:05:20 +0200103 if self._wait_handle:
Victor Stinner313a9802014-07-29 12:58:23 +0200104 state = 'signaled' if self._poll() else 'waiting'
105 info.insert(1, 'wait_handle=<%s, %#x>'
106 % (state, self._wait_handle))
107 return info
Victor Stinner18a28dc2014-07-25 13:05:20 +0200108
Victor Stinner313a9802014-07-29 12:58:23 +0200109 def _unregister_wait(self):
Victor Stinnerfea6a102014-07-25 00:54:53 +0200110 if self._wait_handle is None:
111 return
Guido van Rossum90fb9142013-10-30 14:44:05 -0700112 try:
113 _overlapped.UnregisterWait(self._wait_handle)
114 except OSError as e:
115 if e.winerror != _overlapped.ERROR_IO_PENDING:
116 raise
Victor Stinnerfea6a102014-07-25 00:54:53 +0200117 # ERROR_IO_PENDING is not an error, the wait was unregistered
118 self._wait_handle = None
Victor Stinner313a9802014-07-29 12:58:23 +0200119 self._iocp = None
120 self._ov = None
Victor Stinnerfea6a102014-07-25 00:54:53 +0200121
122 def cancel(self):
Victor Stinner313a9802014-07-29 12:58:23 +0200123 result = super().cancel()
124 if self._ov is not None:
125 # signal the cancellation to the overlapped object
126 _overlapped.PostQueuedCompletionStatus(self._iocp, True,
127 0, self._ov.address)
128 self._unregister_wait()
129 return result
130
131 def set_exception(self, exception):
132 super().set_exception(exception)
133 self._unregister_wait()
134
135 def set_result(self, result):
136 super().set_result(result)
137 self._unregister_wait()
Guido van Rossum90fb9142013-10-30 14:44:05 -0700138
139
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700140class PipeServer(object):
141 """Class representing a pipe server.
142
143 This is much like a bound, listening socket.
144 """
145 def __init__(self, address):
146 self._address = address
147 self._free_instances = weakref.WeakSet()
148 self._pipe = self._server_pipe_handle(True)
149
150 def _get_unconnected_pipe(self):
151 # Create new instance and return previous one. This ensures
152 # that (until the server is closed) there is always at least
153 # one pipe handle for address. Therefore if a client attempt
154 # to connect it will not fail with FileNotFoundError.
155 tmp, self._pipe = self._pipe, self._server_pipe_handle(False)
156 return tmp
157
158 def _server_pipe_handle(self, first):
159 # Return a wrapper for a new pipe handle.
160 if self._address is None:
161 return None
162 flags = _winapi.PIPE_ACCESS_DUPLEX | _winapi.FILE_FLAG_OVERLAPPED
163 if first:
164 flags |= _winapi.FILE_FLAG_FIRST_PIPE_INSTANCE
165 h = _winapi.CreateNamedPipe(
166 self._address, flags,
167 _winapi.PIPE_TYPE_MESSAGE | _winapi.PIPE_READMODE_MESSAGE |
168 _winapi.PIPE_WAIT,
169 _winapi.PIPE_UNLIMITED_INSTANCES,
170 windows_utils.BUFSIZE, windows_utils.BUFSIZE,
171 _winapi.NMPWAIT_WAIT_FOREVER, _winapi.NULL)
172 pipe = windows_utils.PipeHandle(h)
173 self._free_instances.add(pipe)
174 return pipe
175
176 def close(self):
177 # Close all instances which have not been connected to by a client.
178 if self._address is not None:
179 for pipe in self._free_instances:
180 pipe.close()
181 self._pipe = None
182 self._address = None
183 self._free_instances.clear()
184
185 __del__ = close
186
187
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800188class _WindowsSelectorEventLoop(selector_events.BaseSelectorEventLoop):
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700189 """Windows version of selector event loop."""
190
191 def _socketpair(self):
192 return windows_utils.socketpair()
193
194
195class ProactorEventLoop(proactor_events.BaseProactorEventLoop):
196 """Windows version of proactor event loop using IOCP."""
197
198 def __init__(self, proactor=None):
199 if proactor is None:
200 proactor = IocpProactor()
201 super().__init__(proactor)
202
203 def _socketpair(self):
204 return windows_utils.socketpair()
205
Victor Stinnerf951d282014-06-29 00:46:45 +0200206 @coroutine
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700207 def create_pipe_connection(self, protocol_factory, address):
208 f = self._proactor.connect_pipe(address)
209 pipe = yield from f
210 protocol = protocol_factory()
211 trans = self._make_duplex_pipe_transport(pipe, protocol,
212 extra={'addr': address})
213 return trans, protocol
214
Victor Stinnerf951d282014-06-29 00:46:45 +0200215 @coroutine
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700216 def start_serving_pipe(self, protocol_factory, address):
217 server = PipeServer(address)
Guido van Rossuma8d630a2013-11-01 14:20:55 -0700218
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700219 def loop(f=None):
220 pipe = None
221 try:
222 if f:
223 pipe = f.result()
224 server._free_instances.discard(pipe)
225 protocol = protocol_factory()
226 self._make_duplex_pipe_transport(
227 pipe, protocol, extra={'addr': address})
228 pipe = server._get_unconnected_pipe()
229 if pipe is None:
230 return
231 f = self._proactor.accept_pipe(pipe)
Yury Selivanovff827f02014-02-18 18:02:19 -0500232 except OSError as exc:
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700233 if pipe and pipe.fileno() != -1:
Yury Selivanovff827f02014-02-18 18:02:19 -0500234 self.call_exception_handler({
235 'message': 'Pipe accept failed',
236 'exception': exc,
237 'pipe': pipe,
238 })
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700239 pipe.close()
240 except futures.CancelledError:
241 if pipe:
242 pipe.close()
243 else:
244 f.add_done_callback(loop)
Guido van Rossuma8d630a2013-11-01 14:20:55 -0700245
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700246 self.call_soon(loop)
247 return [server]
248
Victor Stinnerf951d282014-06-29 00:46:45 +0200249 @coroutine
Guido van Rossum59691282013-10-30 14:52:03 -0700250 def _make_subprocess_transport(self, protocol, args, shell,
251 stdin, stdout, stderr, bufsize,
252 extra=None, **kwargs):
253 transp = _WindowsSubprocessTransport(self, protocol, args, shell,
254 stdin, stdout, stderr, bufsize,
Victor Stinner73f10fd2014-01-29 14:32:20 -0800255 extra=extra, **kwargs)
Guido van Rossum59691282013-10-30 14:52:03 -0700256 yield from transp._post_init()
257 return transp
258
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700259
260class IocpProactor:
261 """Proactor implementation using IOCP."""
262
263 def __init__(self, concurrency=0xffffffff):
264 self._loop = None
265 self._results = []
266 self._iocp = _overlapped.CreateIoCompletionPort(
267 _overlapped.INVALID_HANDLE_VALUE, NULL, 0, concurrency)
268 self._cache = {}
269 self._registered = weakref.WeakSet()
270 self._stopped_serving = weakref.WeakSet()
271
Victor Stinnerfea6a102014-07-25 00:54:53 +0200272 def __repr__(self):
273 return ('<%s overlapped#=%s result#=%s>'
274 % (self.__class__.__name__, len(self._cache),
275 len(self._results)))
276
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700277 def set_loop(self, loop):
278 self._loop = loop
279
280 def select(self, timeout=None):
281 if not self._results:
282 self._poll(timeout)
283 tmp = self._results
284 self._results = []
285 return tmp
286
287 def recv(self, conn, nbytes, flags=0):
288 self._register_with_iocp(conn)
289 ov = _overlapped.Overlapped(NULL)
290 if isinstance(conn, socket.socket):
291 ov.WSARecv(conn.fileno(), nbytes, flags)
292 else:
293 ov.ReadFile(conn.fileno(), nbytes)
Guido van Rossuma8d630a2013-11-01 14:20:55 -0700294
Victor Stinnerc89c8a72014-02-26 17:35:30 +0100295 def finish_recv(trans, key, ov):
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700296 try:
297 return ov.getresult()
298 except OSError as exc:
299 if exc.winerror == _overlapped.ERROR_NETNAME_DELETED:
300 raise ConnectionResetError(*exc.args)
301 else:
302 raise
Guido van Rossuma8d630a2013-11-01 14:20:55 -0700303
Victor Stinnerc89c8a72014-02-26 17:35:30 +0100304 return self._register(ov, conn, finish_recv)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700305
306 def send(self, conn, buf, flags=0):
307 self._register_with_iocp(conn)
308 ov = _overlapped.Overlapped(NULL)
309 if isinstance(conn, socket.socket):
310 ov.WSASend(conn.fileno(), buf, flags)
311 else:
312 ov.WriteFile(conn.fileno(), buf)
Guido van Rossuma8d630a2013-11-01 14:20:55 -0700313
Victor Stinnerc89c8a72014-02-26 17:35:30 +0100314 def finish_send(trans, key, ov):
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700315 try:
316 return ov.getresult()
317 except OSError as exc:
318 if exc.winerror == _overlapped.ERROR_NETNAME_DELETED:
319 raise ConnectionResetError(*exc.args)
320 else:
321 raise
Guido van Rossuma8d630a2013-11-01 14:20:55 -0700322
Victor Stinnerc89c8a72014-02-26 17:35:30 +0100323 return self._register(ov, conn, finish_send)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700324
325 def accept(self, listener):
326 self._register_with_iocp(listener)
327 conn = self._get_accept_socket(listener.family)
328 ov = _overlapped.Overlapped(NULL)
329 ov.AcceptEx(listener.fileno(), conn.fileno())
Guido van Rossuma8d630a2013-11-01 14:20:55 -0700330
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700331 def finish_accept(trans, key, ov):
332 ov.getresult()
333 # Use SO_UPDATE_ACCEPT_CONTEXT so getsockname() etc work.
334 buf = struct.pack('@P', listener.fileno())
335 conn.setsockopt(socket.SOL_SOCKET,
336 _overlapped.SO_UPDATE_ACCEPT_CONTEXT, buf)
337 conn.settimeout(listener.gettimeout())
338 return conn, conn.getpeername()
Guido van Rossuma8d630a2013-11-01 14:20:55 -0700339
Victor Stinnerf951d282014-06-29 00:46:45 +0200340 @coroutine
Victor Stinner7de26462014-01-11 00:03:21 +0100341 def accept_coro(future, conn):
342 # Coroutine closing the accept socket if the future is cancelled
343 try:
344 yield from future
345 except futures.CancelledError:
346 conn.close()
347 raise
348
349 future = self._register(ov, listener, finish_accept)
350 coro = accept_coro(future, conn)
351 tasks.async(coro, loop=self._loop)
352 return future
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700353
354 def connect(self, conn, address):
355 self._register_with_iocp(conn)
356 # The socket needs to be locally bound before we call ConnectEx().
357 try:
358 _overlapped.BindLocal(conn.fileno(), conn.family)
359 except OSError as e:
360 if e.winerror != errno.WSAEINVAL:
361 raise
362 # Probably already locally bound; check using getsockname().
363 if conn.getsockname()[1] == 0:
364 raise
365 ov = _overlapped.Overlapped(NULL)
366 ov.ConnectEx(conn.fileno(), address)
Guido van Rossuma8d630a2013-11-01 14:20:55 -0700367
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700368 def finish_connect(trans, key, ov):
369 ov.getresult()
370 # Use SO_UPDATE_CONNECT_CONTEXT so getsockname() etc work.
371 conn.setsockopt(socket.SOL_SOCKET,
372 _overlapped.SO_UPDATE_CONNECT_CONTEXT, 0)
373 return conn
Guido van Rossuma8d630a2013-11-01 14:20:55 -0700374
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700375 return self._register(ov, conn, finish_connect)
376
377 def accept_pipe(self, pipe):
378 self._register_with_iocp(pipe)
379 ov = _overlapped.Overlapped(NULL)
380 ov.ConnectNamedPipe(pipe.fileno())
Guido van Rossuma8d630a2013-11-01 14:20:55 -0700381
Victor Stinnerc89c8a72014-02-26 17:35:30 +0100382 def finish_accept_pipe(trans, key, ov):
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700383 ov.getresult()
384 return pipe
Guido van Rossuma8d630a2013-11-01 14:20:55 -0700385
Victor Stinner42d3bde2014-07-28 00:18:43 +0200386 # FIXME: Tulip issue 196: why to we neeed register=False?
387 # See also the comment in the _register() method
388 return self._register(ov, pipe, finish_accept_pipe,
389 register=False)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700390
391 def connect_pipe(self, address):
392 ov = _overlapped.Overlapped(NULL)
393 ov.WaitNamedPipeAndConnect(address, self._iocp, ov.address)
Guido van Rossuma8d630a2013-11-01 14:20:55 -0700394
Victor Stinnerc89c8a72014-02-26 17:35:30 +0100395 def finish_connect_pipe(err, handle, ov):
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700396 # err, handle were arguments passed to PostQueuedCompletionStatus()
397 # in a function run in a thread pool.
398 if err == _overlapped.ERROR_SEM_TIMEOUT:
399 # Connection did not succeed within time limit.
400 msg = _overlapped.FormatMessage(err)
401 raise ConnectionRefusedError(0, msg, None, err)
402 elif err != 0:
403 msg = _overlapped.FormatMessage(err)
404 raise OSError(0, msg, None, err)
405 else:
406 return windows_utils.PipeHandle(handle)
Guido van Rossuma8d630a2013-11-01 14:20:55 -0700407
Victor Stinnerc89c8a72014-02-26 17:35:30 +0100408 return self._register(ov, None, finish_connect_pipe, wait_for_post=True)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700409
Guido van Rossum90fb9142013-10-30 14:44:05 -0700410 def wait_for_handle(self, handle, timeout=None):
411 if timeout is None:
412 ms = _winapi.INFINITE
413 else:
Victor Stinnerf2e17682014-01-31 16:25:24 +0100414 # RegisterWaitForSingleObject() has a resolution of 1 millisecond,
415 # round away from zero to wait *at least* timeout seconds.
416 ms = math.ceil(timeout * 1e3)
Guido van Rossum90fb9142013-10-30 14:44:05 -0700417
418 # We only create ov so we can use ov.address as a key for the cache.
419 ov = _overlapped.Overlapped(NULL)
420 wh = _overlapped.RegisterWaitWithQueue(
421 handle, self._iocp, ov.address, ms)
Victor Stinner313a9802014-07-29 12:58:23 +0200422 f = _WaitHandleFuture(self._iocp, ov, handle, wh, loop=self._loop)
423 if f._source_traceback:
424 del f._source_traceback[-1]
Guido van Rossum90fb9142013-10-30 14:44:05 -0700425
Victor Stinnerc89c8a72014-02-26 17:35:30 +0100426 def finish_wait_for_handle(trans, key, ov):
Richard Oudkerk71196e72013-11-24 17:50:40 +0000427 # Note that this second wait means that we should only use
428 # this with handles types where a successful wait has no
429 # effect. So events or processes are all right, but locks
430 # or semaphores are not. Also note if the handle is
431 # signalled and then quickly reset, then we may return
432 # False even though we have not timed out.
Victor Stinner313a9802014-07-29 12:58:23 +0200433 return f._poll()
Guido van Rossum90fb9142013-10-30 14:44:05 -0700434
Victor Stinner313a9802014-07-29 12:58:23 +0200435 if f._poll():
436 try:
437 result = f._poll()
438 except OSError as exc:
439 f.set_exception(exc)
440 else:
441 f.set_result(result)
442
443 self._cache[ov.address] = (f, ov, 0, finish_wait_for_handle)
Guido van Rossum90fb9142013-10-30 14:44:05 -0700444 return f
445
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700446 def _register_with_iocp(self, obj):
447 # To get notifications of finished ops on this objects sent to the
448 # completion port, were must register the handle.
449 if obj not in self._registered:
450 self._registered.add(obj)
451 _overlapped.CreateIoCompletionPort(obj.fileno(), self._iocp, 0, 0)
452 # XXX We could also use SetFileCompletionNotificationModes()
453 # to avoid sending notifications to completion port of ops
454 # that succeed immediately.
455
Victor Stinner42d3bde2014-07-28 00:18:43 +0200456 def _register(self, ov, obj, callback,
457 wait_for_post=False, register=True):
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700458 # Return a future which will be set with the result of the
459 # operation when it completes. The future's value is actually
460 # the value returned by callback().
461 f = _OverlappedFuture(ov, loop=self._loop)
Victor Stinner313a9802014-07-29 12:58:23 +0200462 if f._source_traceback:
463 del f._source_traceback[-1]
Victor Stinner42d3bde2014-07-28 00:18:43 +0200464 if not ov.pending and not wait_for_post:
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700465 # The operation has completed, so no need to postpone the
466 # work. We cannot take this short cut if we need the
467 # NumberOfBytes, CompletionKey values returned by
468 # PostQueuedCompletionStatus().
469 try:
470 value = callback(None, None, ov)
471 except OSError as e:
472 f.set_exception(e)
473 else:
474 f.set_result(value)
Victor Stinner42d3bde2014-07-28 00:18:43 +0200475 # Even if GetOverlappedResult() was called, we have to wait for the
476 # notification of the completion in GetQueuedCompletionStatus().
477 # Register the overlapped operation to keep a reference to the
478 # OVERLAPPED object, otherwise the memory is freed and Windows may
479 # read uninitialized memory.
480 #
481 # For an unknown reason, ConnectNamedPipe() behaves differently:
482 # the completion is not notified by GetOverlappedResult() if we
483 # already called GetOverlappedResult(). For this specific case, we
484 # don't expect notification (register is set to False).
485 else:
486 register = True
487 if register:
488 # Register the overlapped operation for later. Note that
489 # we only store obj to prevent it from being garbage
490 # collected too early.
491 self._cache[ov.address] = (f, ov, obj, callback)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700492 return f
493
494 def _get_accept_socket(self, family):
495 s = socket.socket(family)
496 s.settimeout(0)
497 return s
498
499 def _poll(self, timeout=None):
500 if timeout is None:
501 ms = INFINITE
502 elif timeout < 0:
503 raise ValueError("negative timeout")
504 else:
Victor Stinnerf2e17682014-01-31 16:25:24 +0100505 # GetQueuedCompletionStatus() has a resolution of 1 millisecond,
506 # round away from zero to wait *at least* timeout seconds.
507 ms = math.ceil(timeout * 1e3)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700508 if ms >= INFINITE:
509 raise ValueError("timeout too big")
Victor Stinner313a9802014-07-29 12:58:23 +0200510
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700511 while True:
512 status = _overlapped.GetQueuedCompletionStatus(self._iocp, ms)
513 if status is None:
514 return
Victor Stinner313a9802014-07-29 12:58:23 +0200515 ms = 0
516
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700517 err, transferred, key, address = status
518 try:
519 f, ov, obj, callback = self._cache.pop(address)
520 except KeyError:
Victor Stinner42d3bde2014-07-28 00:18:43 +0200521 if self._loop.get_debug():
522 self._loop.call_exception_handler({
523 'message': ('GetQueuedCompletionStatus() returned an '
524 'unexpected event'),
525 'status': ('err=%s transferred=%s key=%#x address=%#x'
526 % (err, transferred, key, address)),
527 })
528
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700529 # key is either zero, or it is used to return a pipe
530 # handle which should be closed to avoid a leak.
531 if key not in (0, _overlapped.INVALID_HANDLE_VALUE):
532 _winapi.CloseHandle(key)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700533 continue
Victor Stinner51e44ea2014-07-26 00:58:34 +0200534
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700535 if obj in self._stopped_serving:
536 f.cancel()
Victor Stinner42d3bde2014-07-28 00:18:43 +0200537 # Don't call the callback if _register() already read the result or
538 # if the overlapped has been cancelled
539 elif not f.done():
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700540 try:
541 value = callback(transferred, key, ov)
542 except OSError as e:
543 f.set_exception(e)
544 self._results.append(f)
545 else:
546 f.set_result(value)
547 self._results.append(f)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700548
549 def _stop_serving(self, obj):
550 # obj is a socket or pipe handle. It will be closed in
551 # BaseProactorEventLoop._stop_serving() which will make any
552 # pending operations fail quickly.
553 self._stopped_serving.add(obj)
554
555 def close(self):
556 # Cancel remaining registered operations.
Victor Stinnerfea6a102014-07-25 00:54:53 +0200557 for address, (fut, ov, obj, callback) in list(self._cache.items()):
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700558 if obj is None:
559 # The operation was started with connect_pipe() which
560 # queues a task to Windows' thread pool. This cannot
561 # be cancelled, so just forget it.
562 del self._cache[address]
Victor Stinner42d3bde2014-07-28 00:18:43 +0200563 # FIXME: Tulip issue 196: remove this case, it should not happen
564 elif fut.done() and not fut.cancelled():
565 del self._cache[address]
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700566 else:
567 try:
Victor Stinnerfea6a102014-07-25 00:54:53 +0200568 fut.cancel()
569 except OSError as exc:
570 if self._loop is not None:
571 context = {
572 'message': 'Cancelling a future failed',
573 'exception': exc,
574 'future': fut,
575 }
576 if fut._source_traceback:
577 context['source_traceback'] = fut._source_traceback
578 self._loop.call_exception_handler(context)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700579
580 while self._cache:
581 if not self._poll(1):
Guido van Rossumfc29e0f2013-10-17 15:39:45 -0700582 logger.debug('taking long time to close proactor')
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700583
584 self._results = []
585 if self._iocp is not None:
586 _winapi.CloseHandle(self._iocp)
587 self._iocp = None
Guido van Rossum59691282013-10-30 14:52:03 -0700588
Victor Stinnerfea6a102014-07-25 00:54:53 +0200589 def __del__(self):
590 self.close()
591
Guido van Rossum59691282013-10-30 14:52:03 -0700592
593class _WindowsSubprocessTransport(base_subprocess.BaseSubprocessTransport):
594
595 def _start(self, args, shell, stdin, stdout, stderr, bufsize, **kwargs):
596 self._proc = windows_utils.Popen(
597 args, shell=shell, stdin=stdin, stdout=stdout, stderr=stderr,
598 bufsize=bufsize, **kwargs)
Guido van Rossuma8d630a2013-11-01 14:20:55 -0700599
Guido van Rossum59691282013-10-30 14:52:03 -0700600 def callback(f):
601 returncode = self._proc.poll()
602 self._process_exited(returncode)
Guido van Rossuma8d630a2013-11-01 14:20:55 -0700603
Guido van Rossum59691282013-10-30 14:52:03 -0700604 f = self._loop._proactor.wait_for_handle(int(self._proc._handle))
605 f.add_done_callback(callback)
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800606
607
608SelectorEventLoop = _WindowsSelectorEventLoop
609
610
611class _WindowsDefaultEventLoopPolicy(events.BaseDefaultEventLoopPolicy):
612 _loop_factory = SelectorEventLoop
613
614
615DefaultEventLoopPolicy = _WindowsDefaultEventLoopPolicy