blob: 19f25882cd346e8770a968fdcdfab0ab61ff8df6 [file] [log] [blame]
Yury Selivanovb0b0e622014-02-18 22:27:48 -05001"""Selector and proactor event loops for Windows."""
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07002
Victor Stinnerf2e17682014-01-31 16:25:24 +01003import _winapi
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07004import errno
Victor Stinnerf2e17682014-01-31 16:25:24 +01005import math
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07006import socket
Victor Stinnerf2e17682014-01-31 16:25:24 +01007import struct
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07008import weakref
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07009
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -080010from . import events
Guido van Rossum59691282013-10-30 14:52:03 -070011from . import base_subprocess
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070012from . import futures
13from . import proactor_events
14from . import selector_events
15from . import tasks
16from . import windows_utils
Guido van Rossumfc29e0f2013-10-17 15:39:45 -070017from .log import logger
Guido van Rossum59691282013-10-30 14:52:03 -070018from . import _overlapped
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070019
20
# Public names exported by this module.
__all__ = [
    'SelectorEventLoop',
    'ProactorEventLoop',
    'IocpProactor',
    'DefaultEventLoopPolicy',
]
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070024
25
# Null handle/pointer value handed to the _overlapped wrappers.
NULL = 0
# Millisecond timeout value meaning "wait forever"; _poll() rejects any
# finite timeout that rounds up to this value or above.
INFINITE = 0xffffffff
# Windows system error codes.
ERROR_CONNECTION_REFUSED = 1225
ERROR_CONNECTION_ABORTED = 1236
30
31
class _OverlappedFuture(futures.Future):
    """Future bound to a single overlapped operation.

    Cancelling the future also asks the overlapped operation itself to
    cancel, so the two never drift apart.
    """

    def __init__(self, ov, *, loop=None):
        """Remember the overlapped object *ov* backing this future."""
        super().__init__(loop=loop)
        self.ov = ov

    def cancel(self):
        """Cancel the overlapped operation, then the future.

        Returns whatever Future.cancel() returns.
        """
        overlapped = self.ov
        try:
            overlapped.cancel()
        except OSError:
            # Best effort: a failure to cancel the operation must not
            # prevent the future itself from being cancelled.
            pass
        return super().cancel()
48
49
class _WaitHandleFuture(futures.Future):
    """Subclass of Future which represents a wait handle."""

    def __init__(self, wait_handle, *, loop=None):
        """Remember *wait_handle* so the wait can be unregistered later."""
        super().__init__(loop=loop)
        self._wait_handle = wait_handle

    def cancel(self):
        """Cancel the future and unregister the underlying wait.

        Returns the value of Future.cancel() so that callers can tell
        whether the cancellation actually took effect, in the same way as
        _OverlappedFuture.cancel() does.

        Raises OSError if unregistering the wait fails for any reason
        other than ERROR_IO_PENDING.
        """
        cancelled = super().cancel()
        try:
            _overlapped.UnregisterWait(self._wait_handle)
        except OSError as e:
            # ERROR_IO_PENDING is tolerated here; anything else is a
            # genuine failure and is propagated.
            if e.winerror != _overlapped.ERROR_IO_PENDING:
                raise
        return cancelled
64
65
class PipeServer(object):
    """Server side of a named pipe.

    Plays the same role as a bound, listening socket: it always keeps one
    not-yet-connected pipe instance around for the next client.
    """

    def __init__(self, address):
        self._address = address
        # Pipe instances that no client has connected to yet.
        self._free_instances = weakref.WeakSet()
        self._pipe = self._server_pipe_handle(True)

    def _get_unconnected_pipe(self):
        """Hand out the current spare pipe and create its replacement.

        Creating the replacement before returning means that (until the
        server is closed) at least one pipe handle exists for the address,
        so a connecting client does not fail with FileNotFoundError.
        """
        previous = self._pipe
        self._pipe = self._server_pipe_handle(False)
        return previous

    def _server_pipe_handle(self, first):
        """Return a wrapper for a new pipe handle, or None once closed.

        *first* requests FILE_FLAG_FIRST_PIPE_INSTANCE for the new handle.
        """
        if self._address is None:
            return None
        open_mode = _winapi.PIPE_ACCESS_DUPLEX | _winapi.FILE_FLAG_OVERLAPPED
        if first:
            open_mode |= _winapi.FILE_FLAG_FIRST_PIPE_INSTANCE
        pipe_mode = (_winapi.PIPE_TYPE_MESSAGE |
                     _winapi.PIPE_READMODE_MESSAGE |
                     _winapi.PIPE_WAIT)
        handle = _winapi.CreateNamedPipe(
            self._address, open_mode, pipe_mode,
            _winapi.PIPE_UNLIMITED_INSTANCES,
            windows_utils.BUFSIZE, windows_utils.BUFSIZE,
            _winapi.NMPWAIT_WAIT_FOREVER, _winapi.NULL)
        wrapper = windows_utils.PipeHandle(handle)
        self._free_instances.add(wrapper)
        return wrapper

    def close(self):
        """Close every instance that no client has connected to."""
        if self._address is None:
            # Already closed.
            return
        for instance in self._free_instances:
            instance.close()
        self._pipe = None
        self._address = None
        self._free_instances.clear()

    __del__ = close
