"""Selector and proactor event loops for Windows."""
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07002
Victor Stinner4271dfd2017-11-28 15:19:56 +01003import _overlapped
Victor Stinnerf2e17682014-01-31 16:25:24 +01004import _winapi
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07005import errno
Victor Stinnerf2e17682014-01-31 16:25:24 +01006import math
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07007import socket
Victor Stinnerf2e17682014-01-31 16:25:24 +01008import struct
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07009import weakref
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070010
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -080011from . import events
Guido van Rossum59691282013-10-30 14:52:03 -070012from . import base_subprocess
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070013from . import futures
14from . import proactor_events
15from . import selector_events
16from . import tasks
17from . import windows_utils
Victor Stinnerf951d282014-06-29 00:46:45 +020018from .log import logger
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070019
20
# Names exported by ``from <this module> import *``.
__all__ = [
    'SelectorEventLoop',
    'ProactorEventLoop',
    'IocpProactor',
    'DefaultEventLoopPolicy',
]
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070024
25
# Null pointer / "no value" argument passed to the _overlapped API.
NULL = 0
# Timeout value (in milliseconds) meaning "wait forever".
INFINITE = 0xFFFFFFFF
ERROR_CONNECTION_REFUSED = 1225
ERROR_CONNECTION_ABORTED = 1236

# Initial delay in seconds for connect_pipe() before retrying to connect
CONNECT_PIPE_INIT_DELAY = 0.001

# Maximum delay in seconds for connect_pipe() before retrying to connect
CONNECT_PIPE_MAX_DELAY = 0.1
36
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070037
class _OverlappedFuture(futures.Future):
    """Future bound to an overlapped operation.

    Cancelling the future cancels the overlapped operation right away.
    """

    def __init__(self, ov, *, loop=None):
        super().__init__(loop=loop)
        # Drop the last recorded frame of the source traceback, if one was
        # captured.
        if self._source_traceback:
            del self._source_traceback[-1]
        # Overlapped object of the operation; reset to None once the
        # operation is finished or cancelled.
        self._ov = ov

    def _repr_info(self):
        """Return the repr fields, with the overlapped state inserted."""
        info = super()._repr_info()
        overlapped = self._ov
        if overlapped is None:
            return info
        state = 'pending' if overlapped.pending else 'completed'
        info.insert(1, 'overlapped=<%s, %#x>' % (state, overlapped.address))
        return info

    def _cancel_overlapped(self):
        """Cancel the overlapped operation (if any) and forget it.

        An OSError raised by the cancellation is reported to the loop's
        exception handler instead of being propagated.
        """
        overlapped = self._ov
        if overlapped is None:
            return
        try:
            overlapped.cancel()
        except OSError as exc:
            context = {
                'message': 'Cancelling an overlapped future failed',
                'exception': exc,
                'future': self,
            }
            if self._source_traceback:
                context['source_traceback'] = self._source_traceback
            self._loop.call_exception_handler(context)
        # Cleared only after the handler ran, so the handler still sees the
        # overlapped object in the future's repr.
        self._ov = None

    def cancel(self):
        """Cancel the overlapped operation, then the future itself."""
        self._cancel_overlapped()
        return super().cancel()

    def set_exception(self, exception):
        """Store the exception, then cancel the overlapped operation."""
        super().set_exception(exception)
        self._cancel_overlapped()

    def set_result(self, result):
        """Store the result and release the overlapped object."""
        super().set_result(result)
        self._ov = None
