blob: e6be9d13011405baeb2ab9a9346ec653ae569a77 [file] [log] [blame]
Yury Selivanovdec1a452014-02-18 22:27:48 -05001"""Selector and proactor event loops for Windows."""
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07002
Victor Stinnerf2e17682014-01-31 16:25:24 +01003import _winapi
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07004import errno
Victor Stinnerf2e17682014-01-31 16:25:24 +01005import math
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07006import socket
Victor Stinnerf2e17682014-01-31 16:25:24 +01007import struct
Guido van Rossum59691282013-10-30 14:52:03 -07008import subprocess
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07009import weakref
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070010
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -080011from . import events
Guido van Rossum59691282013-10-30 14:52:03 -070012from . import base_subprocess
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070013from . import futures
14from . import proactor_events
15from . import selector_events
16from . import tasks
17from . import windows_utils
Guido van Rossumfc29e0f2013-10-17 15:39:45 -070018from .log import logger
Guido van Rossum59691282013-10-30 14:52:03 -070019from . import _overlapped
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070020
21
# Names exported by ``from <this module> import *``.  All four are defined
# further down in this module.
__all__ = ['SelectorEventLoop', 'ProactorEventLoop', 'IocpProactor',
           'DefaultEventLoopPolicy',
           ]
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070025
26
# Null value passed to the _overlapped calls in this module (for example
# Overlapped(NULL) and CreateIoCompletionPort(..., NULL, ...)).
NULL = 0
# Timeout, in milliseconds, that IocpProactor._poll() hands to
# GetQueuedCompletionStatus() when the caller gave no timeout; finite
# timeouts must round to a value strictly below it.
INFINITE = 0xffffffff
# Windows error codes.  They are not referenced in the visible part of this
# module.
# NOTE(review): presumably kept for importers of this module -- confirm
# before removing.
ERROR_CONNECTION_REFUSED = 1225
ERROR_CONNECTION_ABORTED = 1236
31
32
class _OverlappedFuture(futures.Future):
    """Future bound to a single overlapped operation.

    Cancelling the future first asks the overlapped operation itself to
    cancel, then cancels the future.
    """

    def __init__(self, ov, *, loop=None):
        super().__init__(loop=loop)
        # The overlapped object whose completion this future reports.
        self.ov = ov

    def cancel(self):
        overlapped = self.ov
        try:
            overlapped.cancel()
        except OSError:
            # Best effort: a failure to cancel the operation must not
            # prevent the future itself from being cancelled.
            pass
        return super().cancel()
49
50
class _WaitHandleFuture(futures.Future):
    """Subclass of Future which represents a wait handle.

    *wait_handle* is the registered-wait handle (as returned by
    _overlapped.RegisterWaitWithQueue() in IocpProactor.wait_for_handle())
    that must be unregistered when the future is cancelled.
    """

    def __init__(self, wait_handle, *, loop=None):
        super().__init__(loop=loop)
        self._wait_handle = wait_handle

    def cancel(self):
        """Cancel the future and unregister the wait.

        Returns whatever Future.cancel() returned, so callers get the
        usual boolean instead of None.

        Raises OSError if unregistering fails for any reason other than
        ERROR_IO_PENDING.
        """
        cancelled = super().cancel()
        try:
            _overlapped.UnregisterWait(self._wait_handle)
        except OSError as e:
            # ERROR_IO_PENDING only reports that a wait callback had not
            # finished yet; the wait is still unregistered, so it is not
            # an error here.
            if e.winerror != _overlapped.ERROR_IO_PENDING:
                raise
        return cancelled
65
66
class PipeServer(object):
    """Server side of a named pipe.

    It plays the same role as a bound, listening socket.
    """

    def __init__(self, address):
        self._address = address
        # Pipe instances created by this server, held weakly.
        self._free_instances = weakref.WeakSet()
        self._pipe = self._server_pipe_handle(True)

    def _get_unconnected_pipe(self):
        # Hand out the current instance and immediately create its
        # replacement.  Until the server is closed there is therefore
        # always at least one pipe handle for the address, so a client
        # attempting to connect will not fail with FileNotFoundError.
        previous = self._pipe
        self._pipe = self._server_pipe_handle(False)
        return previous

    def _server_pipe_handle(self, first):
        # Create a new pipe instance and return a wrapper for its handle,
        # or None once the server has no address (i.e. it is closed).
        address = self._address
        if address is None:
            return None
        open_mode = _winapi.PIPE_ACCESS_DUPLEX | _winapi.FILE_FLAG_OVERLAPPED
        if first:
            open_mode |= _winapi.FILE_FLAG_FIRST_PIPE_INSTANCE
        pipe_mode = (_winapi.PIPE_TYPE_MESSAGE |
                     _winapi.PIPE_READMODE_MESSAGE |
                     _winapi.PIPE_WAIT)
        handle = _winapi.CreateNamedPipe(
            address, open_mode, pipe_mode,
            _winapi.PIPE_UNLIMITED_INSTANCES,
            windows_utils.BUFSIZE, windows_utils.BUFSIZE,
            _winapi.NMPWAIT_WAIT_FOREVER, _winapi.NULL)
        wrapper = windows_utils.PipeHandle(handle)
        self._free_instances.add(wrapper)
        return wrapper

    def close(self):
        # Close every instance no client has connected to.  Calling this
        # again after the address has been cleared is a no-op.
        if self._address is None:
            return
        for instance in self._free_instances:
            instance.close()
        self._pipe = None
        self._address = None
        self._free_instances.clear()

    __del__ = close