112
113
class _WindowsSelectorEventLoop(selector_events.BaseSelectorEventLoop):
    """Selector event loop for Windows."""

    def _socketpair(self):
        """Return the pair produced by windows_utils.socketpair()."""
        pair = windows_utils.socketpair()
        return pair
119
120
class ProactorEventLoop(proactor_events.BaseProactorEventLoop):
    """Proactor event loop for Windows, driven by an IOCP proactor."""

    def __init__(self, proactor=None):
        """Create the loop, building an IocpProactor when none is given."""
        super().__init__(IocpProactor() if proactor is None else proactor)

    def _socketpair(self):
        """Return the pair produced by windows_utils.socketpair()."""
        pair = windows_utils.socketpair()
        return pair

    @tasks.coroutine
    def create_pipe_connection(self, protocol_factory, address):
        """Connect to the named pipe at *address*.

        Returns a (transport, protocol) pair once the pipe is connected.
        """
        pipe = yield from self._proactor.connect_pipe(address)
        protocol = protocol_factory()
        transport = self._make_duplex_pipe_transport(
            pipe, protocol, extra={'addr': address})
        return transport, protocol

    @tasks.coroutine
    def start_serving_pipe(self, protocol_factory, address):
        """Start accepting clients on the named pipe at *address*.

        Returns a one-element list holding the PipeServer.
        """
        server = PipeServer(address)

        def accept_next(done=None):
            # Called once without argument to start accepting, and then
            # as the done-callback of each accept future.
            pipe = None
            try:
                if done:
                    # A client connected: wrap its pipe in a transport.
                    pipe = done.result()
                    server._free_instances.discard(pipe)
                    protocol = protocol_factory()
                    self._make_duplex_pipe_transport(
                        pipe, protocol, extra={'addr': address})
                pipe = server._get_unconnected_pipe()
                if pipe is None:
                    # The server has been closed; stop accepting.
                    return
                pending = self._proactor.accept_pipe(pipe)
            except OSError as exc:
                if pipe and pipe.fileno() != -1:
                    self.call_exception_handler({
                        'message': 'Pipe accept failed',
                        'exception': exc,
                        'pipe': pipe,
                    })
                    pipe.close()
            except futures.CancelledError:
                if pipe:
                    pipe.close()
            else:
                pending.add_done_callback(accept_next)

        self.call_soon(accept_next)
        return [server]

    @tasks.coroutine
    def _make_subprocess_transport(self, protocol, args, shell,
                                   stdin, stdout, stderr, bufsize,
                                   extra=None, **kwargs):
        """Create a subprocess transport and wait for its _post_init()."""
        transport = _WindowsSubprocessTransport(
            self, protocol, args, shell,
            stdin, stdout, stderr, bufsize,
            extra=extra, **kwargs)
        yield from transport._post_init()
        return transport