84
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070085
class _BaseWaitHandleFuture(futures.Future):
    """Subclass of Future which represents a wait handle.

    ov is the Overlapped object associated with the wait, handle is the
    handle being waited on and wait_handle is the registration that has to
    be released with UnregisterWait() when the future is done.
    """

    def __init__(self, ov, handle, wait_handle, *, loop=None):
        super().__init__(loop=loop)
        # Drop the last recorded frame of the source traceback, if one was
        # captured.
        if self._source_traceback:
            del self._source_traceback[-1]
        # Keep a reference to the Overlapped object to keep it alive until the
        # wait is unregistered
        self._ov = ov
        self._handle = handle
        self._wait_handle = wait_handle

        # Should we call UnregisterWaitEx() if the wait completes
        # or is cancelled?
        self._registered = True

    def _poll(self):
        """Return True if the handle is currently signaled."""
        # non-blocking wait: use a timeout of 0 millisecond
        return (_winapi.WaitForSingleObject(self._handle, 0) ==
                _winapi.WAIT_OBJECT_0)

    def _repr_info(self):
        """Return the repr fields, extended with the handle state."""
        info = super()._repr_info()
        # Format the handle only when there is one: '%#x' % None would raise
        # TypeError and break repr() of the future.
        if self._handle is not None:
            info.append('handle=%#x' % self._handle)
            state = 'signaled' if self._poll() else 'waiting'
            info.append(state)
        if self._wait_handle is not None:
            info.append('wait_handle=%#x' % self._wait_handle)
        return info

    def _unregister_wait_cb(self, fut):
        """Release the Overlapped object once the wait is unregistered.

        The fut argument is not used here.
        """
        # The wait was unregistered: it's now safe to destroy the Overlapped
        # object
        self._ov = None

    def _unregister_wait(self):
        """Unregister the wait handle; does nothing after the first call.

        An OSError other than ERROR_IO_PENDING is reported to the loop's
        exception handler and the Overlapped reference is then kept.
        """
        if not self._registered:
            return
        self._registered = False

        wait_handle = self._wait_handle
        self._wait_handle = None
        try:
            _overlapped.UnregisterWait(wait_handle)
        except OSError as exc:
            if exc.winerror != _overlapped.ERROR_IO_PENDING:
                context = {
                    'message': 'Failed to unregister the wait handle',
                    'exception': exc,
                    'future': self,
                }
                if self._source_traceback:
                    context['source_traceback'] = self._source_traceback
                self._loop.call_exception_handler(context)
                return
            # ERROR_IO_PENDING means that the unregister is pending

        self._unregister_wait_cb(None)

    def cancel(self):
        """Unregister the wait, then cancel the future."""
        self._unregister_wait()
        return super().cancel()

    def set_exception(self, exception):
        """Unregister the wait, then store the exception."""
        self._unregister_wait()
        super().set_exception(exception)

    def set_result(self, result):
        """Unregister the wait, then store the result."""
        self._unregister_wait()
        super().set_result(result)
158
159
class _WaitCancelFuture(_BaseWaitHandleFuture):
    """Future waiting, through an event, for the cancellation of a
    _WaitHandleFuture to complete.
    """

    def __init__(self, ov, event, wait_handle, *, loop=None):
        super().__init__(ov, event, wait_handle, loop=loop)

        # Optional callable invoked synchronously, with this future as its
        # only argument, as soon as a result or an exception is set.
        self._done_callback = None

    def cancel(self):
        """Always raise RuntimeError: this future cannot be cancelled."""
        raise RuntimeError("_WaitCancelFuture must not be cancelled")

    def set_result(self, result):
        """Store the result, then run the done callback if one is set."""
        super().set_result(result)
        callback = self._done_callback
        if callback is not None:
            callback(self)

    def set_exception(self, exception):
        """Store the exception, then run the done callback if one is set."""
        super().set_exception(exception)
        callback = self._done_callback
        if callback is not None:
            callback(self)