113
114
class _WindowsSelectorEventLoop(selector_events.BaseSelectorEventLoop):
    """Selector-based event loop adapted to Windows."""

    def _socketpair(self):
        # The pair of sockets comes from the helper in windows_utils.
        pair = windows_utils.socketpair()
        return pair
120
121
class ProactorEventLoop(proactor_events.BaseProactorEventLoop):
    """Proactor event loop for Windows, driven by an IOCP proactor."""

    def __init__(self, proactor=None):
        # Fall back to a fresh IocpProactor when the caller supplies none.
        super().__init__(IocpProactor() if proactor is None else proactor)

    def _socketpair(self):
        pair = windows_utils.socketpair()
        return pair

    @tasks.coroutine
    def create_pipe_connection(self, protocol_factory, address):
        # Connect to the pipe at *address*, then wrap the connected pipe
        # in a duplex transport driving a freshly created protocol.
        connected_pipe = yield from self._proactor.connect_pipe(address)
        protocol = protocol_factory()
        transport = self._make_duplex_pipe_transport(
            connected_pipe, protocol, extra={'addr': address})
        return transport, protocol

    @tasks.coroutine
    def start_serving_pipe(self, protocol_factory, address):
        # Serve *address*: each accepted client gets its own protocol and
        # transport.  Returns a one-element list holding the PipeServer.
        server = PipeServer(address)

        def accept_next(done=None):
            # Called once directly to start accepting, and afterwards as
            # the done-callback of each accept future.
            pipe = None
            try:
                if done:
                    # A client connected: this instance is no longer
                    # free, hand it over to a new transport.
                    pipe = done.result()
                    server._free_instances.discard(pipe)
                    protocol = protocol_factory()
                    self._make_duplex_pipe_transport(
                        pipe, protocol, extra={'addr': address})
                pipe = server._get_unconnected_pipe()
                if pipe is None:
                    # The server was closed; stop accepting.
                    return
                done = self._proactor.accept_pipe(pipe)
            except OSError as exc:
                if pipe and pipe.fileno() != -1:
                    context = {
                        'message': 'Pipe accept failed',
                        'exception': exc,
                        'pipe': pipe,
                    }
                    self.call_exception_handler(context)
                    pipe.close()
            except futures.CancelledError:
                if pipe:
                    pipe.close()
            else:
                # Re-arm: run again when this accept completes.
                done.add_done_callback(accept_next)

        self.call_soon(accept_next)
        return [server]

    @tasks.coroutine
    def _make_subprocess_transport(self, protocol, args, shell,
                                   stdin, stdout, stderr, bufsize,
                                   extra=None, **kwargs):
        # Build the Windows subprocess transport and wait for its
        # post-initialisation step before returning it.
        transport = _WindowsSubprocessTransport(
            self, protocol, args, shell, stdin, stdout, stderr, bufsize,
            extra=extra, **kwargs)
        yield from transport._post_init()
        return transport
