blob: 6045ba029e577af81406bb2f28a1d569316e27d6 [file] [log] [blame]
Yury Selivanovb0b0e622014-02-18 22:27:48 -05001"""Selector and proactor event loops for Windows."""
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07002
Victor Stinnerf2e17682014-01-31 16:25:24 +01003import _winapi
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07004import errno
Victor Stinnerf2e17682014-01-31 16:25:24 +01005import math
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07006import socket
Victor Stinnerf2e17682014-01-31 16:25:24 +01007import struct
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07008import weakref
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07009
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -080010from . import events
Guido van Rossum59691282013-10-30 14:52:03 -070011from . import base_subprocess
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070012from . import futures
13from . import proactor_events
14from . import selector_events
15from . import tasks
16from . import windows_utils
Guido van Rossum59691282013-10-30 14:52:03 -070017from . import _overlapped
Victor Stinnerf951d282014-06-29 00:46:45 +020018from .coroutines import coroutine
19from .log import logger
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070020
21
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -080022__all__ = ['SelectorEventLoop', 'ProactorEventLoop', 'IocpProactor',
23 'DefaultEventLoopPolicy',
24 ]
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070025
26
27NULL = 0
28INFINITE = 0xffffffff
29ERROR_CONNECTION_REFUSED = 1225
30ERROR_CONNECTION_ABORTED = 1236
31
Victor Stinner7ffa2c52015-01-22 22:55:08 +010032# Initial delay in seconds for connect_pipe() before retrying to connect
33CONNECT_PIPE_INIT_DELAY = 0.001
34
35# Maximum delay in seconds for connect_pipe() before retrying to connect
36CONNECT_PIPE_MAX_DELAY = 0.100
37
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070038
39class _OverlappedFuture(futures.Future):
40 """Subclass of Future which represents an overlapped operation.
41
42 Cancelling it will immediately cancel the overlapped operation.
43 """
44
45 def __init__(self, ov, *, loop=None):
46 super().__init__(loop=loop)
Victor Stinnerfea6a102014-07-25 00:54:53 +020047 if self._source_traceback:
48 del self._source_traceback[-1]
Victor Stinner18a28dc2014-07-25 13:05:20 +020049 self._ov = ov
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070050
Victor Stinner313a9802014-07-29 12:58:23 +020051 def _repr_info(self):
52 info = super()._repr_info()
Victor Stinner18a28dc2014-07-25 13:05:20 +020053 if self._ov is not None:
54 state = 'pending' if self._ov.pending else 'completed'
Victor Stinner313a9802014-07-29 12:58:23 +020055 info.insert(1, 'overlapped=<%s, %#x>' % (state, self._ov.address))
56 return info
Victor Stinnere912e652014-07-12 03:11:53 +020057
Victor Stinner18a28dc2014-07-25 13:05:20 +020058 def _cancel_overlapped(self):
59 if self._ov is None:
60 return
61 try:
62 self._ov.cancel()
63 except OSError as exc:
64 context = {
65 'message': 'Cancelling an overlapped future failed',
66 'exception': exc,
67 'future': self,
68 }
69 if self._source_traceback:
70 context['source_traceback'] = self._source_traceback
71 self._loop.call_exception_handler(context)
72 self._ov = None
73
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070074 def cancel(self):
Victor Stinner18a28dc2014-07-25 13:05:20 +020075 self._cancel_overlapped()
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070076 return super().cancel()
77
Victor Stinner18a28dc2014-07-25 13:05:20 +020078 def set_exception(self, exception):
79 super().set_exception(exception)
80 self._cancel_overlapped()
81
Victor Stinner51e44ea2014-07-26 00:58:34 +020082 def set_result(self, result):
83 super().set_result(result)
84 self._ov = None
85
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070086
Victor Stinnerd0a28de2015-01-21 23:39:51 +010087class _BaseWaitHandleFuture(futures.Future):
Guido van Rossum90fb9142013-10-30 14:44:05 -070088 """Subclass of Future which represents a wait handle."""
89
Victor Stinnerd0a28de2015-01-21 23:39:51 +010090 def __init__(self, ov, handle, wait_handle, *, loop=None):
Guido van Rossum90fb9142013-10-30 14:44:05 -070091 super().__init__(loop=loop)
Victor Stinner313a9802014-07-29 12:58:23 +020092 if self._source_traceback:
93 del self._source_traceback[-1]
Victor Stinnerd0a28de2015-01-21 23:39:51 +010094 # Keep a reference to the Overlapped object to keep it alive until the
95 # wait is unregistered
Victor Stinner313a9802014-07-29 12:58:23 +020096 self._ov = ov
Victor Stinner18a28dc2014-07-25 13:05:20 +020097 self._handle = handle
Guido van Rossum90fb9142013-10-30 14:44:05 -070098 self._wait_handle = wait_handle
99
Victor Stinnerd0a28de2015-01-21 23:39:51 +0100100 # Should we call UnregisterWaitEx() if the wait completes
101 # or is cancelled?
102 self._registered = True
103
Victor Stinner18a28dc2014-07-25 13:05:20 +0200104 def _poll(self):
105 # non-blocking wait: use a timeout of 0 millisecond
106 return (_winapi.WaitForSingleObject(self._handle, 0) ==
107 _winapi.WAIT_OBJECT_0)
108
Victor Stinner313a9802014-07-29 12:58:23 +0200109 def _repr_info(self):
110 info = super()._repr_info()
Victor Stinnerd0a28de2015-01-21 23:39:51 +0100111 info.append('handle=%#x' % self._handle)
112 if self._handle is not None:
Victor Stinner313a9802014-07-29 12:58:23 +0200113 state = 'signaled' if self._poll() else 'waiting'
Victor Stinnerd0a28de2015-01-21 23:39:51 +0100114 info.append(state)
115 if self._wait_handle is not None:
116 info.append('wait_handle=%#x' % self._wait_handle)
Victor Stinner313a9802014-07-29 12:58:23 +0200117 return info
Victor Stinner18a28dc2014-07-25 13:05:20 +0200118
Victor Stinnerd0a28de2015-01-21 23:39:51 +0100119 def _unregister_wait_cb(self, fut):
120 # The wait was unregistered: it's not safe to destroy the Overlapped
121 # object
122 self._ov = None
123
Victor Stinner313a9802014-07-29 12:58:23 +0200124 def _unregister_wait(self):
Victor Stinnerd0a28de2015-01-21 23:39:51 +0100125 if not self._registered:
Victor Stinnerfea6a102014-07-25 00:54:53 +0200126 return
Victor Stinnerd0a28de2015-01-21 23:39:51 +0100127 self._registered = False
128
Victor Stinner24dfa3c2015-01-26 22:30:28 +0100129 wait_handle = self._wait_handle
130 self._wait_handle = None
Guido van Rossum90fb9142013-10-30 14:44:05 -0700131 try:
Victor Stinner24dfa3c2015-01-26 22:30:28 +0100132 _overlapped.UnregisterWait(wait_handle)
Victor Stinnerb2614752014-08-25 23:20:52 +0200133 except OSError as exc:
Victor Stinner24dfa3c2015-01-26 22:30:28 +0100134 if exc.winerror != _overlapped.ERROR_IO_PENDING:
Victor Stinnerb2614752014-08-25 23:20:52 +0200135 context = {
136 'message': 'Failed to unregister the wait handle',
137 'exception': exc,
138 'future': self,
139 }
140 if self._source_traceback:
141 context['source_traceback'] = self._source_traceback
142 self._loop.call_exception_handler(context)
Victor Stinner24dfa3c2015-01-26 22:30:28 +0100143 return
144 # ERROR_IO_PENDING means that the unregister is pending
145
146 self._unregister_wait_cb(None)
Victor Stinnerfea6a102014-07-25 00:54:53 +0200147
148 def cancel(self):
Victor Stinner313a9802014-07-29 12:58:23 +0200149 self._unregister_wait()
Victor Stinnerd0a28de2015-01-21 23:39:51 +0100150 return super().cancel()
Victor Stinner313a9802014-07-29 12:58:23 +0200151
152 def set_exception(self, exception):
Victor Stinner313a9802014-07-29 12:58:23 +0200153 self._unregister_wait()
Victor Stinnerd0a28de2015-01-21 23:39:51 +0100154 super().set_exception(exception)
Victor Stinner313a9802014-07-29 12:58:23 +0200155
156 def set_result(self, result):
Victor Stinner313a9802014-07-29 12:58:23 +0200157 self._unregister_wait()
Victor Stinnerd0a28de2015-01-21 23:39:51 +0100158 super().set_result(result)
159
160
161class _WaitCancelFuture(_BaseWaitHandleFuture):
162 """Subclass of Future which represents a wait for the cancellation of a
163 _WaitHandleFuture using an event.
164 """
165
166 def __init__(self, ov, event, wait_handle, *, loop=None):
167 super().__init__(ov, event, wait_handle, loop=loop)
168
169 self._done_callback = None
170
Victor Stinner1ca93922015-01-22 00:17:54 +0100171 def cancel(self):
172 raise RuntimeError("_WaitCancelFuture must not be cancelled")
173
INADA Naokia8363622016-10-21 12:30:15 +0900174 def set_result(self, result):
175 super().set_result(result)
176 if self._done_callback is not None:
177 self._done_callback(self)
178
179 def set_exception(self, exception):
180 super().set_exception(exception)
Victor Stinnerd0a28de2015-01-21 23:39:51 +0100181 if self._done_callback is not None:
182 self._done_callback(self)
183
184
185class _WaitHandleFuture(_BaseWaitHandleFuture):
186 def __init__(self, ov, handle, wait_handle, proactor, *, loop=None):
187 super().__init__(ov, handle, wait_handle, loop=loop)
188 self._proactor = proactor
189 self._unregister_proactor = True
190 self._event = _overlapped.CreateEvent(None, True, False, None)
191 self._event_fut = None
192
193 def _unregister_wait_cb(self, fut):
194 if self._event is not None:
195 _winapi.CloseHandle(self._event)
196 self._event = None
197 self._event_fut = None
198
199 # If the wait was cancelled, the wait may never be signalled, so
200 # it's required to unregister it. Otherwise, IocpProactor.close() will
201 # wait forever for an event which will never come.
202 #
203 # If the IocpProactor already received the event, it's safe to call
204 # _unregister() because we kept a reference to the Overlapped object
Martin Panter6245cb32016-04-15 02:14:19 +0000205 # which is used as a unique key.
Victor Stinnerd0a28de2015-01-21 23:39:51 +0100206 self._proactor._unregister(self._ov)
207 self._proactor = None
208
209 super()._unregister_wait_cb(fut)
210
211 def _unregister_wait(self):
212 if not self._registered:
213 return
214 self._registered = False
215
Victor Stinner24dfa3c2015-01-26 22:30:28 +0100216 wait_handle = self._wait_handle
217 self._wait_handle = None
Victor Stinnerd0a28de2015-01-21 23:39:51 +0100218 try:
Victor Stinner24dfa3c2015-01-26 22:30:28 +0100219 _overlapped.UnregisterWaitEx(wait_handle, self._event)
Victor Stinnerd0a28de2015-01-21 23:39:51 +0100220 except OSError as exc:
Victor Stinner24dfa3c2015-01-26 22:30:28 +0100221 if exc.winerror != _overlapped.ERROR_IO_PENDING:
Victor Stinnerd0a28de2015-01-21 23:39:51 +0100222 context = {
223 'message': 'Failed to unregister the wait handle',
224 'exception': exc,
225 'future': self,
226 }
227 if self._source_traceback:
228 context['source_traceback'] = self._source_traceback
229 self._loop.call_exception_handler(context)
Victor Stinner24dfa3c2015-01-26 22:30:28 +0100230 return
231 # ERROR_IO_PENDING is not an error, the wait was unregistered
232
233 self._event_fut = self._proactor._wait_cancel(self._event,
234 self._unregister_wait_cb)
Guido van Rossum90fb9142013-10-30 14:44:05 -0700235
236
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700237class PipeServer(object):
238 """Class representing a pipe server.
239
240 This is much like a bound, listening socket.
241 """
242 def __init__(self, address):
243 self._address = address
244 self._free_instances = weakref.WeakSet()
Victor Stinnerb2614752014-08-25 23:20:52 +0200245 # initialize the pipe attribute before calling _server_pipe_handle()
246 # because this function can raise an exception and the destructor calls
247 # the close() method
248 self._pipe = None
249 self._accept_pipe_future = None
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700250 self._pipe = self._server_pipe_handle(True)
251
252 def _get_unconnected_pipe(self):
253 # Create new instance and return previous one. This ensures
254 # that (until the server is closed) there is always at least
255 # one pipe handle for address. Therefore if a client attempt
256 # to connect it will not fail with FileNotFoundError.
257 tmp, self._pipe = self._pipe, self._server_pipe_handle(False)
258 return tmp
259
260 def _server_pipe_handle(self, first):
261 # Return a wrapper for a new pipe handle.
Victor Stinnera19b7b32015-01-26 15:03:20 +0100262 if self.closed():
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700263 return None
264 flags = _winapi.PIPE_ACCESS_DUPLEX | _winapi.FILE_FLAG_OVERLAPPED
265 if first:
266 flags |= _winapi.FILE_FLAG_FIRST_PIPE_INSTANCE
267 h = _winapi.CreateNamedPipe(
268 self._address, flags,
269 _winapi.PIPE_TYPE_MESSAGE | _winapi.PIPE_READMODE_MESSAGE |
270 _winapi.PIPE_WAIT,
271 _winapi.PIPE_UNLIMITED_INSTANCES,
272 windows_utils.BUFSIZE, windows_utils.BUFSIZE,
273 _winapi.NMPWAIT_WAIT_FOREVER, _winapi.NULL)
274 pipe = windows_utils.PipeHandle(h)
275 self._free_instances.add(pipe)
276 return pipe
277
Victor Stinnera19b7b32015-01-26 15:03:20 +0100278 def closed(self):
279 return (self._address is None)
280
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700281 def close(self):
Victor Stinnerb2614752014-08-25 23:20:52 +0200282 if self._accept_pipe_future is not None:
283 self._accept_pipe_future.cancel()
284 self._accept_pipe_future = None
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700285 # Close all instances which have not been connected to by a client.
286 if self._address is not None:
287 for pipe in self._free_instances:
288 pipe.close()
289 self._pipe = None
290 self._address = None
291 self._free_instances.clear()
292
293 __del__ = close
294
295
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800296class _WindowsSelectorEventLoop(selector_events.BaseSelectorEventLoop):
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700297 """Windows version of selector event loop."""
298
299 def _socketpair(self):
300 return windows_utils.socketpair()
301
302
303class ProactorEventLoop(proactor_events.BaseProactorEventLoop):
304 """Windows version of proactor event loop using IOCP."""
305
306 def __init__(self, proactor=None):
307 if proactor is None:
308 proactor = IocpProactor()
309 super().__init__(proactor)
310
311 def _socketpair(self):
312 return windows_utils.socketpair()
313
Victor Stinnerf951d282014-06-29 00:46:45 +0200314 @coroutine
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700315 def create_pipe_connection(self, protocol_factory, address):
316 f = self._proactor.connect_pipe(address)
317 pipe = yield from f
318 protocol = protocol_factory()
319 trans = self._make_duplex_pipe_transport(pipe, protocol,
320 extra={'addr': address})
321 return trans, protocol
322
Victor Stinnerf951d282014-06-29 00:46:45 +0200323 @coroutine
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700324 def start_serving_pipe(self, protocol_factory, address):
325 server = PipeServer(address)
Guido van Rossuma8d630a2013-11-01 14:20:55 -0700326
Victor Stinnerb2614752014-08-25 23:20:52 +0200327 def loop_accept_pipe(f=None):
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700328 pipe = None
329 try:
330 if f:
331 pipe = f.result()
332 server._free_instances.discard(pipe)
Victor Stinnera19b7b32015-01-26 15:03:20 +0100333
334 if server.closed():
335 # A client connected before the server was closed:
336 # drop the client (close the pipe) and exit
337 pipe.close()
338 return
339
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700340 protocol = protocol_factory()
341 self._make_duplex_pipe_transport(
342 pipe, protocol, extra={'addr': address})
Victor Stinnera19b7b32015-01-26 15:03:20 +0100343
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700344 pipe = server._get_unconnected_pipe()
345 if pipe is None:
346 return
Victor Stinnera19b7b32015-01-26 15:03:20 +0100347
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700348 f = self._proactor.accept_pipe(pipe)
Yury Selivanovff827f02014-02-18 18:02:19 -0500349 except OSError as exc:
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700350 if pipe and pipe.fileno() != -1:
Yury Selivanovff827f02014-02-18 18:02:19 -0500351 self.call_exception_handler({
352 'message': 'Pipe accept failed',
353 'exception': exc,
354 'pipe': pipe,
355 })
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700356 pipe.close()
Victor Stinnerb2614752014-08-25 23:20:52 +0200357 elif self._debug:
358 logger.warning("Accept pipe failed on pipe %r",
359 pipe, exc_info=True)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700360 except futures.CancelledError:
361 if pipe:
362 pipe.close()
363 else:
Victor Stinnerb2614752014-08-25 23:20:52 +0200364 server._accept_pipe_future = f
365 f.add_done_callback(loop_accept_pipe)
Guido van Rossuma8d630a2013-11-01 14:20:55 -0700366
Victor Stinnerb2614752014-08-25 23:20:52 +0200367 self.call_soon(loop_accept_pipe)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700368 return [server]
369
Victor Stinnerf951d282014-06-29 00:46:45 +0200370 @coroutine
Guido van Rossum59691282013-10-30 14:52:03 -0700371 def _make_subprocess_transport(self, protocol, args, shell,
372 stdin, stdout, stderr, bufsize,
373 extra=None, **kwargs):
Yury Selivanov7661db62016-05-16 15:38:39 -0400374 waiter = self.create_future()
Guido van Rossum59691282013-10-30 14:52:03 -0700375 transp = _WindowsSubprocessTransport(self, protocol, args, shell,
376 stdin, stdout, stderr, bufsize,
Victor Stinner47cd10d2015-01-30 00:05:19 +0100377 waiter=waiter, extra=extra,
378 **kwargs)
Victor Stinner4bf22e02015-01-15 14:24:22 +0100379 try:
Victor Stinner47cd10d2015-01-30 00:05:19 +0100380 yield from waiter
Victor Stinner5d44c082015-02-02 18:36:31 +0100381 except Exception as exc:
382 # Workaround CPython bug #23353: using yield/yield-from in an
383 # except block of a generator doesn't clear properly sys.exc_info()
384 err = exc
385 else:
386 err = None
387
388 if err is not None:
Victor Stinner4bf22e02015-01-15 14:24:22 +0100389 transp.close()
Victor Stinner1241ecc2015-01-30 00:16:14 +0100390 yield from transp._wait()
Victor Stinner5d44c082015-02-02 18:36:31 +0100391 raise err
Victor Stinner4bf22e02015-01-15 14:24:22 +0100392
Guido van Rossum59691282013-10-30 14:52:03 -0700393 return transp
394
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700395
396class IocpProactor:
397 """Proactor implementation using IOCP."""
398
399 def __init__(self, concurrency=0xffffffff):
400 self._loop = None
401 self._results = []
402 self._iocp = _overlapped.CreateIoCompletionPort(
403 _overlapped.INVALID_HANDLE_VALUE, NULL, 0, concurrency)
404 self._cache = {}
405 self._registered = weakref.WeakSet()
Victor Stinnerd0a28de2015-01-21 23:39:51 +0100406 self._unregistered = []
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700407 self._stopped_serving = weakref.WeakSet()
408
Victor Stinnerfea6a102014-07-25 00:54:53 +0200409 def __repr__(self):
410 return ('<%s overlapped#=%s result#=%s>'
411 % (self.__class__.__name__, len(self._cache),
412 len(self._results)))
413
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700414 def set_loop(self, loop):
415 self._loop = loop
416
417 def select(self, timeout=None):
418 if not self._results:
419 self._poll(timeout)
420 tmp = self._results
421 self._results = []
422 return tmp
423
Victor Stinner41063d22015-01-26 22:30:49 +0100424 def _result(self, value):
Yury Selivanov7661db62016-05-16 15:38:39 -0400425 fut = self._loop.create_future()
Victor Stinner41063d22015-01-26 22:30:49 +0100426 fut.set_result(value)
427 return fut
428
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700429 def recv(self, conn, nbytes, flags=0):
430 self._register_with_iocp(conn)
431 ov = _overlapped.Overlapped(NULL)
Victor Stinner41063d22015-01-26 22:30:49 +0100432 try:
433 if isinstance(conn, socket.socket):
434 ov.WSARecv(conn.fileno(), nbytes, flags)
435 else:
436 ov.ReadFile(conn.fileno(), nbytes)
437 except BrokenPipeError:
438 return self._result(b'')
Guido van Rossuma8d630a2013-11-01 14:20:55 -0700439
Victor Stinnerc89c8a72014-02-26 17:35:30 +0100440 def finish_recv(trans, key, ov):
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700441 try:
442 return ov.getresult()
443 except OSError as exc:
444 if exc.winerror == _overlapped.ERROR_NETNAME_DELETED:
445 raise ConnectionResetError(*exc.args)
446 else:
447 raise
Guido van Rossuma8d630a2013-11-01 14:20:55 -0700448
Victor Stinnerc89c8a72014-02-26 17:35:30 +0100449 return self._register(ov, conn, finish_recv)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700450
Antoine Pitrou525f40d2017-10-19 21:46:40 +0200451 def recv_into(self, conn, buf, flags=0):
452 self._register_with_iocp(conn)
453 ov = _overlapped.Overlapped(NULL)
454 try:
455 if isinstance(conn, socket.socket):
456 ov.WSARecvInto(conn.fileno(), buf, flags)
457 else:
458 ov.ReadFileInto(conn.fileno(), buf)
459 except BrokenPipeError:
460 return self._result(b'')
461
462 def finish_recv(trans, key, ov):
463 try:
464 return ov.getresult()
465 except OSError as exc:
466 if exc.winerror == _overlapped.ERROR_NETNAME_DELETED:
467 raise ConnectionResetError(*exc.args)
468 else:
469 raise
470
471 return self._register(ov, conn, finish_recv)
472
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700473 def send(self, conn, buf, flags=0):
474 self._register_with_iocp(conn)
475 ov = _overlapped.Overlapped(NULL)
476 if isinstance(conn, socket.socket):
477 ov.WSASend(conn.fileno(), buf, flags)
478 else:
479 ov.WriteFile(conn.fileno(), buf)
Guido van Rossuma8d630a2013-11-01 14:20:55 -0700480
Victor Stinnerc89c8a72014-02-26 17:35:30 +0100481 def finish_send(trans, key, ov):
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700482 try:
483 return ov.getresult()
484 except OSError as exc:
485 if exc.winerror == _overlapped.ERROR_NETNAME_DELETED:
486 raise ConnectionResetError(*exc.args)
487 else:
488 raise
Guido van Rossuma8d630a2013-11-01 14:20:55 -0700489
Victor Stinnerc89c8a72014-02-26 17:35:30 +0100490 return self._register(ov, conn, finish_send)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700491
492 def accept(self, listener):
493 self._register_with_iocp(listener)
494 conn = self._get_accept_socket(listener.family)
495 ov = _overlapped.Overlapped(NULL)
496 ov.AcceptEx(listener.fileno(), conn.fileno())
Guido van Rossuma8d630a2013-11-01 14:20:55 -0700497
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700498 def finish_accept(trans, key, ov):
499 ov.getresult()
500 # Use SO_UPDATE_ACCEPT_CONTEXT so getsockname() etc work.
501 buf = struct.pack('@P', listener.fileno())
502 conn.setsockopt(socket.SOL_SOCKET,
503 _overlapped.SO_UPDATE_ACCEPT_CONTEXT, buf)
504 conn.settimeout(listener.gettimeout())
505 return conn, conn.getpeername()
Guido van Rossuma8d630a2013-11-01 14:20:55 -0700506
Victor Stinnerf951d282014-06-29 00:46:45 +0200507 @coroutine
Victor Stinner7de26462014-01-11 00:03:21 +0100508 def accept_coro(future, conn):
509 # Coroutine closing the accept socket if the future is cancelled
510 try:
511 yield from future
512 except futures.CancelledError:
513 conn.close()
514 raise
515
516 future = self._register(ov, listener, finish_accept)
517 coro = accept_coro(future, conn)
Yury Selivanov59eb9a42015-05-11 14:48:38 -0400518 tasks.ensure_future(coro, loop=self._loop)
Victor Stinner7de26462014-01-11 00:03:21 +0100519 return future
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700520
521 def connect(self, conn, address):
522 self._register_with_iocp(conn)
523 # The socket needs to be locally bound before we call ConnectEx().
524 try:
525 _overlapped.BindLocal(conn.fileno(), conn.family)
526 except OSError as e:
527 if e.winerror != errno.WSAEINVAL:
528 raise
529 # Probably already locally bound; check using getsockname().
530 if conn.getsockname()[1] == 0:
531 raise
532 ov = _overlapped.Overlapped(NULL)
533 ov.ConnectEx(conn.fileno(), address)
Guido van Rossuma8d630a2013-11-01 14:20:55 -0700534
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700535 def finish_connect(trans, key, ov):
536 ov.getresult()
537 # Use SO_UPDATE_CONNECT_CONTEXT so getsockname() etc work.
538 conn.setsockopt(socket.SOL_SOCKET,
539 _overlapped.SO_UPDATE_CONNECT_CONTEXT, 0)
540 return conn
Guido van Rossuma8d630a2013-11-01 14:20:55 -0700541
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700542 return self._register(ov, conn, finish_connect)
543
544 def accept_pipe(self, pipe):
545 self._register_with_iocp(pipe)
546 ov = _overlapped.Overlapped(NULL)
Victor Stinner2b77c542015-01-22 23:50:03 +0100547 connected = ov.ConnectNamedPipe(pipe.fileno())
548
549 if connected:
550 # ConnectNamePipe() failed with ERROR_PIPE_CONNECTED which means
551 # that the pipe is connected. There is no need to wait for the
552 # completion of the connection.
Victor Stinner41063d22015-01-26 22:30:49 +0100553 return self._result(pipe)
Guido van Rossuma8d630a2013-11-01 14:20:55 -0700554
Victor Stinnerc89c8a72014-02-26 17:35:30 +0100555 def finish_accept_pipe(trans, key, ov):
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700556 ov.getresult()
557 return pipe
Guido van Rossuma8d630a2013-11-01 14:20:55 -0700558
Victor Stinner2b77c542015-01-22 23:50:03 +0100559 return self._register(ov, pipe, finish_accept_pipe)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700560
Victor Stinnere0fd1572015-01-26 15:04:03 +0100561 @coroutine
Victor Stinner7ffa2c52015-01-22 22:55:08 +0100562 def connect_pipe(self, address):
Victor Stinnere0fd1572015-01-26 15:04:03 +0100563 delay = CONNECT_PIPE_INIT_DELAY
564 while True:
565 # Unfortunately there is no way to do an overlapped connect to a pipe.
566 # Call CreateFile() in a loop until it doesn't fail with
567 # ERROR_PIPE_BUSY
568 try:
569 handle = _overlapped.ConnectPipe(address)
570 break
571 except OSError as exc:
572 if exc.winerror != _overlapped.ERROR_PIPE_BUSY:
573 raise
574
575 # ConnectPipe() failed with ERROR_PIPE_BUSY: retry later
576 delay = min(delay * 2, CONNECT_PIPE_MAX_DELAY)
577 yield from tasks.sleep(delay, loop=self._loop)
578
579 return windows_utils.PipeHandle(handle)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700580
Guido van Rossum90fb9142013-10-30 14:44:05 -0700581 def wait_for_handle(self, handle, timeout=None):
Victor Stinner4d825b42014-12-19 17:10:44 +0100582 """Wait for a handle.
583
584 Return a Future object. The result of the future is True if the wait
585 completed, or False if the wait did not complete (on timeout).
586 """
Victor Stinnerd0a28de2015-01-21 23:39:51 +0100587 return self._wait_for_handle(handle, timeout, False)
588
589 def _wait_cancel(self, event, done_callback):
590 fut = self._wait_for_handle(event, None, True)
591 # add_done_callback() cannot be used because the wait may only complete
592 # in IocpProactor.close(), while the event loop is not running.
593 fut._done_callback = done_callback
594 return fut
595
596 def _wait_for_handle(self, handle, timeout, _is_cancel):
Guido van Rossum90fb9142013-10-30 14:44:05 -0700597 if timeout is None:
598 ms = _winapi.INFINITE
599 else:
Victor Stinnerf2e17682014-01-31 16:25:24 +0100600 # RegisterWaitForSingleObject() has a resolution of 1 millisecond,
601 # round away from zero to wait *at least* timeout seconds.
602 ms = math.ceil(timeout * 1e3)
Guido van Rossum90fb9142013-10-30 14:44:05 -0700603
604 # We only create ov so we can use ov.address as a key for the cache.
605 ov = _overlapped.Overlapped(NULL)
Victor Stinnerd0a28de2015-01-21 23:39:51 +0100606 wait_handle = _overlapped.RegisterWaitWithQueue(
Guido van Rossum90fb9142013-10-30 14:44:05 -0700607 handle, self._iocp, ov.address, ms)
Victor Stinnerd0a28de2015-01-21 23:39:51 +0100608 if _is_cancel:
609 f = _WaitCancelFuture(ov, handle, wait_handle, loop=self._loop)
610 else:
611 f = _WaitHandleFuture(ov, handle, wait_handle, self,
612 loop=self._loop)
Victor Stinner313a9802014-07-29 12:58:23 +0200613 if f._source_traceback:
614 del f._source_traceback[-1]
Guido van Rossum90fb9142013-10-30 14:44:05 -0700615
Victor Stinnerc89c8a72014-02-26 17:35:30 +0100616 def finish_wait_for_handle(trans, key, ov):
Richard Oudkerk71196e72013-11-24 17:50:40 +0000617 # Note that this second wait means that we should only use
618 # this with handles types where a successful wait has no
619 # effect. So events or processes are all right, but locks
620 # or semaphores are not. Also note if the handle is
621 # signalled and then quickly reset, then we may return
622 # False even though we have not timed out.
Victor Stinner313a9802014-07-29 12:58:23 +0200623 return f._poll()
Guido van Rossum90fb9142013-10-30 14:44:05 -0700624
Victor Stinner313a9802014-07-29 12:58:23 +0200625 self._cache[ov.address] = (f, ov, 0, finish_wait_for_handle)
Guido van Rossum90fb9142013-10-30 14:44:05 -0700626 return f
627
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700628 def _register_with_iocp(self, obj):
629 # To get notifications of finished ops on this objects sent to the
630 # completion port, were must register the handle.
631 if obj not in self._registered:
632 self._registered.add(obj)
633 _overlapped.CreateIoCompletionPort(obj.fileno(), self._iocp, 0, 0)
634 # XXX We could also use SetFileCompletionNotificationModes()
635 # to avoid sending notifications to completion port of ops
636 # that succeed immediately.
637
Victor Stinner2b77c542015-01-22 23:50:03 +0100638 def _register(self, ov, obj, callback):
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700639 # Return a future which will be set with the result of the
640 # operation when it completes. The future's value is actually
641 # the value returned by callback().
642 f = _OverlappedFuture(ov, loop=self._loop)
Victor Stinner313a9802014-07-29 12:58:23 +0200643 if f._source_traceback:
644 del f._source_traceback[-1]
Victor Stinner2b77c542015-01-22 23:50:03 +0100645 if not ov.pending:
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700646 # The operation has completed, so no need to postpone the
647 # work. We cannot take this short cut if we need the
648 # NumberOfBytes, CompletionKey values returned by
649 # PostQueuedCompletionStatus().
650 try:
651 value = callback(None, None, ov)
652 except OSError as e:
653 f.set_exception(e)
654 else:
655 f.set_result(value)
Victor Stinner42d3bde2014-07-28 00:18:43 +0200656 # Even if GetOverlappedResult() was called, we have to wait for the
657 # notification of the completion in GetQueuedCompletionStatus().
658 # Register the overlapped operation to keep a reference to the
659 # OVERLAPPED object, otherwise the memory is freed and Windows may
660 # read uninitialized memory.
Victor Stinner2b77c542015-01-22 23:50:03 +0100661
662 # Register the overlapped operation for later. Note that
663 # we only store obj to prevent it from being garbage
664 # collected too early.
665 self._cache[ov.address] = (f, ov, obj, callback)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700666 return f
667
Victor Stinnerd0a28de2015-01-21 23:39:51 +0100668 def _unregister(self, ov):
669 """Unregister an overlapped object.
670
671 Call this method when its future has been cancelled. The event can
672 already be signalled (pending in the proactor event queue). It is also
673 safe if the event is never signalled (because it was cancelled).
674 """
675 self._unregistered.append(ov)
676
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700677 def _get_accept_socket(self, family):
678 s = socket.socket(family)
679 s.settimeout(0)
680 return s
681
682 def _poll(self, timeout=None):
683 if timeout is None:
684 ms = INFINITE
685 elif timeout < 0:
686 raise ValueError("negative timeout")
687 else:
Victor Stinnerf2e17682014-01-31 16:25:24 +0100688 # GetQueuedCompletionStatus() has a resolution of 1 millisecond,
689 # round away from zero to wait *at least* timeout seconds.
690 ms = math.ceil(timeout * 1e3)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700691 if ms >= INFINITE:
692 raise ValueError("timeout too big")
Victor Stinner313a9802014-07-29 12:58:23 +0200693
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700694 while True:
695 status = _overlapped.GetQueuedCompletionStatus(self._iocp, ms)
696 if status is None:
Victor Stinnerd0a28de2015-01-21 23:39:51 +0100697 break
Victor Stinner313a9802014-07-29 12:58:23 +0200698 ms = 0
699
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700700 err, transferred, key, address = status
701 try:
702 f, ov, obj, callback = self._cache.pop(address)
703 except KeyError:
Victor Stinner42d3bde2014-07-28 00:18:43 +0200704 if self._loop.get_debug():
705 self._loop.call_exception_handler({
706 'message': ('GetQueuedCompletionStatus() returned an '
707 'unexpected event'),
708 'status': ('err=%s transferred=%s key=%#x address=%#x'
709 % (err, transferred, key, address)),
710 })
711
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700712 # key is either zero, or it is used to return a pipe
713 # handle which should be closed to avoid a leak.
714 if key not in (0, _overlapped.INVALID_HANDLE_VALUE):
715 _winapi.CloseHandle(key)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700716 continue
Victor Stinner51e44ea2014-07-26 00:58:34 +0200717
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700718 if obj in self._stopped_serving:
719 f.cancel()
Victor Stinner42d3bde2014-07-28 00:18:43 +0200720 # Don't call the callback if _register() already read the result or
721 # if the overlapped has been cancelled
722 elif not f.done():
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700723 try:
724 value = callback(transferred, key, ov)
725 except OSError as e:
726 f.set_exception(e)
727 self._results.append(f)
728 else:
729 f.set_result(value)
730 self._results.append(f)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700731
Victor Stinnerd0a28de2015-01-21 23:39:51 +0100732 # Remove unregisted futures
733 for ov in self._unregistered:
734 self._cache.pop(ov.address, None)
735 self._unregistered.clear()
736
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700737 def _stop_serving(self, obj):
738 # obj is a socket or pipe handle. It will be closed in
739 # BaseProactorEventLoop._stop_serving() which will make any
740 # pending operations fail quickly.
741 self._stopped_serving.add(obj)
742
743 def close(self):
744 # Cancel remaining registered operations.
Victor Stinnerfea6a102014-07-25 00:54:53 +0200745 for address, (fut, ov, obj, callback) in list(self._cache.items()):
Victor Stinner3d2256f2015-01-26 11:02:59 +0100746 if fut.cancelled():
Victor Stinner752aba72015-01-22 22:47:13 +0100747 # Nothing to do with cancelled futures
748 pass
Victor Stinner1ca93922015-01-22 00:17:54 +0100749 elif isinstance(fut, _WaitCancelFuture):
750 # _WaitCancelFuture must not be cancelled
751 pass
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700752 else:
753 try:
Victor Stinnerfea6a102014-07-25 00:54:53 +0200754 fut.cancel()
755 except OSError as exc:
756 if self._loop is not None:
757 context = {
758 'message': 'Cancelling a future failed',
759 'exception': exc,
760 'future': fut,
761 }
762 if fut._source_traceback:
763 context['source_traceback'] = fut._source_traceback
764 self._loop.call_exception_handler(context)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700765
766 while self._cache:
767 if not self._poll(1):
Guido van Rossumfc29e0f2013-10-17 15:39:45 -0700768 logger.debug('taking long time to close proactor')
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700769
770 self._results = []
771 if self._iocp is not None:
772 _winapi.CloseHandle(self._iocp)
773 self._iocp = None
Guido van Rossum59691282013-10-30 14:52:03 -0700774
Victor Stinnerfea6a102014-07-25 00:54:53 +0200775 def __del__(self):
776 self.close()
777
Guido van Rossum59691282013-10-30 14:52:03 -0700778
779class _WindowsSubprocessTransport(base_subprocess.BaseSubprocessTransport):
780
781 def _start(self, args, shell, stdin, stdout, stderr, bufsize, **kwargs):
782 self._proc = windows_utils.Popen(
783 args, shell=shell, stdin=stdin, stdout=stdout, stderr=stderr,
784 bufsize=bufsize, **kwargs)
Guido van Rossuma8d630a2013-11-01 14:20:55 -0700785
Guido van Rossum59691282013-10-30 14:52:03 -0700786 def callback(f):
787 returncode = self._proc.poll()
788 self._process_exited(returncode)
Guido van Rossuma8d630a2013-11-01 14:20:55 -0700789
Guido van Rossum59691282013-10-30 14:52:03 -0700790 f = self._loop._proactor.wait_for_handle(int(self._proc._handle))
791 f.add_done_callback(callback)
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800792
793
794SelectorEventLoop = _WindowsSelectorEventLoop
795
796
797class _WindowsDefaultEventLoopPolicy(events.BaseDefaultEventLoopPolicy):
798 _loop_factory = SelectorEventLoop
799
800
801DefaultEventLoopPolicy = _WindowsDefaultEventLoopPolicy