182
183
class _WaitHandleFuture(_BaseWaitHandleFuture):
    """Future for a handle wait registered with an IocpProactor.

    Unregistering the wait is asynchronous: an event is passed to
    UnregisterWaitEx() and the proactor waits on that event before the
    resources of this future are released.
    """

    def __init__(self, ov, handle, wait_handle, proactor, *, loop=None):
        super().__init__(ov, handle, wait_handle, loop=loop)
        self._proactor = proactor
        self._unregister_proactor = True
        # Event handed to UnregisterWaitEx() in _unregister_wait().
        self._event = _overlapped.CreateEvent(None, True, False, None)
        # Future returned by the proactor's _wait_cancel() for that event.
        self._event_fut = None

    def _unregister_wait_cb(self, fut):
        """Close the event and unregister the Overlapped from the proactor."""
        event = self._event
        if event is not None:
            _winapi.CloseHandle(event)
            self._event = None
            self._event_fut = None

        # If the wait was cancelled, the wait may never be signalled, so
        # it's required to unregister it. Otherwise, IocpProactor.close() will
        # wait forever for an event which will never come.
        #
        # If the IocpProactor already received the event, it's safe to call
        # _unregister() because we kept a reference to the Overlapped object
        # which is used as a unique key.
        proactor = self._proactor
        proactor._unregister(self._ov)
        self._proactor = None

        super()._unregister_wait_cb(fut)

    def _unregister_wait(self):
        """Start unregistering the wait; does nothing after the first call.

        An OSError other than ERROR_IO_PENDING is reported to the loop's
        exception handler and the unregistration stops there.
        """
        if not self._registered:
            return
        self._registered = False

        registration = self._wait_handle
        self._wait_handle = None
        try:
            _overlapped.UnregisterWaitEx(registration, self._event)
        except OSError as exc:
            if exc.winerror != _overlapped.ERROR_IO_PENDING:
                context = {
                    'message': 'Failed to unregister the wait handle',
                    'exception': exc,
                    'future': self,
                }
                if self._source_traceback:
                    context['source_traceback'] = self._source_traceback
                self._loop.call_exception_handler(context)
                return
            # ERROR_IO_PENDING is not an error, the wait was unregistered

        # _unregister_wait_cb() runs once the proactor sees the event.
        self._event_fut = self._proactor._wait_cancel(
            self._event, self._unregister_wait_cb)
Guido van Rossum90fb9142013-10-30 14:44:05 -0700234
235
class PipeServer:
    """Class representing a pipe server.

    This is much like a bound, listening socket.
    """

    def __init__(self, address):
        self._address = address
        # Pipe instances created by this server; an instance is discarded
        # from the set by the accept loop once a client connected to it.
        self._free_instances = weakref.WeakSet()
        # initialize the pipe attribute before calling _server_pipe_handle()
        # because this function can raise an exception and the destructor calls
        # the close() method
        self._pipe = None
        self._accept_pipe_future = None
        self._pipe = self._server_pipe_handle(True)

    def _get_unconnected_pipe(self):
        """Return the current pipe instance after creating its successor."""
        # Create new instance and return previous one.  This ensures
        # that (until the server is closed) there is always at least
        # one pipe handle for address.  Therefore if a client attempt
        # to connect it will not fail with FileNotFoundError.
        previous = self._pipe
        self._pipe = self._server_pipe_handle(False)
        return previous

    def _server_pipe_handle(self, first):
        """Return a wrapper for a new pipe handle, or None once closed.

        first must be true only for the very first instance created for
        the address.
        """
        if self.closed():
            return None
        open_mode = _winapi.PIPE_ACCESS_DUPLEX | _winapi.FILE_FLAG_OVERLAPPED
        if first:
            open_mode |= _winapi.FILE_FLAG_FIRST_PIPE_INSTANCE
        pipe_mode = (_winapi.PIPE_TYPE_MESSAGE |
                     _winapi.PIPE_READMODE_MESSAGE |
                     _winapi.PIPE_WAIT)
        handle = _winapi.CreateNamedPipe(
            self._address, open_mode, pipe_mode,
            _winapi.PIPE_UNLIMITED_INSTANCES,
            windows_utils.BUFSIZE, windows_utils.BUFSIZE,
            _winapi.NMPWAIT_WAIT_FOREVER, _winapi.NULL)
        pipe = windows_utils.PipeHandle(handle)
        self._free_instances.add(pipe)
        return pipe

    def closed(self):
        """Return True once close() has reset the address."""
        return self._address is None

    def close(self):
        """Cancel the pending accept and close the unconnected instances."""
        pending_accept = self._accept_pipe_future
        if pending_accept is not None:
            pending_accept.cancel()
            self._accept_pipe_future = None
        if self._address is None:
            return
        # Close all instances which have not been connected to by a client.
        for pipe in self._free_instances:
            pipe.close()
        self._pipe = None
        self._address = None
        self._free_instances.clear()

    __del__ = close