184
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700185
class IocpProactor:
    """Proactor implementation using IOCP.

    Each pending operation is stored in ``self._cache`` under the address
    of its overlapped structure as ``(future, ov, obj, callback)``.
    _poll() pops the entry when the completion port reports that address
    and resolves the future with the value returned by ``callback``.
    """

    def __init__(self, concurrency=0xffffffff):
        self._loop = None
        # Futures completed by _poll() and not yet handed out by select().
        self._results = []
        self._iocp = _overlapped.CreateIoCompletionPort(
            _overlapped.INVALID_HANDLE_VALUE, NULL, 0, concurrency)
        self._cache = {}
        self._registered = weakref.WeakSet()
        self._stopped_serving = weakref.WeakSet()

    def set_loop(self, loop):
        """Set the event loop passed to the futures created from now on."""
        self._loop = loop

    def select(self, timeout=None):
        """Return the list of futures completed since the last call.

        The completion port is only polled (for at most *timeout* seconds)
        when no completed future is already queued.
        """
        if not self._results:
            self._poll(timeout)
        tmp = self._results
        self._results = []
        return tmp

    @staticmethod
    def _finish_io(trans, key, ov):
        # Completion callback shared by recv() and send(): return the
        # result of the operation, reporting a deleted network name as a
        # connection reset.
        try:
            return ov.getresult()
        except OSError as exc:
            if exc.winerror == _overlapped.ERROR_NETNAME_DELETED:
                raise ConnectionResetError(*exc.args)
            else:
                raise

    def recv(self, conn, nbytes, flags=0):
        """Start an overlapped read of up to *nbytes* from *conn*.

        Sockets use WSARecv() (honouring *flags*); any other object is
        read with ReadFile(). Returns a future for the operation.
        """
        self._register_with_iocp(conn)
        ov = _overlapped.Overlapped(NULL)
        if isinstance(conn, socket.socket):
            ov.WSARecv(conn.fileno(), nbytes, flags)
        else:
            ov.ReadFile(conn.fileno(), nbytes)
        return self._register(ov, conn, self._finish_io)

    def send(self, conn, buf, flags=0):
        """Start an overlapped write of *buf* to *conn*.

        Sockets use WSASend() (honouring *flags*); any other object is
        written with WriteFile(). Returns a future for the operation.
        """
        self._register_with_iocp(conn)
        ov = _overlapped.Overlapped(NULL)
        if isinstance(conn, socket.socket):
            ov.WSASend(conn.fileno(), buf, flags)
        else:
            ov.WriteFile(conn.fileno(), buf)
        return self._register(ov, conn, self._finish_io)

    def accept(self, listener):
        """Start an overlapped accept on the socket *listener*.

        Returns a future whose result is ``(conn, peername)``.
        """
        self._register_with_iocp(listener)
        conn = self._get_accept_socket(listener.family)
        ov = _overlapped.Overlapped(NULL)
        ov.AcceptEx(listener.fileno(), conn.fileno())

        def finish_accept(trans, key, ov):
            ov.getresult()
            # Use SO_UPDATE_ACCEPT_CONTEXT so getsockname() etc work.
            buf = struct.pack('@P', listener.fileno())
            conn.setsockopt(socket.SOL_SOCKET,
                            _overlapped.SO_UPDATE_ACCEPT_CONTEXT, buf)
            conn.settimeout(listener.gettimeout())
            return conn, conn.getpeername()

        @tasks.coroutine
        def accept_coro(future, conn):
            # Coroutine closing the accept socket if the future is cancelled
            try:
                yield from future
            except futures.CancelledError:
                conn.close()
                raise

        future = self._register(ov, listener, finish_accept)
        coro = accept_coro(future, conn)
        # "async" is a reserved word from Python 3.7 on, so the scheduling
        # helper is looked up by name; the ensure_future() spelling is
        # preferred when the tasks module provides it.
        schedule = getattr(tasks, 'ensure_future', None)
        if schedule is None:
            schedule = getattr(tasks, 'async')
        schedule(coro, loop=self._loop)
        return future

    def connect(self, conn, address):
        """Start an overlapped connect of socket *conn* to *address*.

        Returns a future whose result is *conn*.
        """
        self._register_with_iocp(conn)
        # The socket needs to be locally bound before we call ConnectEx().
        try:
            _overlapped.BindLocal(conn.fileno(), conn.family)
        except OSError as e:
            if e.winerror != errno.WSAEINVAL:
                raise
            # Probably already locally bound; check using getsockname().
            if conn.getsockname()[1] == 0:
                raise
        ov = _overlapped.Overlapped(NULL)
        ov.ConnectEx(conn.fileno(), address)

        def finish_connect(trans, key, ov):
            ov.getresult()
            # Use SO_UPDATE_CONNECT_CONTEXT so getsockname() etc work.
            conn.setsockopt(socket.SOL_SOCKET,
                            _overlapped.SO_UPDATE_CONNECT_CONTEXT, 0)
            return conn

        return self._register(ov, conn, finish_connect)

    def accept_pipe(self, pipe):
        """Wait for a client to connect to the server pipe *pipe*.

        Returns a future whose result is *pipe*.
        """
        self._register_with_iocp(pipe)
        ov = _overlapped.Overlapped(NULL)
        ov.ConnectNamedPipe(pipe.fileno())

        def finish_accept_pipe(trans, key, ov):
            ov.getresult()
            return pipe

        return self._register(ov, pipe, finish_accept_pipe)

    def connect_pipe(self, address):
        """Connect to the named pipe at *address*.

        Returns a future whose result is a windows_utils.PipeHandle.
        """
        ov = _overlapped.Overlapped(NULL)
        ov.WaitNamedPipeAndConnect(address, self._iocp, ov.address)

        def finish_connect_pipe(err, handle, ov):
            # err, handle were arguments passed to PostQueuedCompletionStatus()
            # in a function run in a thread pool.
            if err == _overlapped.ERROR_SEM_TIMEOUT:
                # Connection did not succeed within time limit.
                msg = _overlapped.FormatMessage(err)
                raise ConnectionRefusedError(0, msg, None, err)
            elif err != 0:
                msg = _overlapped.FormatMessage(err)
                raise OSError(0, msg, None, err)
            else:
                return windows_utils.PipeHandle(handle)

        return self._register(ov, None, finish_connect_pipe,
                              wait_for_post=True)

    def wait_for_handle(self, handle, timeout=None):
        """Wait for *handle* to be signalled, for at most *timeout* seconds.

        Returns a future whose result is True if the handle is signalled
        when the wait completes, and False otherwise.
        """
        if timeout is None:
            ms = _winapi.INFINITE
        else:
            # RegisterWaitForSingleObject() has a resolution of 1 millisecond,
            # round away from zero to wait *at least* timeout seconds.
            ms = math.ceil(timeout * 1e3)

        # We only create ov so we can use ov.address as a key for the cache.
        ov = _overlapped.Overlapped(NULL)
        wh = _overlapped.RegisterWaitWithQueue(
            handle, self._iocp, ov.address, ms)
        f = _WaitHandleFuture(wh, loop=self._loop)

        def finish_wait_for_handle(trans, key, ov):
            if not f.cancelled():
                try:
                    _overlapped.UnregisterWait(wh)
                except OSError as e:
                    if e.winerror != _overlapped.ERROR_IO_PENDING:
                        raise
            # Note that this second wait means that we should only use
            # this with handles types where a successful wait has no
            # effect.  So events or processes are all right, but locks
            # or semaphores are not.  Also note if the handle is
            # signalled and then quickly reset, then we may return
            # False even though we have not timed out.
            return (_winapi.WaitForSingleObject(handle, 0) ==
                    _winapi.WAIT_OBJECT_0)

        self._cache[ov.address] = (f, ov, None, finish_wait_for_handle)
        return f

    def _register_with_iocp(self, obj):
        # To get notifications of finished operations on this object sent
        # to the completion port, we must register the handle.
        if obj not in self._registered:
            self._registered.add(obj)
            _overlapped.CreateIoCompletionPort(obj.fileno(), self._iocp, 0, 0)
            # XXX We could also use SetFileCompletionNotificationModes()
            # to avoid sending notifications to completion port of ops
            # that succeed immediately.

    def _register(self, ov, obj, callback, wait_for_post=False):
        # Return a future which will be set with the result of the
        # operation when it completes.  The future's value is actually
        # the value returned by callback().
        f = _OverlappedFuture(ov, loop=self._loop)
        if ov.pending or wait_for_post:
            # Register the overlapped operation for later.  Note that
            # we only store obj to prevent it from being garbage
            # collected too early.
            self._cache[ov.address] = (f, ov, obj, callback)
        else:
            # The operation has completed, so no need to postpone the
            # work.  We cannot take this short cut if we need the
            # NumberOfBytes, CompletionKey values returned by
            # PostQueuedCompletionStatus().
            try:
                value = callback(None, None, ov)
            except OSError as e:
                f.set_exception(e)
            else:
                f.set_result(value)
        return f

    def _get_accept_socket(self, family):
        """Return a new socket of *family* with its timeout set to 0."""
        s = socket.socket(family)
        s.settimeout(0)
        return s

    def _poll(self, timeout=None):
        """Drain the completion port, waiting at most *timeout* seconds.

        Only the first dequeue may block; every following one uses a zero
        timeout so that all already-queued completions are collected.

        Returns True if at least one completion status was dequeued and
        False if the wait ended without any, which lets close() tell
        progress apart from a timeout.

        Raises ValueError for a negative or too large *timeout*.
        """
        if timeout is None:
            ms = INFINITE
        elif timeout < 0:
            raise ValueError("negative timeout")
        else:
            # GetQueuedCompletionStatus() has a resolution of 1 millisecond,
            # round away from zero to wait *at least* timeout seconds.
            ms = math.ceil(timeout * 1e3)
            if ms >= INFINITE:
                raise ValueError("timeout too big")
        dequeued = False
        while True:
            status = _overlapped.GetQueuedCompletionStatus(self._iocp, ms)
            if status is None:
                return dequeued
            dequeued = True
            # Something was dequeued: never block again during this call.
            ms = 0
            err, transferred, key, address = status
            try:
                f, ov, obj, callback = self._cache.pop(address)
            except KeyError:
                # key is either zero, or it is used to return a pipe
                # handle which should be closed to avoid a leak.
                if key not in (0, _overlapped.INVALID_HANDLE_VALUE):
                    _winapi.CloseHandle(key)
                continue
            if obj in self._stopped_serving:
                f.cancel()
            elif not f.cancelled():
                try:
                    value = callback(transferred, key, ov)
                except OSError as e:
                    f.set_exception(e)
                    self._results.append(f)
                else:
                    f.set_result(value)
                    self._results.append(f)

    def _stop_serving(self, obj):
        # obj is a socket or pipe handle.  It will be closed in
        # BaseProactorEventLoop._stop_serving() which will make any
        # pending operations fail quickly.
        self._stopped_serving.add(obj)

    def close(self):
        """Cancel the pending operations and close the completion port."""
        # Cancel remaining registered operations.
        for address, (f, ov, obj, callback) in list(self._cache.items()):
            if obj is None:
                # The operation was started with connect_pipe() which
                # queues a task to Windows' thread pool.  This cannot
                # be cancelled, so just forget it.
                del self._cache[address]
            else:
                try:
                    ov.cancel()
                except OSError:
                    pass

        while self._cache:
            # _poll() reports whether anything completed, so the message
            # is only logged when a full second passed without progress.
            if not self._poll(1):
                logger.debug('taking long time to close proactor')

        self._results = []
        if self._iocp is not None:
            _winapi.CloseHandle(self._iocp)
            self._iocp = None