185
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700186
class IocpProactor:
    """Proactor implementation using IOCP.

    Each operation method (recv(), send(), accept(), connect(),
    accept_pipe(), connect_pipe(), wait_for_handle()) starts an operation
    and returns a future for it.  Operations that are still pending are
    remembered in self._cache, keyed by ov.address, until _poll() receives
    the matching completion packet and resolves the future.
    """

    def __init__(self, concurrency=0xffffffff):
        """Create the completion port.

        *concurrency* is passed through as the last argument of
        CreateIoCompletionPort().
        """
        self._loop = None
        # Futures resolved by _poll() and not yet handed out by select().
        self._results = []
        self._iocp = _overlapped.CreateIoCompletionPort(
            _overlapped.INVALID_HANDLE_VALUE, NULL, 0, concurrency)
        # ov.address -> (future, ov, obj, callback) for pending operations.
        self._cache = {}
        # Objects whose handle is already associated with the port.
        self._registered = weakref.WeakSet()
        # Objects passed to _stop_serving(); their pending operations are
        # cancelled instead of completed.
        self._stopped_serving = weakref.WeakSet()

    def set_loop(self, loop):
        """Set the event loop given to the futures created from now on."""
        self._loop = loop

    def select(self, timeout=None):
        """Return the list of futures resolved since the previous call.

        The port is only polled (for at most *timeout* seconds, or
        indefinitely when *timeout* is None) if no resolved future is
        already waiting to be returned.
        """
        if not self._results:
            self._poll(timeout)
        tmp = self._results
        self._results = []
        return tmp

    @staticmethod
    def _finish_io(trans, key, ov):
        # Completion callback shared by recv() and send(): return the
        # result of the operation, reporting ERROR_NETNAME_DELETED as a
        # ConnectionResetError.
        try:
            return ov.getresult()
        except OSError as exc:
            if exc.winerror == _overlapped.ERROR_NETNAME_DELETED:
                raise ConnectionResetError(*exc.args)
            else:
                raise

    def recv(self, conn, nbytes, flags=0):
        """Start an overlapped read on *conn* and return its future.

        Sockets are read with WSARecv() using *flags*; any other object
        is read with ReadFile() on conn.fileno() and *flags* is ignored.
        """
        self._register_with_iocp(conn)
        ov = _overlapped.Overlapped(NULL)
        if isinstance(conn, socket.socket):
            ov.WSARecv(conn.fileno(), nbytes, flags)
        else:
            ov.ReadFile(conn.fileno(), nbytes)

        return self._register(ov, conn, self._finish_io)

    def send(self, conn, buf, flags=0):
        """Start an overlapped write of *buf* to *conn*; return its future.

        Sockets are written with WSASend() using *flags*; any other
        object is written with WriteFile() on conn.fileno() and *flags*
        is ignored.
        """
        self._register_with_iocp(conn)
        ov = _overlapped.Overlapped(NULL)
        if isinstance(conn, socket.socket):
            ov.WSASend(conn.fileno(), buf, flags)
        else:
            ov.WriteFile(conn.fileno(), buf)

        return self._register(ov, conn, self._finish_io)

    def accept(self, listener):
        """Start accepting a connection on *listener*.

        Returns a future whose result is ``(conn, conn.getpeername())``,
        where *conn* is a new socket of the listener's family.
        """
        self._register_with_iocp(listener)
        conn = self._get_accept_socket(listener.family)
        ov = _overlapped.Overlapped(NULL)
        ov.AcceptEx(listener.fileno(), conn.fileno())

        def finish_accept(trans, key, ov):
            ov.getresult()
            # Use SO_UPDATE_ACCEPT_CONTEXT so getsockname() etc work.
            buf = struct.pack('@P', listener.fileno())
            conn.setsockopt(socket.SOL_SOCKET,
                            _overlapped.SO_UPDATE_ACCEPT_CONTEXT, buf)
            conn.settimeout(listener.gettimeout())
            return conn, conn.getpeername()

        @tasks.coroutine
        def accept_coro(future, conn):
            # Coroutine closing the accept socket if the future is cancelled
            try:
                yield from future
            except futures.CancelledError:
                conn.close()
                raise

        future = self._register(ov, listener, finish_accept)
        coro = accept_coro(future, conn)
        # Schedule the watcher coroutine as a task.  Task() is used
        # directly instead of tasks.async(): ``async`` is a reserved word
        # from Python 3.7 on, so the attribute access would not even parse.
        tasks.Task(coro, loop=self._loop)
        return future

    def connect(self, conn, address):
        """Start connecting socket *conn* to *address*.

        Returns a future whose result is *conn* itself.
        """
        self._register_with_iocp(conn)
        # The socket needs to be locally bound before we call ConnectEx().
        try:
            _overlapped.BindLocal(conn.fileno(), conn.family)
        except OSError as e:
            if e.winerror != errno.WSAEINVAL:
                raise
            # Probably already locally bound; check using getsockname().
            if conn.getsockname()[1] == 0:
                raise
        ov = _overlapped.Overlapped(NULL)
        ov.ConnectEx(conn.fileno(), address)

        def finish_connect(trans, key, ov):
            ov.getresult()
            # Use SO_UPDATE_CONNECT_CONTEXT so getsockname() etc work.
            conn.setsockopt(socket.SOL_SOCKET,
                            _overlapped.SO_UPDATE_CONNECT_CONTEXT, 0)
            return conn

        return self._register(ov, conn, finish_connect)

    def accept_pipe(self, pipe):
        """Start waiting for a client to connect to the pipe instance *pipe*.

        Returns a future whose result is *pipe*.
        """
        self._register_with_iocp(pipe)
        ov = _overlapped.Overlapped(NULL)
        ov.ConnectNamedPipe(pipe.fileno())

        def finish(trans, key, ov):
            ov.getresult()
            return pipe

        return self._register(ov, pipe, finish)

    def connect_pipe(self, address):
        """Start connecting to the named pipe at *address*.

        Returns a future whose result is a windows_utils.PipeHandle.  It
        fails with ConnectionRefusedError when the attempt reports
        ERROR_SEM_TIMEOUT and with OSError for any other non-zero error.
        """
        ov = _overlapped.Overlapped(NULL)
        ov.WaitNamedPipeAndConnect(address, self._iocp, ov.address)

        def finish(err, handle, ov):
            # err, handle were arguments passed to PostQueuedCompletionStatus()
            # in a function run in a thread pool.
            if err == _overlapped.ERROR_SEM_TIMEOUT:
                # Connection did not succeed within time limit.
                msg = _overlapped.FormatMessage(err)
                raise ConnectionRefusedError(0, msg, None, err)
            elif err != 0:
                msg = _overlapped.FormatMessage(err)
                raise OSError(0, msg, None, err)
            else:
                return windows_utils.PipeHandle(handle)

        # The result only arrives through a posted completion packet, so
        # the immediate-completion shortcut of _register() must be skipped.
        return self._register(ov, None, finish, wait_for_post=True)

    def wait_for_handle(self, handle, timeout=None):
        """Wait for *handle* to be signalled, for at most *timeout* seconds.

        Returns a _WaitHandleFuture whose result is True if the handle is
        signalled when the wait completes and False otherwise.  With
        *timeout* None the wait has no time limit.
        """
        if timeout is None:
            ms = _winapi.INFINITE
        else:
            # RegisterWaitForSingleObject() has a resolution of 1 millisecond,
            # round away from zero to wait *at least* timeout seconds.
            ms = math.ceil(timeout * 1e3)

        # We only create ov so we can use ov.address as a key for the cache.
        ov = _overlapped.Overlapped(NULL)
        wh = _overlapped.RegisterWaitWithQueue(
            handle, self._iocp, ov.address, ms)
        f = _WaitHandleFuture(wh, loop=self._loop)

        def finish(trans, key, ov):
            if not f.cancelled():
                try:
                    _overlapped.UnregisterWait(wh)
                except OSError as e:
                    if e.winerror != _overlapped.ERROR_IO_PENDING:
                        raise
            # Note that this second wait means that we should only use
            # this with handles types where a successful wait has no
            # effect.  So events or processes are all right, but locks
            # or semaphores are not.  Also note if the handle is
            # signalled and then quickly reset, then we may return
            # False even though we have not timed out.
            return (_winapi.WaitForSingleObject(handle, 0) ==
                    _winapi.WAIT_OBJECT_0)

        self._cache[ov.address] = (f, ov, None, finish)
        return f

    def _register_with_iocp(self, obj):
        # To get notifications of finished operations on this object sent
        # to the completion port, we must register its handle (only once
        # per object).
        if obj not in self._registered:
            self._registered.add(obj)
            _overlapped.CreateIoCompletionPort(obj.fileno(), self._iocp, 0, 0)
            # XXX We could also use SetFileCompletionNotificationModes()
            # to avoid sending notifications to completion port of ops
            # that succeed immediately.

    def _register(self, ov, obj, callback, wait_for_post=False):
        # Return a future which will be set with the result of the
        # operation when it completes.  The future's value is actually
        # the value returned by callback().
        f = _OverlappedFuture(ov, loop=self._loop)
        if ov.pending or wait_for_post:
            # Register the overlapped operation for later.  Note that
            # we only store obj to prevent it from being garbage
            # collected too early.
            self._cache[ov.address] = (f, ov, obj, callback)
        else:
            # The operation has completed, so no need to postpone the
            # work.  We cannot take this short cut if we need the
            # NumberOfBytes, CompletionKey values returned by
            # PostQueuedCompletionStatus().
            try:
                value = callback(None, None, ov)
            except OSError as e:
                f.set_exception(e)
            else:
                f.set_result(value)
        return f

    def _get_accept_socket(self, family):
        # Create the socket that will receive an accepted connection; it
        # is given a timeout of 0.
        s = socket.socket(family)
        s.settimeout(0)
        return s

    def _poll(self, timeout=None):
        """Collect completion packets and resolve the matching futures.

        Waits up to *timeout* seconds (indefinitely if None) for the
        first packet, then drains whatever else is queued without
        blocking.  Resolved futures are appended to self._results.
        Always returns None.

        Raises ValueError if *timeout* is negative or rounds to at least
        INFINITE milliseconds.
        """
        if timeout is None:
            ms = INFINITE
        elif timeout < 0:
            raise ValueError("negative timeout")
        else:
            # GetQueuedCompletionStatus() has a resolution of 1 millisecond,
            # round away from zero to wait *at least* timeout seconds.
            ms = math.ceil(timeout * 1e3)
            if ms >= INFINITE:
                raise ValueError("timeout too big")
        while True:
            status = _overlapped.GetQueuedCompletionStatus(self._iocp, ms)
            if status is None:
                return
            err, transferred, key, address = status
            try:
                f, ov, obj, callback = self._cache.pop(address)
            except KeyError:
                # key is either zero, or it is used to return a pipe
                # handle which should be closed to avoid a leak.
                if key not in (0, _overlapped.INVALID_HANDLE_VALUE):
                    _winapi.CloseHandle(key)
                ms = 0
                continue
            if obj in self._stopped_serving:
                f.cancel()
            elif not f.cancelled():
                try:
                    value = callback(transferred, key, ov)
                except OSError as e:
                    f.set_exception(e)
                    self._results.append(f)
                else:
                    f.set_result(value)
                    self._results.append(f)
            # Something was received: do not block on the next iterations.
            ms = 0

    def _stop_serving(self, obj):
        # obj is a socket or pipe handle.  It will be closed in
        # BaseProactorEventLoop._stop_serving() which will make any
        # pending operations fail quickly.
        self._stopped_serving.add(obj)

    def close(self):
        """Cancel pending operations, wait for them, and close the port.

        Calling close() again once the port handle has been released only
        resets the result list.
        """
        # Cancel remaining registered operations.
        for address, (f, ov, obj, callback) in list(self._cache.items()):
            if obj is None:
                # The operation was started with connect_pipe() which
                # queues a task to Windows' thread pool.  This cannot
                # be cancelled, so just forget it.
                del self._cache[address]
            else:
                try:
                    ov.cancel()
                except OSError:
                    pass

        while self._cache:
            # _poll() always returns None, so its return value cannot
            # tell whether anything completed.  Compare the number of
            # pending operations instead and only report slowness when a
            # whole one-second poll made no progress.
            remaining = len(self._cache)
            self._poll(1)
            if len(self._cache) >= remaining:
                logger.debug('taking long time to close proactor')

        self._results = []
        if self._iocp is not None:
            _winapi.CloseHandle(self._iocp)
            self._iocp = None