293
294
class _WindowsSelectorEventLoop(selector_events.BaseSelectorEventLoop):
    """Selector event loop for Windows.

    Adds nothing on top of BaseSelectorEventLoop.
    """
297
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700298
class ProactorEventLoop(proactor_events.BaseProactorEventLoop):
    """Proactor event loop for Windows, built on IOCP."""

    def __init__(self, proactor=None):
        # Use a fresh IocpProactor unless the caller supplies one.
        super().__init__(IocpProactor() if proactor is None else proactor)

    async def create_pipe_connection(self, protocol_factory, address):
        """Connect to the pipe at address.

        Return a (transport, protocol) pair; the protocol is built by
        calling protocol_factory() once the pipe is connected.
        """
        pipe = await self._proactor.connect_pipe(address)
        protocol = protocol_factory()
        transport = self._make_duplex_pipe_transport(
            pipe, protocol, extra={'addr': address})
        return transport, protocol

    async def start_serving_pipe(self, protocol_factory, address):
        """Start accepting clients on the pipe at address.

        Return a list holding the PipeServer created for the address.
        """
        server = PipeServer(address)

        def loop_accept_pipe(fut=None):
            # Called once without argument to start accepting, then as the
            # done callback of each accept future.
            pipe = None
            try:
                if fut:
                    pipe = fut.result()
                    server._free_instances.discard(pipe)

                    if server.closed():
                        # A client connected before the server was closed:
                        # drop the client (close the pipe) and exit
                        pipe.close()
                        return

                    protocol = protocol_factory()
                    self._make_duplex_pipe_transport(
                        pipe, protocol, extra={'addr': address})

                pipe = server._get_unconnected_pipe()
                if pipe is None:
                    return

                fut = self._proactor.accept_pipe(pipe)
            except OSError as exc:
                if pipe and pipe.fileno() != -1:
                    self.call_exception_handler({
                        'message': 'Pipe accept failed',
                        'exception': exc,
                        'pipe': pipe,
                    })
                    pipe.close()
                elif self._debug:
                    logger.warning("Accept pipe failed on pipe %r",
                                   pipe, exc_info=True)
            except futures.CancelledError:
                if pipe:
                    pipe.close()
            else:
                # Remember the pending accept so that closing the server can
                # cancel it, and come back here when it completes.
                server._accept_pipe_future = fut
                fut.add_done_callback(loop_accept_pipe)

        self.call_soon(loop_accept_pipe)
        return [server]

    async def _make_subprocess_transport(self, protocol, args, shell,
                                         stdin, stdout, stderr, bufsize,
                                         extra=None, **kwargs):
        """Create a subprocess transport and wait until its waiter is done.

        If waiting fails, the transport is closed and awaited before the
        exception is re-raised.
        """
        waiter = self.create_future()
        transport = _WindowsSubprocessTransport(
            self, protocol, args, shell,
            stdin, stdout, stderr, bufsize,
            waiter=waiter, extra=extra,
            **kwargs)
        try:
            await waiter
        except Exception:
            transport.close()
            await transport._wait()
            raise

        return transport
