blob: 9d496f2f47ab67402618eb6f836aee542b3a667f [file] [log] [blame]
Yury Selivanovb0b0e622014-02-18 22:27:48 -05001"""Selector and proactor event loops for Windows."""
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07002
Victor Stinnerf2e17682014-01-31 16:25:24 +01003import _winapi
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07004import errno
Victor Stinnerf2e17682014-01-31 16:25:24 +01005import math
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07006import socket
Victor Stinnerf2e17682014-01-31 16:25:24 +01007import struct
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07008import weakref
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07009
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -080010from . import events
Guido van Rossum59691282013-10-30 14:52:03 -070011from . import base_subprocess
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070012from . import futures
13from . import proactor_events
14from . import selector_events
15from . import tasks
16from . import windows_utils
Guido van Rossum59691282013-10-30 14:52:03 -070017from . import _overlapped
Victor Stinnerf951d282014-06-29 00:46:45 +020018from .coroutines import coroutine
19from .log import logger
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070020
21
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -080022__all__ = ['SelectorEventLoop', 'ProactorEventLoop', 'IocpProactor',
23 'DefaultEventLoopPolicy',
24 ]
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070025
26
27NULL = 0
28INFINITE = 0xffffffff
29ERROR_CONNECTION_REFUSED = 1225
30ERROR_CONNECTION_ABORTED = 1236
31
32
33class _OverlappedFuture(futures.Future):
34 """Subclass of Future which represents an overlapped operation.
35
36 Cancelling it will immediately cancel the overlapped operation.
37 """
38
39 def __init__(self, ov, *, loop=None):
40 super().__init__(loop=loop)
Victor Stinnerfea6a102014-07-25 00:54:53 +020041 if self._source_traceback:
42 del self._source_traceback[-1]
Victor Stinner18a28dc2014-07-25 13:05:20 +020043 self._ov = ov
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070044
Victor Stinner313a9802014-07-29 12:58:23 +020045 def _repr_info(self):
46 info = super()._repr_info()
Victor Stinner18a28dc2014-07-25 13:05:20 +020047 if self._ov is not None:
48 state = 'pending' if self._ov.pending else 'completed'
Victor Stinner313a9802014-07-29 12:58:23 +020049 info.insert(1, 'overlapped=<%s, %#x>' % (state, self._ov.address))
50 return info
Victor Stinnere912e652014-07-12 03:11:53 +020051
Victor Stinner18a28dc2014-07-25 13:05:20 +020052 def _cancel_overlapped(self):
53 if self._ov is None:
54 return
55 try:
56 self._ov.cancel()
57 except OSError as exc:
58 context = {
59 'message': 'Cancelling an overlapped future failed',
60 'exception': exc,
61 'future': self,
62 }
63 if self._source_traceback:
64 context['source_traceback'] = self._source_traceback
65 self._loop.call_exception_handler(context)
66 self._ov = None
67
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070068 def cancel(self):
Victor Stinner18a28dc2014-07-25 13:05:20 +020069 self._cancel_overlapped()
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070070 return super().cancel()
71
Victor Stinner18a28dc2014-07-25 13:05:20 +020072 def set_exception(self, exception):
73 super().set_exception(exception)
74 self._cancel_overlapped()
75
Victor Stinner51e44ea2014-07-26 00:58:34 +020076 def set_result(self, result):
77 super().set_result(result)
78 self._ov = None
79
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070080
Guido van Rossum90fb9142013-10-30 14:44:05 -070081class _WaitHandleFuture(futures.Future):
82 """Subclass of Future which represents a wait handle."""
83
Victor Stinner313a9802014-07-29 12:58:23 +020084 def __init__(self, iocp, ov, handle, wait_handle, *, loop=None):
Guido van Rossum90fb9142013-10-30 14:44:05 -070085 super().__init__(loop=loop)
Victor Stinner313a9802014-07-29 12:58:23 +020086 if self._source_traceback:
87 del self._source_traceback[-1]
88 # iocp and ov are only used by cancel() to notify IocpProactor
89 # that the wait was cancelled
90 self._iocp = iocp
91 self._ov = ov
Victor Stinner18a28dc2014-07-25 13:05:20 +020092 self._handle = handle
Guido van Rossum90fb9142013-10-30 14:44:05 -070093 self._wait_handle = wait_handle
94
Victor Stinner18a28dc2014-07-25 13:05:20 +020095 def _poll(self):
96 # non-blocking wait: use a timeout of 0 millisecond
97 return (_winapi.WaitForSingleObject(self._handle, 0) ==
98 _winapi.WAIT_OBJECT_0)
99
Victor Stinner313a9802014-07-29 12:58:23 +0200100 def _repr_info(self):
101 info = super()._repr_info()
102 info.insert(1, 'handle=%#x' % self._handle)
Victor Stinner18a28dc2014-07-25 13:05:20 +0200103 if self._wait_handle:
Victor Stinner313a9802014-07-29 12:58:23 +0200104 state = 'signaled' if self._poll() else 'waiting'
105 info.insert(1, 'wait_handle=<%s, %#x>'
106 % (state, self._wait_handle))
107 return info
Victor Stinner18a28dc2014-07-25 13:05:20 +0200108
Victor Stinner313a9802014-07-29 12:58:23 +0200109 def _unregister_wait(self):
Victor Stinnerfea6a102014-07-25 00:54:53 +0200110 if self._wait_handle is None:
111 return
Guido van Rossum90fb9142013-10-30 14:44:05 -0700112 try:
113 _overlapped.UnregisterWait(self._wait_handle)
Victor Stinnerb2614752014-08-25 23:20:52 +0200114 except OSError as exc:
Victor Stinnerfea6a102014-07-25 00:54:53 +0200115 # ERROR_IO_PENDING is not an error, the wait was unregistered
Victor Stinnerb2614752014-08-25 23:20:52 +0200116 if exc.winerror != _overlapped.ERROR_IO_PENDING:
117 context = {
118 'message': 'Failed to unregister the wait handle',
119 'exception': exc,
120 'future': self,
121 }
122 if self._source_traceback:
123 context['source_traceback'] = self._source_traceback
124 self._loop.call_exception_handler(context)
Victor Stinnerfea6a102014-07-25 00:54:53 +0200125 self._wait_handle = None
Victor Stinner313a9802014-07-29 12:58:23 +0200126 self._iocp = None
127 self._ov = None
Victor Stinnerfea6a102014-07-25 00:54:53 +0200128
129 def cancel(self):
Victor Stinner313a9802014-07-29 12:58:23 +0200130 result = super().cancel()
131 if self._ov is not None:
132 # signal the cancellation to the overlapped object
133 _overlapped.PostQueuedCompletionStatus(self._iocp, True,
134 0, self._ov.address)
135 self._unregister_wait()
136 return result
137
138 def set_exception(self, exception):
139 super().set_exception(exception)
140 self._unregister_wait()
141
142 def set_result(self, result):
143 super().set_result(result)
144 self._unregister_wait()
Guido van Rossum90fb9142013-10-30 14:44:05 -0700145
146
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700147class PipeServer(object):
148 """Class representing a pipe server.
149
150 This is much like a bound, listening socket.
151 """
152 def __init__(self, address):
153 self._address = address
154 self._free_instances = weakref.WeakSet()
Victor Stinnerb2614752014-08-25 23:20:52 +0200155 # initialize the pipe attribute before calling _server_pipe_handle()
156 # because this function can raise an exception and the destructor calls
157 # the close() method
158 self._pipe = None
159 self._accept_pipe_future = None
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700160 self._pipe = self._server_pipe_handle(True)
161
162 def _get_unconnected_pipe(self):
163 # Create new instance and return previous one. This ensures
164 # that (until the server is closed) there is always at least
165 # one pipe handle for address. Therefore if a client attempt
166 # to connect it will not fail with FileNotFoundError.
167 tmp, self._pipe = self._pipe, self._server_pipe_handle(False)
168 return tmp
169
170 def _server_pipe_handle(self, first):
171 # Return a wrapper for a new pipe handle.
172 if self._address is None:
173 return None
174 flags = _winapi.PIPE_ACCESS_DUPLEX | _winapi.FILE_FLAG_OVERLAPPED
175 if first:
176 flags |= _winapi.FILE_FLAG_FIRST_PIPE_INSTANCE
177 h = _winapi.CreateNamedPipe(
178 self._address, flags,
179 _winapi.PIPE_TYPE_MESSAGE | _winapi.PIPE_READMODE_MESSAGE |
180 _winapi.PIPE_WAIT,
181 _winapi.PIPE_UNLIMITED_INSTANCES,
182 windows_utils.BUFSIZE, windows_utils.BUFSIZE,
183 _winapi.NMPWAIT_WAIT_FOREVER, _winapi.NULL)
184 pipe = windows_utils.PipeHandle(h)
185 self._free_instances.add(pipe)
186 return pipe
187
188 def close(self):
Victor Stinnerb2614752014-08-25 23:20:52 +0200189 if self._accept_pipe_future is not None:
190 self._accept_pipe_future.cancel()
191 self._accept_pipe_future = None
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700192 # Close all instances which have not been connected to by a client.
193 if self._address is not None:
194 for pipe in self._free_instances:
195 pipe.close()
196 self._pipe = None
197 self._address = None
198 self._free_instances.clear()
199
200 __del__ = close
201
202
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800203class _WindowsSelectorEventLoop(selector_events.BaseSelectorEventLoop):
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700204 """Windows version of selector event loop."""
205
206 def _socketpair(self):
207 return windows_utils.socketpair()
208
209
210class ProactorEventLoop(proactor_events.BaseProactorEventLoop):
211 """Windows version of proactor event loop using IOCP."""
212
213 def __init__(self, proactor=None):
214 if proactor is None:
215 proactor = IocpProactor()
216 super().__init__(proactor)
217
218 def _socketpair(self):
219 return windows_utils.socketpair()
220
Victor Stinnerf951d282014-06-29 00:46:45 +0200221 @coroutine
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700222 def create_pipe_connection(self, protocol_factory, address):
223 f = self._proactor.connect_pipe(address)
224 pipe = yield from f
225 protocol = protocol_factory()
226 trans = self._make_duplex_pipe_transport(pipe, protocol,
227 extra={'addr': address})
228 return trans, protocol
229
Victor Stinnerf951d282014-06-29 00:46:45 +0200230 @coroutine
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700231 def start_serving_pipe(self, protocol_factory, address):
232 server = PipeServer(address)
Guido van Rossuma8d630a2013-11-01 14:20:55 -0700233
Victor Stinnerb2614752014-08-25 23:20:52 +0200234 def loop_accept_pipe(f=None):
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700235 pipe = None
236 try:
237 if f:
238 pipe = f.result()
239 server._free_instances.discard(pipe)
240 protocol = protocol_factory()
241 self._make_duplex_pipe_transport(
242 pipe, protocol, extra={'addr': address})
243 pipe = server._get_unconnected_pipe()
244 if pipe is None:
245 return
246 f = self._proactor.accept_pipe(pipe)
Yury Selivanovff827f02014-02-18 18:02:19 -0500247 except OSError as exc:
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700248 if pipe and pipe.fileno() != -1:
Yury Selivanovff827f02014-02-18 18:02:19 -0500249 self.call_exception_handler({
250 'message': 'Pipe accept failed',
251 'exception': exc,
252 'pipe': pipe,
253 })
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700254 pipe.close()
Victor Stinnerb2614752014-08-25 23:20:52 +0200255 elif self._debug:
256 logger.warning("Accept pipe failed on pipe %r",
257 pipe, exc_info=True)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700258 except futures.CancelledError:
259 if pipe:
260 pipe.close()
261 else:
Victor Stinnerb2614752014-08-25 23:20:52 +0200262 server._accept_pipe_future = f
263 f.add_done_callback(loop_accept_pipe)
Guido van Rossuma8d630a2013-11-01 14:20:55 -0700264
Victor Stinnerb2614752014-08-25 23:20:52 +0200265 self.call_soon(loop_accept_pipe)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700266 return [server]
267
Victor Stinnerf951d282014-06-29 00:46:45 +0200268 @coroutine
Guido van Rossum59691282013-10-30 14:52:03 -0700269 def _make_subprocess_transport(self, protocol, args, shell,
270 stdin, stdout, stderr, bufsize,
271 extra=None, **kwargs):
272 transp = _WindowsSubprocessTransport(self, protocol, args, shell,
273 stdin, stdout, stderr, bufsize,
Victor Stinner73f10fd2014-01-29 14:32:20 -0800274 extra=extra, **kwargs)
Guido van Rossum59691282013-10-30 14:52:03 -0700275 yield from transp._post_init()
276 return transp
277
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700278
279class IocpProactor:
280 """Proactor implementation using IOCP."""
281
282 def __init__(self, concurrency=0xffffffff):
283 self._loop = None
284 self._results = []
285 self._iocp = _overlapped.CreateIoCompletionPort(
286 _overlapped.INVALID_HANDLE_VALUE, NULL, 0, concurrency)
287 self._cache = {}
288 self._registered = weakref.WeakSet()
289 self._stopped_serving = weakref.WeakSet()
290
Victor Stinnerfea6a102014-07-25 00:54:53 +0200291 def __repr__(self):
292 return ('<%s overlapped#=%s result#=%s>'
293 % (self.__class__.__name__, len(self._cache),
294 len(self._results)))
295
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700296 def set_loop(self, loop):
297 self._loop = loop
298
299 def select(self, timeout=None):
300 if not self._results:
301 self._poll(timeout)
302 tmp = self._results
303 self._results = []
304 return tmp
305
306 def recv(self, conn, nbytes, flags=0):
307 self._register_with_iocp(conn)
308 ov = _overlapped.Overlapped(NULL)
309 if isinstance(conn, socket.socket):
310 ov.WSARecv(conn.fileno(), nbytes, flags)
311 else:
312 ov.ReadFile(conn.fileno(), nbytes)
Guido van Rossuma8d630a2013-11-01 14:20:55 -0700313
Victor Stinnerc89c8a72014-02-26 17:35:30 +0100314 def finish_recv(trans, key, ov):
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700315 try:
316 return ov.getresult()
317 except OSError as exc:
318 if exc.winerror == _overlapped.ERROR_NETNAME_DELETED:
319 raise ConnectionResetError(*exc.args)
320 else:
321 raise
Guido van Rossuma8d630a2013-11-01 14:20:55 -0700322
Victor Stinnerc89c8a72014-02-26 17:35:30 +0100323 return self._register(ov, conn, finish_recv)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700324
325 def send(self, conn, buf, flags=0):
326 self._register_with_iocp(conn)
327 ov = _overlapped.Overlapped(NULL)
328 if isinstance(conn, socket.socket):
329 ov.WSASend(conn.fileno(), buf, flags)
330 else:
331 ov.WriteFile(conn.fileno(), buf)
Guido van Rossuma8d630a2013-11-01 14:20:55 -0700332
Victor Stinnerc89c8a72014-02-26 17:35:30 +0100333 def finish_send(trans, key, ov):
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700334 try:
335 return ov.getresult()
336 except OSError as exc:
337 if exc.winerror == _overlapped.ERROR_NETNAME_DELETED:
338 raise ConnectionResetError(*exc.args)
339 else:
340 raise
Guido van Rossuma8d630a2013-11-01 14:20:55 -0700341
Victor Stinnerc89c8a72014-02-26 17:35:30 +0100342 return self._register(ov, conn, finish_send)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700343
344 def accept(self, listener):
345 self._register_with_iocp(listener)
346 conn = self._get_accept_socket(listener.family)
347 ov = _overlapped.Overlapped(NULL)
348 ov.AcceptEx(listener.fileno(), conn.fileno())
Guido van Rossuma8d630a2013-11-01 14:20:55 -0700349
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700350 def finish_accept(trans, key, ov):
351 ov.getresult()
352 # Use SO_UPDATE_ACCEPT_CONTEXT so getsockname() etc work.
353 buf = struct.pack('@P', listener.fileno())
354 conn.setsockopt(socket.SOL_SOCKET,
355 _overlapped.SO_UPDATE_ACCEPT_CONTEXT, buf)
356 conn.settimeout(listener.gettimeout())
357 return conn, conn.getpeername()
Guido van Rossuma8d630a2013-11-01 14:20:55 -0700358
Victor Stinnerf951d282014-06-29 00:46:45 +0200359 @coroutine
Victor Stinner7de26462014-01-11 00:03:21 +0100360 def accept_coro(future, conn):
361 # Coroutine closing the accept socket if the future is cancelled
362 try:
363 yield from future
364 except futures.CancelledError:
365 conn.close()
366 raise
367
368 future = self._register(ov, listener, finish_accept)
369 coro = accept_coro(future, conn)
370 tasks.async(coro, loop=self._loop)
371 return future
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700372
373 def connect(self, conn, address):
374 self._register_with_iocp(conn)
375 # The socket needs to be locally bound before we call ConnectEx().
376 try:
377 _overlapped.BindLocal(conn.fileno(), conn.family)
378 except OSError as e:
379 if e.winerror != errno.WSAEINVAL:
380 raise
381 # Probably already locally bound; check using getsockname().
382 if conn.getsockname()[1] == 0:
383 raise
384 ov = _overlapped.Overlapped(NULL)
385 ov.ConnectEx(conn.fileno(), address)
Guido van Rossuma8d630a2013-11-01 14:20:55 -0700386
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700387 def finish_connect(trans, key, ov):
388 ov.getresult()
389 # Use SO_UPDATE_CONNECT_CONTEXT so getsockname() etc work.
390 conn.setsockopt(socket.SOL_SOCKET,
391 _overlapped.SO_UPDATE_CONNECT_CONTEXT, 0)
392 return conn
Guido van Rossuma8d630a2013-11-01 14:20:55 -0700393
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700394 return self._register(ov, conn, finish_connect)
395
396 def accept_pipe(self, pipe):
397 self._register_with_iocp(pipe)
398 ov = _overlapped.Overlapped(NULL)
399 ov.ConnectNamedPipe(pipe.fileno())
Guido van Rossuma8d630a2013-11-01 14:20:55 -0700400
Victor Stinnerc89c8a72014-02-26 17:35:30 +0100401 def finish_accept_pipe(trans, key, ov):
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700402 ov.getresult()
403 return pipe
Guido van Rossuma8d630a2013-11-01 14:20:55 -0700404
Victor Stinner1b9763d2014-12-18 23:47:27 +0100405 # FIXME: Tulip issue 196: why do we need register=False?
Victor Stinner42d3bde2014-07-28 00:18:43 +0200406 # See also the comment in the _register() method
407 return self._register(ov, pipe, finish_accept_pipe,
408 register=False)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700409
410 def connect_pipe(self, address):
411 ov = _overlapped.Overlapped(NULL)
412 ov.WaitNamedPipeAndConnect(address, self._iocp, ov.address)
Guido van Rossuma8d630a2013-11-01 14:20:55 -0700413
Victor Stinnerc89c8a72014-02-26 17:35:30 +0100414 def finish_connect_pipe(err, handle, ov):
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700415 # err, handle were arguments passed to PostQueuedCompletionStatus()
416 # in a function run in a thread pool.
417 if err == _overlapped.ERROR_SEM_TIMEOUT:
418 # Connection did not succeed within time limit.
419 msg = _overlapped.FormatMessage(err)
420 raise ConnectionRefusedError(0, msg, None, err)
421 elif err != 0:
422 msg = _overlapped.FormatMessage(err)
423 raise OSError(0, msg, None, err)
424 else:
425 return windows_utils.PipeHandle(handle)
Guido van Rossuma8d630a2013-11-01 14:20:55 -0700426
Victor Stinner15cc6782015-01-09 00:09:10 +0100427 return self._register(ov, None, finish_connect_pipe,
428 wait_for_post=True)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700429
Guido van Rossum90fb9142013-10-30 14:44:05 -0700430 def wait_for_handle(self, handle, timeout=None):
Victor Stinner4d825b42014-12-19 17:10:44 +0100431 """Wait for a handle.
432
433 Return a Future object. The result of the future is True if the wait
434 completed, or False if the wait did not complete (on timeout).
435 """
Guido van Rossum90fb9142013-10-30 14:44:05 -0700436 if timeout is None:
437 ms = _winapi.INFINITE
438 else:
Victor Stinnerf2e17682014-01-31 16:25:24 +0100439 # RegisterWaitForSingleObject() has a resolution of 1 millisecond,
440 # round away from zero to wait *at least* timeout seconds.
441 ms = math.ceil(timeout * 1e3)
Guido van Rossum90fb9142013-10-30 14:44:05 -0700442
443 # We only create ov so we can use ov.address as a key for the cache.
444 ov = _overlapped.Overlapped(NULL)
445 wh = _overlapped.RegisterWaitWithQueue(
446 handle, self._iocp, ov.address, ms)
Victor Stinner313a9802014-07-29 12:58:23 +0200447 f = _WaitHandleFuture(self._iocp, ov, handle, wh, loop=self._loop)
448 if f._source_traceback:
449 del f._source_traceback[-1]
Guido van Rossum90fb9142013-10-30 14:44:05 -0700450
Victor Stinnerc89c8a72014-02-26 17:35:30 +0100451 def finish_wait_for_handle(trans, key, ov):
Richard Oudkerk71196e72013-11-24 17:50:40 +0000452 # Note that this second wait means that we should only use
453 # this with handles types where a successful wait has no
454 # effect. So events or processes are all right, but locks
455 # or semaphores are not. Also note if the handle is
456 # signalled and then quickly reset, then we may return
457 # False even though we have not timed out.
Victor Stinner313a9802014-07-29 12:58:23 +0200458 return f._poll()
Guido van Rossum90fb9142013-10-30 14:44:05 -0700459
Victor Stinner313a9802014-07-29 12:58:23 +0200460 if f._poll():
461 try:
462 result = f._poll()
463 except OSError as exc:
464 f.set_exception(exc)
465 else:
466 f.set_result(result)
467
468 self._cache[ov.address] = (f, ov, 0, finish_wait_for_handle)
Guido van Rossum90fb9142013-10-30 14:44:05 -0700469 return f
470
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700471 def _register_with_iocp(self, obj):
472 # To get notifications of finished ops on this objects sent to the
473 # completion port, were must register the handle.
474 if obj not in self._registered:
475 self._registered.add(obj)
476 _overlapped.CreateIoCompletionPort(obj.fileno(), self._iocp, 0, 0)
477 # XXX We could also use SetFileCompletionNotificationModes()
478 # to avoid sending notifications to completion port of ops
479 # that succeed immediately.
480
Victor Stinner42d3bde2014-07-28 00:18:43 +0200481 def _register(self, ov, obj, callback,
482 wait_for_post=False, register=True):
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700483 # Return a future which will be set with the result of the
484 # operation when it completes. The future's value is actually
485 # the value returned by callback().
486 f = _OverlappedFuture(ov, loop=self._loop)
Victor Stinner313a9802014-07-29 12:58:23 +0200487 if f._source_traceback:
488 del f._source_traceback[-1]
Victor Stinner42d3bde2014-07-28 00:18:43 +0200489 if not ov.pending and not wait_for_post:
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700490 # The operation has completed, so no need to postpone the
491 # work. We cannot take this short cut if we need the
492 # NumberOfBytes, CompletionKey values returned by
493 # PostQueuedCompletionStatus().
494 try:
495 value = callback(None, None, ov)
496 except OSError as e:
497 f.set_exception(e)
498 else:
499 f.set_result(value)
Victor Stinner42d3bde2014-07-28 00:18:43 +0200500 # Even if GetOverlappedResult() was called, we have to wait for the
501 # notification of the completion in GetQueuedCompletionStatus().
502 # Register the overlapped operation to keep a reference to the
503 # OVERLAPPED object, otherwise the memory is freed and Windows may
504 # read uninitialized memory.
505 #
506 # For an unknown reason, ConnectNamedPipe() behaves differently:
507 # the completion is not notified by GetOverlappedResult() if we
508 # already called GetOverlappedResult(). For this specific case, we
509 # don't expect notification (register is set to False).
510 else:
511 register = True
512 if register:
513 # Register the overlapped operation for later. Note that
514 # we only store obj to prevent it from being garbage
515 # collected too early.
516 self._cache[ov.address] = (f, ov, obj, callback)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700517 return f
518
519 def _get_accept_socket(self, family):
520 s = socket.socket(family)
521 s.settimeout(0)
522 return s
523
524 def _poll(self, timeout=None):
525 if timeout is None:
526 ms = INFINITE
527 elif timeout < 0:
528 raise ValueError("negative timeout")
529 else:
Victor Stinnerf2e17682014-01-31 16:25:24 +0100530 # GetQueuedCompletionStatus() has a resolution of 1 millisecond,
531 # round away from zero to wait *at least* timeout seconds.
532 ms = math.ceil(timeout * 1e3)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700533 if ms >= INFINITE:
534 raise ValueError("timeout too big")
Victor Stinner313a9802014-07-29 12:58:23 +0200535
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700536 while True:
537 status = _overlapped.GetQueuedCompletionStatus(self._iocp, ms)
538 if status is None:
539 return
Victor Stinner313a9802014-07-29 12:58:23 +0200540 ms = 0
541
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700542 err, transferred, key, address = status
543 try:
544 f, ov, obj, callback = self._cache.pop(address)
545 except KeyError:
Victor Stinner42d3bde2014-07-28 00:18:43 +0200546 if self._loop.get_debug():
547 self._loop.call_exception_handler({
548 'message': ('GetQueuedCompletionStatus() returned an '
549 'unexpected event'),
550 'status': ('err=%s transferred=%s key=%#x address=%#x'
551 % (err, transferred, key, address)),
552 })
553
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700554 # key is either zero, or it is used to return a pipe
555 # handle which should be closed to avoid a leak.
556 if key not in (0, _overlapped.INVALID_HANDLE_VALUE):
557 _winapi.CloseHandle(key)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700558 continue
Victor Stinner51e44ea2014-07-26 00:58:34 +0200559
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700560 if obj in self._stopped_serving:
561 f.cancel()
Victor Stinner42d3bde2014-07-28 00:18:43 +0200562 # Don't call the callback if _register() already read the result or
563 # if the overlapped has been cancelled
564 elif not f.done():
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700565 try:
566 value = callback(transferred, key, ov)
567 except OSError as e:
568 f.set_exception(e)
569 self._results.append(f)
570 else:
571 f.set_result(value)
572 self._results.append(f)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700573
574 def _stop_serving(self, obj):
575 # obj is a socket or pipe handle. It will be closed in
576 # BaseProactorEventLoop._stop_serving() which will make any
577 # pending operations fail quickly.
578 self._stopped_serving.add(obj)
579
580 def close(self):
581 # Cancel remaining registered operations.
Victor Stinnerfea6a102014-07-25 00:54:53 +0200582 for address, (fut, ov, obj, callback) in list(self._cache.items()):
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700583 if obj is None:
584 # The operation was started with connect_pipe() which
585 # queues a task to Windows' thread pool. This cannot
586 # be cancelled, so just forget it.
587 del self._cache[address]
Victor Stinner42d3bde2014-07-28 00:18:43 +0200588 # FIXME: Tulip issue 196: remove this case, it should not happen
589 elif fut.done() and not fut.cancelled():
590 del self._cache[address]
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700591 else:
592 try:
Victor Stinnerfea6a102014-07-25 00:54:53 +0200593 fut.cancel()
594 except OSError as exc:
595 if self._loop is not None:
596 context = {
597 'message': 'Cancelling a future failed',
598 'exception': exc,
599 'future': fut,
600 }
601 if fut._source_traceback:
602 context['source_traceback'] = fut._source_traceback
603 self._loop.call_exception_handler(context)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700604
605 while self._cache:
606 if not self._poll(1):
Guido van Rossumfc29e0f2013-10-17 15:39:45 -0700607 logger.debug('taking long time to close proactor')
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700608
609 self._results = []
610 if self._iocp is not None:
611 _winapi.CloseHandle(self._iocp)
612 self._iocp = None
Guido van Rossum59691282013-10-30 14:52:03 -0700613
Victor Stinnerfea6a102014-07-25 00:54:53 +0200614 def __del__(self):
615 self.close()
616
Guido van Rossum59691282013-10-30 14:52:03 -0700617
618class _WindowsSubprocessTransport(base_subprocess.BaseSubprocessTransport):
619
620 def _start(self, args, shell, stdin, stdout, stderr, bufsize, **kwargs):
621 self._proc = windows_utils.Popen(
622 args, shell=shell, stdin=stdin, stdout=stdout, stderr=stderr,
623 bufsize=bufsize, **kwargs)
Guido van Rossuma8d630a2013-11-01 14:20:55 -0700624
Guido van Rossum59691282013-10-30 14:52:03 -0700625 def callback(f):
626 returncode = self._proc.poll()
627 self._process_exited(returncode)
Guido van Rossuma8d630a2013-11-01 14:20:55 -0700628
Guido van Rossum59691282013-10-30 14:52:03 -0700629 f = self._loop._proactor.wait_for_handle(int(self._proc._handle))
630 f.add_done_callback(callback)
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800631
632
633SelectorEventLoop = _WindowsSelectorEventLoop
634
635
636class _WindowsDefaultEventLoopPolicy(events.BaseDefaultEventLoopPolicy):
637 _loop_factory = SelectorEventLoop
638
639
640DefaultEventLoopPolicy = _WindowsDefaultEventLoopPolicy