Guido van Rossum59691282013-10-30 14:52:03 -0700466
467
class _WindowsSubprocessTransport(base_subprocess.BaseSubprocessTransport):
    """Subprocess transport that starts its child via windows_utils.Popen."""

    def _start(self, args, shell, stdin, stdout, stderr, bufsize, **kwargs):
        # Launch the child process, then have the proactor watch the
        # process handle so that the exit is reported to the transport.
        self._proc = windows_utils.Popen(
            args, shell=shell, stdin=stdin, stdout=stdout, stderr=stderr,
            bufsize=bufsize, **kwargs)

        def on_process_done(waiter):
            # Done-callback of the wait future: forward the exit status
            # reported by poll().
            exit_status = self._proc.poll()
            self._process_exited(exit_status)

        proactor = self._loop._proactor
        wait_future = proactor.wait_for_handle(int(self._proc._handle))
        wait_future.add_done_callback(on_process_done)
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800481
482
# Public name of the selector-based loop on Windows (listed in __all__).
SelectorEventLoop = _WindowsSelectorEventLoop


class _WindowsDefaultEventLoopPolicy(events.BaseDefaultEventLoopPolicy):
    """Default event loop policy for Windows."""
    # The selector loop, not ProactorEventLoop, is the loop class configured
    # here.
    # NOTE(review): presumably the base policy calls this to create new
    # loops -- confirm in events.BaseDefaultEventLoopPolicy.
    _loop_factory = SelectorEventLoop


# Public name of the policy (listed in __all__).
DefaultEventLoopPolicy = _WindowsDefaultEventLoopPolicy