377
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700378
class IocpProactor:
    """Proactor implementation using IOCP.

    Every operation started through this object is recorded in
    ``self._cache`` under the address of its Overlapped object, as a
    ``(future, ov, obj, callback)`` tuple.  _poll() pops the entry when the
    completion port reports that address and resolves the future with the
    value returned by ``callback(transferred, key, ov)``.
    """

    def __init__(self, concurrency=0xffffffff):
        # Set every attribute used by close() *before* creating the
        # completion port, so that __del__() stays safe if the creation
        # below raises.
        self._iocp = None
        self._loop = None
        # Futures completed by _poll() and not yet returned by select().
        self._results = []
        self._cache = {}
        # Objects whose handle is already associated with the port.
        self._registered = weakref.WeakSet()
        # Overlapped objects to drop from the cache at the end of _poll().
        self._unregistered = []
        self._stopped_serving = weakref.WeakSet()
        self._iocp = _overlapped.CreateIoCompletionPort(
            _overlapped.INVALID_HANDLE_VALUE, NULL, 0, concurrency)

    def __repr__(self):
        return ('<%s overlapped#=%s result#=%s>'
                % (self.__class__.__name__, len(self._cache),
                   len(self._results)))

    def set_loop(self, loop):
        """Set the event loop used to create futures and report errors."""
        self._loop = loop

    def select(self, timeout=None):
        """Return the list of futures completed since the last call.

        The completion port is polled, for at most timeout seconds (None
        means forever), only when no completed future is already queued.
        """
        if not self._results:
            self._poll(timeout)
        tmp = self._results
        self._results = []
        return tmp

    def _result(self, value):
        """Return a new future already resolved to value."""
        fut = self._loop.create_future()
        fut.set_result(value)
        return fut

    @staticmethod
    def _finish_io(trans, key, ov):
        """Completion callback shared by recv(), recv_into() and send().

        Return ov.getresult(); an OSError whose winerror is
        ERROR_NETNAME_DELETED is re-raised as ConnectionResetError.
        """
        try:
            return ov.getresult()
        except OSError as exc:
            if exc.winerror == _overlapped.ERROR_NETNAME_DELETED:
                raise ConnectionResetError(*exc.args)
            raise

    def recv(self, conn, nbytes, flags=0):
        """Start an overlapped receive of at most nbytes on conn.

        Return a future; it is already resolved to b'' if starting the
        operation raised BrokenPipeError.
        """
        self._register_with_iocp(conn)
        ov = _overlapped.Overlapped(NULL)
        try:
            if isinstance(conn, socket.socket):
                ov.WSARecv(conn.fileno(), nbytes, flags)
            else:
                ov.ReadFile(conn.fileno(), nbytes)
        except BrokenPipeError:
            return self._result(b'')

        return self._register(ov, conn, self._finish_io)

    def recv_into(self, conn, buf, flags=0):
        """Start an overlapped receive on conn, writing into buf.

        Return a future; it is already resolved to 0 if starting the
        operation raised BrokenPipeError.
        """
        self._register_with_iocp(conn)
        ov = _overlapped.Overlapped(NULL)
        try:
            if isinstance(conn, socket.socket):
                ov.WSARecvInto(conn.fileno(), buf, flags)
            else:
                ov.ReadFileInto(conn.fileno(), buf)
        except BrokenPipeError:
            # A receive *into* a buffer reports a byte count, so the
            # "nothing received" value is 0 rather than b''.
            return self._result(0)

        return self._register(ov, conn, self._finish_io)

    def send(self, conn, buf, flags=0):
        """Start an overlapped send of buf on conn and return its future."""
        self._register_with_iocp(conn)
        ov = _overlapped.Overlapped(NULL)
        if isinstance(conn, socket.socket):
            ov.WSASend(conn.fileno(), buf, flags)
        else:
            ov.WriteFile(conn.fileno(), buf)

        return self._register(ov, conn, self._finish_io)

    def accept(self, listener):
        """Start an overlapped accept on the listener socket.

        Return a future resolving to a (conn, peer address) pair.  The
        socket created for the connection is closed if the accept cannot be
        started or if the future is cancelled.
        """
        self._register_with_iocp(listener)
        conn = self._get_accept_socket(listener.family)
        ov = _overlapped.Overlapped(NULL)
        try:
            ov.AcceptEx(listener.fileno(), conn.fileno())
        except BaseException:
            # Nobody else holds the new socket yet: don't leak it.
            conn.close()
            raise

        def finish_accept(trans, key, ov):
            ov.getresult()
            # Use SO_UPDATE_ACCEPT_CONTEXT so getsockname() etc work.
            buf = struct.pack('@P', listener.fileno())
            conn.setsockopt(socket.SOL_SOCKET,
                            _overlapped.SO_UPDATE_ACCEPT_CONTEXT, buf)
            conn.settimeout(listener.gettimeout())
            return conn, conn.getpeername()

        async def accept_coro(future, conn):
            # Coroutine closing the accept socket if the future is cancelled
            try:
                await future
            except futures.CancelledError:
                conn.close()
                raise

        future = self._register(ov, listener, finish_accept)
        coro = accept_coro(future, conn)
        tasks.ensure_future(coro, loop=self._loop)
        return future

    def connect(self, conn, address):
        """Start an overlapped connect of conn to address.

        Return a future resolving to conn.
        """
        self._register_with_iocp(conn)
        # The socket needs to be locally bound before we call ConnectEx().
        try:
            _overlapped.BindLocal(conn.fileno(), conn.family)
        except OSError as e:
            if e.winerror != errno.WSAEINVAL:
                raise
            # Probably already locally bound; check using getsockname().
            if conn.getsockname()[1] == 0:
                raise
        ov = _overlapped.Overlapped(NULL)
        ov.ConnectEx(conn.fileno(), address)

        def finish_connect(trans, key, ov):
            ov.getresult()
            # Use SO_UPDATE_CONNECT_CONTEXT so getsockname() etc work.
            conn.setsockopt(socket.SOL_SOCKET,
                            _overlapped.SO_UPDATE_CONNECT_CONTEXT, 0)
            return conn

        return self._register(ov, conn, finish_connect)

    def accept_pipe(self, pipe):
        """Wait for a client to connect to pipe.

        Return a future resolving to pipe.
        """
        self._register_with_iocp(pipe)
        ov = _overlapped.Overlapped(NULL)
        connected = ov.ConnectNamedPipe(pipe.fileno())

        if connected:
            # ConnectNamedPipe() failed with ERROR_PIPE_CONNECTED which means
            # that the pipe is connected. There is no need to wait for the
            # completion of the connection.
            return self._result(pipe)

        def finish_accept_pipe(trans, key, ov):
            ov.getresult()
            return pipe

        return self._register(ov, pipe, finish_accept_pipe)

    async def connect_pipe(self, address):
        """Connect to the pipe at address and return a PipeHandle.

        While the pipe is busy the connection is retried after a delay
        which doubles each time, up to CONNECT_PIPE_MAX_DELAY.  Any other
        OSError is propagated.
        """
        delay = CONNECT_PIPE_INIT_DELAY
        while True:
            # Unfortunately there is no way to do an overlapped connect to a
            # pipe.  Call CreateFile() in a loop until it doesn't fail with
            # ERROR_PIPE_BUSY
            try:
                handle = _overlapped.ConnectPipe(address)
                break
            except OSError as exc:
                if exc.winerror != _overlapped.ERROR_PIPE_BUSY:
                    raise

            # ConnectPipe() failed with ERROR_PIPE_BUSY: retry later
            delay = min(delay * 2, CONNECT_PIPE_MAX_DELAY)
            await tasks.sleep(delay, loop=self._loop)

        return windows_utils.PipeHandle(handle)

    def wait_for_handle(self, handle, timeout=None):
        """Wait for a handle.

        Return a Future object. The result of the future is True if the wait
        completed, or False if the wait did not complete (on timeout).
        """
        return self._wait_for_handle(handle, timeout, False)

    def _wait_cancel(self, event, done_callback):
        """Wait for event and call done_callback(future) when it is set."""
        fut = self._wait_for_handle(event, None, True)
        # add_done_callback() cannot be used because the wait may only complete
        # in IocpProactor.close(), while the event loop is not running.
        fut._done_callback = done_callback
        return fut

    def _wait_for_handle(self, handle, timeout, _is_cancel):
        """Register a wait on handle with the completion port.

        timeout is in seconds (None: no timeout).  Return a
        _WaitCancelFuture if _is_cancel is true, a _WaitHandleFuture
        otherwise.
        """
        if timeout is None:
            ms = _winapi.INFINITE
        else:
            # RegisterWaitForSingleObject() has a resolution of 1 millisecond,
            # round away from zero to wait *at least* timeout seconds.
            ms = math.ceil(timeout * 1e3)

        # We only create ov so we can use ov.address as a key for the cache.
        ov = _overlapped.Overlapped(NULL)
        wait_handle = _overlapped.RegisterWaitWithQueue(
            handle, self._iocp, ov.address, ms)
        if _is_cancel:
            f = _WaitCancelFuture(ov, handle, wait_handle, loop=self._loop)
        else:
            f = _WaitHandleFuture(ov, handle, wait_handle, self,
                                  loop=self._loop)
        if f._source_traceback:
            del f._source_traceback[-1]

        def finish_wait_for_handle(trans, key, ov):
            # Note that this second wait means that we should only use
            # this with handles types where a successful wait has no
            # effect.  So events or processes are all right, but locks
            # or semaphores are not.  Also note if the handle is
            # signalled and then quickly reset, then we may return
            # False even though we have not timed out.
            return f._poll()

        self._cache[ov.address] = (f, ov, 0, finish_wait_for_handle)
        return f

    def _register_with_iocp(self, obj):
        """Associate the handle of obj with the completion port, once."""
        # To get notifications of finished ops on this object sent to the
        # completion port, we must register the handle.
        if obj not in self._registered:
            self._registered.add(obj)
            _overlapped.CreateIoCompletionPort(obj.fileno(), self._iocp, 0, 0)
            # XXX We could also use SetFileCompletionNotificationModes()
            # to avoid sending notifications to completion port of ops
            # that succeed immediately.

    def _register(self, ov, obj, callback):
        """Record an overlapped operation and return its future."""
        # Return a future which will be set with the result of the
        # operation when it completes.  The future's value is actually
        # the value returned by callback().
        f = _OverlappedFuture(ov, loop=self._loop)
        if f._source_traceback:
            del f._source_traceback[-1]
        if not ov.pending:
            # The operation has completed, so no need to postpone the
            # work.  We cannot take this short cut if we need the
            # NumberOfBytes, CompletionKey values returned by
            # PostQueuedCompletionStatus().
            try:
                value = callback(None, None, ov)
            except OSError as e:
                f.set_exception(e)
            else:
                f.set_result(value)
            # Even if GetOverlappedResult() was called, we have to wait for the
            # notification of the completion in GetQueuedCompletionStatus().
            # Register the overlapped operation to keep a reference to the
            # OVERLAPPED object, otherwise the memory is freed and Windows may
            # read uninitialized memory.

        # Register the overlapped operation for later.  Note that
        # we only store obj to prevent it from being garbage
        # collected too early.
        self._cache[ov.address] = (f, ov, obj, callback)
        return f

    def _unregister(self, ov):
        """Unregister an overlapped object.

        Call this method when its future has been cancelled. The event can
        already be signalled (pending in the proactor event queue). It is also
        safe if the event is never signalled (because it was cancelled).
        """
        self._unregistered.append(ov)

    def _get_accept_socket(self, family):
        """Return a new socket of the given family with a timeout of 0."""
        s = socket.socket(family)
        s.settimeout(0)
        return s

    def _poll(self, timeout=None):
        """Process the completions queued on the port.

        Wait at most timeout seconds (None: forever) for the first
        completion, then drain the ones already queued without waiting.
        Return True if at least one completion was dequeued, False if the
        wait timed out.  Raise ValueError if timeout is negative or too big.
        """
        if timeout is None:
            ms = INFINITE
        elif timeout < 0:
            raise ValueError("negative timeout")
        else:
            # GetQueuedCompletionStatus() has a resolution of 1 millisecond,
            # round away from zero to wait *at least* timeout seconds.
            ms = math.ceil(timeout * 1e3)
            if ms >= INFINITE:
                raise ValueError("timeout too big")

        handled = False
        while True:
            status = _overlapped.GetQueuedCompletionStatus(self._iocp, ms)
            if status is None:
                break
            handled = True
            # Only the first wait may block.
            ms = 0

            err, transferred, key, address = status
            try:
                f, ov, obj, callback = self._cache.pop(address)
            except KeyError:
                if self._loop.get_debug():
                    self._loop.call_exception_handler({
                        'message': ('GetQueuedCompletionStatus() returned an '
                                    'unexpected event'),
                        'status': ('err=%s transferred=%s key=%#x address=%#x'
                                   % (err, transferred, key, address)),
                    })

                # key is either zero, or it is used to return a pipe
                # handle which should be closed to avoid a leak.
                if key not in (0, _overlapped.INVALID_HANDLE_VALUE):
                    _winapi.CloseHandle(key)
                continue

            if obj in self._stopped_serving:
                f.cancel()
            # Don't call the callback if _register() already read the result or
            # if the overlapped has been cancelled
            elif not f.done():
                try:
                    value = callback(transferred, key, ov)
                except OSError as e:
                    f.set_exception(e)
                    self._results.append(f)
                else:
                    f.set_result(value)
                    self._results.append(f)

        # Remove unregistered futures
        for ov in self._unregistered:
            self._cache.pop(ov.address, None)
        self._unregistered.clear()
        return handled

    def _stop_serving(self, obj):
        """Mark obj so that its pending operations get cancelled."""
        # obj is a socket or pipe handle.  It will be closed in
        # BaseProactorEventLoop._stop_serving() which will make any
        # pending operations fail quickly.
        self._stopped_serving.add(obj)

    def close(self):
        """Cancel the pending operations and close the completion port.

        Calling close() again, or on an object whose completion port could
        not be created, does nothing.
        """
        if self._iocp is None:
            # Already closed, or never fully initialized.
            return

        # Cancel remaining registered operations.  Iterate over a copy:
        # cancelling a future can add entries to the cache.
        for fut, _ov, _obj, _callback in list(self._cache.values()):
            if fut.cancelled():
                # Nothing to do with cancelled futures
                pass
            elif isinstance(fut, _WaitCancelFuture):
                # _WaitCancelFuture must not be cancelled
                pass
            else:
                try:
                    fut.cancel()
                except OSError as exc:
                    if self._loop is not None:
                        context = {
                            'message': 'Cancelling a future failed',
                            'exception': exc,
                            'future': fut,
                        }
                        if fut._source_traceback:
                            context['source_traceback'] = fut._source_traceback
                        self._loop.call_exception_handler(context)

        # Wait until every registered operation has been reported; log only
        # when a full second passed without any completion.
        while self._cache:
            if not self._poll(1):
                logger.debug('taking long time to close proactor')

        self._results = []
        _winapi.CloseHandle(self._iocp)
        self._iocp = None

    def __del__(self):
        self.close()
758
Guido van Rossum59691282013-10-30 14:52:03 -0700759
class _WindowsSubprocessTransport(base_subprocess.BaseSubprocessTransport):
    """Subprocess transport which uses the proactor to detect the exit."""

    def _start(self, args, shell, stdin, stdout, stderr, bufsize, **kwargs):
        """Spawn the process and watch its handle for termination."""
        self._proc = windows_utils.Popen(
            args, shell=shell, stdin=stdin, stdout=stdout, stderr=stderr,
            bufsize=bufsize, **kwargs)

        def callback(f):
            # The wait on the process handle is done: report the exit code.
            self._process_exited(self._proc.poll())

        proactor = self._loop._proactor
        exit_waiter = proactor.wait_for_handle(int(self._proc._handle))
        exit_waiter.add_done_callback(callback)
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800773
774
# Public name of the selector event loop on Windows.
SelectorEventLoop = _WindowsSelectorEventLoop


class _WindowsDefaultEventLoopPolicy(events.BaseDefaultEventLoopPolicy):
    """Default event loop policy: creates SelectorEventLoop instances."""

    _loop_factory = SelectorEventLoop


# Public name of the default event loop policy on Windows.
DefaultEventLoopPolicy = _WindowsDefaultEventLoopPolicy