Guido van Rossum59691282013-10-30 14:52:03 -0700465
466
class _WindowsSubprocessTransport(base_subprocess.BaseSubprocessTransport):
    """Subprocess transport that learns of process exit via the proactor."""

    def _start(self, args, shell, stdin, stdout, stderr, bufsize, **kwargs):
        """Spawn the process and arrange to be told when it exits."""
        self._proc = windows_utils.Popen(
            args, shell=shell, stdin=stdin, stdout=stdout, stderr=stderr,
            bufsize=bufsize, **kwargs)

        def on_exit(waiter):
            # The wait on the process handle finished: collect the return
            # code and report it.
            self._process_exited(self._proc.poll())

        process_handle = int(self._proc._handle)
        waiter = self._loop._proactor.wait_for_handle(process_handle)
        waiter.add_done_callback(on_exit)
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800480
481
# Public name of the selector-based loop on this platform.
SelectorEventLoop = _WindowsSelectorEventLoop


class _WindowsDefaultEventLoopPolicy(events.BaseDefaultEventLoopPolicy):
    """Default policy; its loop factory is SelectorEventLoop."""

    _loop_factory = SelectorEventLoop


# Public name of the default policy on this platform.
DefaultEventLoopPolicy = _WindowsDefaultEventLoopPolicy