blob: 2e9ec69734675e653bf4a8b3f92317fbee610832 [file] [log] [blame]
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07001"""Selector and proactor eventloops for Windows."""
2
3import errno
4import socket
Guido van Rossum59691282013-10-30 14:52:03 -07005import subprocess
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07006import weakref
7import struct
8import _winapi
9
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -080010from . import events
Guido van Rossum59691282013-10-30 14:52:03 -070011from . import base_subprocess
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070012from . import futures
13from . import proactor_events
14from . import selector_events
15from . import tasks
16from . import windows_utils
Guido van Rossumfc29e0f2013-10-17 15:39:45 -070017from .log import logger
Guido van Rossum59691282013-10-30 14:52:03 -070018from . import _overlapped
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070019
20
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -080021__all__ = ['SelectorEventLoop', 'ProactorEventLoop', 'IocpProactor',
22 'DefaultEventLoopPolicy',
23 ]
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070024
25
26NULL = 0
27INFINITE = 0xffffffff
28ERROR_CONNECTION_REFUSED = 1225
29ERROR_CONNECTION_ABORTED = 1236
30
31
32class _OverlappedFuture(futures.Future):
33 """Subclass of Future which represents an overlapped operation.
34
35 Cancelling it will immediately cancel the overlapped operation.
36 """
37
38 def __init__(self, ov, *, loop=None):
39 super().__init__(loop=loop)
40 self.ov = ov
41
42 def cancel(self):
43 try:
44 self.ov.cancel()
45 except OSError:
46 pass
47 return super().cancel()
48
49
Guido van Rossum90fb9142013-10-30 14:44:05 -070050class _WaitHandleFuture(futures.Future):
51 """Subclass of Future which represents a wait handle."""
52
53 def __init__(self, wait_handle, *, loop=None):
54 super().__init__(loop=loop)
55 self._wait_handle = wait_handle
56
57 def cancel(self):
58 super().cancel()
59 try:
60 _overlapped.UnregisterWait(self._wait_handle)
61 except OSError as e:
62 if e.winerror != _overlapped.ERROR_IO_PENDING:
63 raise
64
65
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070066class PipeServer(object):
67 """Class representing a pipe server.
68
69 This is much like a bound, listening socket.
70 """
71 def __init__(self, address):
72 self._address = address
73 self._free_instances = weakref.WeakSet()
74 self._pipe = self._server_pipe_handle(True)
75
76 def _get_unconnected_pipe(self):
77 # Create new instance and return previous one. This ensures
78 # that (until the server is closed) there is always at least
79 # one pipe handle for address. Therefore if a client attempt
80 # to connect it will not fail with FileNotFoundError.
81 tmp, self._pipe = self._pipe, self._server_pipe_handle(False)
82 return tmp
83
84 def _server_pipe_handle(self, first):
85 # Return a wrapper for a new pipe handle.
86 if self._address is None:
87 return None
88 flags = _winapi.PIPE_ACCESS_DUPLEX | _winapi.FILE_FLAG_OVERLAPPED
89 if first:
90 flags |= _winapi.FILE_FLAG_FIRST_PIPE_INSTANCE
91 h = _winapi.CreateNamedPipe(
92 self._address, flags,
93 _winapi.PIPE_TYPE_MESSAGE | _winapi.PIPE_READMODE_MESSAGE |
94 _winapi.PIPE_WAIT,
95 _winapi.PIPE_UNLIMITED_INSTANCES,
96 windows_utils.BUFSIZE, windows_utils.BUFSIZE,
97 _winapi.NMPWAIT_WAIT_FOREVER, _winapi.NULL)
98 pipe = windows_utils.PipeHandle(h)
99 self._free_instances.add(pipe)
100 return pipe
101
102 def close(self):
103 # Close all instances which have not been connected to by a client.
104 if self._address is not None:
105 for pipe in self._free_instances:
106 pipe.close()
107 self._pipe = None
108 self._address = None
109 self._free_instances.clear()
110
111 __del__ = close
112
113
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800114class _WindowsSelectorEventLoop(selector_events.BaseSelectorEventLoop):
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700115 """Windows version of selector event loop."""
116
117 def _socketpair(self):
118 return windows_utils.socketpair()
119
120
121class ProactorEventLoop(proactor_events.BaseProactorEventLoop):
122 """Windows version of proactor event loop using IOCP."""
123
124 def __init__(self, proactor=None):
125 if proactor is None:
126 proactor = IocpProactor()
127 super().__init__(proactor)
128
129 def _socketpair(self):
130 return windows_utils.socketpair()
131
132 @tasks.coroutine
133 def create_pipe_connection(self, protocol_factory, address):
134 f = self._proactor.connect_pipe(address)
135 pipe = yield from f
136 protocol = protocol_factory()
137 trans = self._make_duplex_pipe_transport(pipe, protocol,
138 extra={'addr': address})
139 return trans, protocol
140
141 @tasks.coroutine
142 def start_serving_pipe(self, protocol_factory, address):
143 server = PipeServer(address)
Guido van Rossuma8d630a2013-11-01 14:20:55 -0700144
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700145 def loop(f=None):
146 pipe = None
147 try:
148 if f:
149 pipe = f.result()
150 server._free_instances.discard(pipe)
151 protocol = protocol_factory()
152 self._make_duplex_pipe_transport(
153 pipe, protocol, extra={'addr': address})
154 pipe = server._get_unconnected_pipe()
155 if pipe is None:
156 return
157 f = self._proactor.accept_pipe(pipe)
158 except OSError:
159 if pipe and pipe.fileno() != -1:
Guido van Rossumfc29e0f2013-10-17 15:39:45 -0700160 logger.exception('Pipe accept failed')
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700161 pipe.close()
162 except futures.CancelledError:
163 if pipe:
164 pipe.close()
165 else:
166 f.add_done_callback(loop)
Guido van Rossuma8d630a2013-11-01 14:20:55 -0700167
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700168 self.call_soon(loop)
169 return [server]
170
Guido van Rossum59691282013-10-30 14:52:03 -0700171 @tasks.coroutine
172 def _make_subprocess_transport(self, protocol, args, shell,
173 stdin, stdout, stderr, bufsize,
174 extra=None, **kwargs):
175 transp = _WindowsSubprocessTransport(self, protocol, args, shell,
176 stdin, stdout, stderr, bufsize,
177 extra=None, **kwargs)
178 yield from transp._post_init()
179 return transp
180
181 def _subprocess_closed(self, transport):
182 pass
183
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700184
185class IocpProactor:
186 """Proactor implementation using IOCP."""
187
188 def __init__(self, concurrency=0xffffffff):
189 self._loop = None
190 self._results = []
191 self._iocp = _overlapped.CreateIoCompletionPort(
192 _overlapped.INVALID_HANDLE_VALUE, NULL, 0, concurrency)
193 self._cache = {}
194 self._registered = weakref.WeakSet()
195 self._stopped_serving = weakref.WeakSet()
196
197 def set_loop(self, loop):
198 self._loop = loop
199
200 def select(self, timeout=None):
201 if not self._results:
202 self._poll(timeout)
203 tmp = self._results
204 self._results = []
205 return tmp
206
207 def recv(self, conn, nbytes, flags=0):
208 self._register_with_iocp(conn)
209 ov = _overlapped.Overlapped(NULL)
210 if isinstance(conn, socket.socket):
211 ov.WSARecv(conn.fileno(), nbytes, flags)
212 else:
213 ov.ReadFile(conn.fileno(), nbytes)
Guido van Rossuma8d630a2013-11-01 14:20:55 -0700214
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700215 def finish(trans, key, ov):
216 try:
217 return ov.getresult()
218 except OSError as exc:
219 if exc.winerror == _overlapped.ERROR_NETNAME_DELETED:
220 raise ConnectionResetError(*exc.args)
221 else:
222 raise
Guido van Rossuma8d630a2013-11-01 14:20:55 -0700223
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700224 return self._register(ov, conn, finish)
225
226 def send(self, conn, buf, flags=0):
227 self._register_with_iocp(conn)
228 ov = _overlapped.Overlapped(NULL)
229 if isinstance(conn, socket.socket):
230 ov.WSASend(conn.fileno(), buf, flags)
231 else:
232 ov.WriteFile(conn.fileno(), buf)
Guido van Rossuma8d630a2013-11-01 14:20:55 -0700233
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700234 def finish(trans, key, ov):
235 try:
236 return ov.getresult()
237 except OSError as exc:
238 if exc.winerror == _overlapped.ERROR_NETNAME_DELETED:
239 raise ConnectionResetError(*exc.args)
240 else:
241 raise
Guido van Rossuma8d630a2013-11-01 14:20:55 -0700242
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700243 return self._register(ov, conn, finish)
244
245 def accept(self, listener):
246 self._register_with_iocp(listener)
247 conn = self._get_accept_socket(listener.family)
248 ov = _overlapped.Overlapped(NULL)
249 ov.AcceptEx(listener.fileno(), conn.fileno())
Guido van Rossuma8d630a2013-11-01 14:20:55 -0700250
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700251 def finish_accept(trans, key, ov):
252 ov.getresult()
253 # Use SO_UPDATE_ACCEPT_CONTEXT so getsockname() etc work.
254 buf = struct.pack('@P', listener.fileno())
255 conn.setsockopt(socket.SOL_SOCKET,
256 _overlapped.SO_UPDATE_ACCEPT_CONTEXT, buf)
257 conn.settimeout(listener.gettimeout())
258 return conn, conn.getpeername()
Guido van Rossuma8d630a2013-11-01 14:20:55 -0700259
Victor Stinner7de26462014-01-11 00:03:21 +0100260 @tasks.coroutine
261 def accept_coro(future, conn):
262 # Coroutine closing the accept socket if the future is cancelled
263 try:
264 yield from future
265 except futures.CancelledError:
266 conn.close()
267 raise
268
269 future = self._register(ov, listener, finish_accept)
270 coro = accept_coro(future, conn)
271 tasks.async(coro, loop=self._loop)
272 return future
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700273
274 def connect(self, conn, address):
275 self._register_with_iocp(conn)
276 # The socket needs to be locally bound before we call ConnectEx().
277 try:
278 _overlapped.BindLocal(conn.fileno(), conn.family)
279 except OSError as e:
280 if e.winerror != errno.WSAEINVAL:
281 raise
282 # Probably already locally bound; check using getsockname().
283 if conn.getsockname()[1] == 0:
284 raise
285 ov = _overlapped.Overlapped(NULL)
286 ov.ConnectEx(conn.fileno(), address)
Guido van Rossuma8d630a2013-11-01 14:20:55 -0700287
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700288 def finish_connect(trans, key, ov):
289 ov.getresult()
290 # Use SO_UPDATE_CONNECT_CONTEXT so getsockname() etc work.
291 conn.setsockopt(socket.SOL_SOCKET,
292 _overlapped.SO_UPDATE_CONNECT_CONTEXT, 0)
293 return conn
Guido van Rossuma8d630a2013-11-01 14:20:55 -0700294
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700295 return self._register(ov, conn, finish_connect)
296
297 def accept_pipe(self, pipe):
298 self._register_with_iocp(pipe)
299 ov = _overlapped.Overlapped(NULL)
300 ov.ConnectNamedPipe(pipe.fileno())
Guido van Rossuma8d630a2013-11-01 14:20:55 -0700301
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700302 def finish(trans, key, ov):
303 ov.getresult()
304 return pipe
Guido van Rossuma8d630a2013-11-01 14:20:55 -0700305
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700306 return self._register(ov, pipe, finish)
307
308 def connect_pipe(self, address):
309 ov = _overlapped.Overlapped(NULL)
310 ov.WaitNamedPipeAndConnect(address, self._iocp, ov.address)
Guido van Rossuma8d630a2013-11-01 14:20:55 -0700311
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700312 def finish(err, handle, ov):
313 # err, handle were arguments passed to PostQueuedCompletionStatus()
314 # in a function run in a thread pool.
315 if err == _overlapped.ERROR_SEM_TIMEOUT:
316 # Connection did not succeed within time limit.
317 msg = _overlapped.FormatMessage(err)
318 raise ConnectionRefusedError(0, msg, None, err)
319 elif err != 0:
320 msg = _overlapped.FormatMessage(err)
321 raise OSError(0, msg, None, err)
322 else:
323 return windows_utils.PipeHandle(handle)
Guido van Rossuma8d630a2013-11-01 14:20:55 -0700324
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700325 return self._register(ov, None, finish, wait_for_post=True)
326
Guido van Rossum90fb9142013-10-30 14:44:05 -0700327 def wait_for_handle(self, handle, timeout=None):
328 if timeout is None:
329 ms = _winapi.INFINITE
330 else:
331 ms = int(timeout * 1000 + 0.5)
332
333 # We only create ov so we can use ov.address as a key for the cache.
334 ov = _overlapped.Overlapped(NULL)
335 wh = _overlapped.RegisterWaitWithQueue(
336 handle, self._iocp, ov.address, ms)
337 f = _WaitHandleFuture(wh, loop=self._loop)
338
Richard Oudkerk71196e72013-11-24 17:50:40 +0000339 def finish(trans, key, ov):
Guido van Rossum90fb9142013-10-30 14:44:05 -0700340 if not f.cancelled():
341 try:
342 _overlapped.UnregisterWait(wh)
343 except OSError as e:
344 if e.winerror != _overlapped.ERROR_IO_PENDING:
345 raise
Richard Oudkerk71196e72013-11-24 17:50:40 +0000346 # Note that this second wait means that we should only use
347 # this with handles types where a successful wait has no
348 # effect. So events or processes are all right, but locks
349 # or semaphores are not. Also note if the handle is
350 # signalled and then quickly reset, then we may return
351 # False even though we have not timed out.
352 return (_winapi.WaitForSingleObject(handle, 0) ==
353 _winapi.WAIT_OBJECT_0)
Guido van Rossum90fb9142013-10-30 14:44:05 -0700354
355 self._cache[ov.address] = (f, ov, None, finish)
356 return f
357
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700358 def _register_with_iocp(self, obj):
359 # To get notifications of finished ops on this objects sent to the
360 # completion port, were must register the handle.
361 if obj not in self._registered:
362 self._registered.add(obj)
363 _overlapped.CreateIoCompletionPort(obj.fileno(), self._iocp, 0, 0)
364 # XXX We could also use SetFileCompletionNotificationModes()
365 # to avoid sending notifications to completion port of ops
366 # that succeed immediately.
367
368 def _register(self, ov, obj, callback, wait_for_post=False):
369 # Return a future which will be set with the result of the
370 # operation when it completes. The future's value is actually
371 # the value returned by callback().
372 f = _OverlappedFuture(ov, loop=self._loop)
373 if ov.pending or wait_for_post:
374 # Register the overlapped operation for later. Note that
375 # we only store obj to prevent it from being garbage
376 # collected too early.
377 self._cache[ov.address] = (f, ov, obj, callback)
378 else:
379 # The operation has completed, so no need to postpone the
380 # work. We cannot take this short cut if we need the
381 # NumberOfBytes, CompletionKey values returned by
382 # PostQueuedCompletionStatus().
383 try:
384 value = callback(None, None, ov)
385 except OSError as e:
386 f.set_exception(e)
387 else:
388 f.set_result(value)
389 return f
390
391 def _get_accept_socket(self, family):
392 s = socket.socket(family)
393 s.settimeout(0)
394 return s
395
396 def _poll(self, timeout=None):
397 if timeout is None:
398 ms = INFINITE
399 elif timeout < 0:
400 raise ValueError("negative timeout")
401 else:
402 ms = int(timeout * 1000 + 0.5)
403 if ms >= INFINITE:
404 raise ValueError("timeout too big")
405 while True:
406 status = _overlapped.GetQueuedCompletionStatus(self._iocp, ms)
407 if status is None:
408 return
409 err, transferred, key, address = status
410 try:
411 f, ov, obj, callback = self._cache.pop(address)
412 except KeyError:
413 # key is either zero, or it is used to return a pipe
414 # handle which should be closed to avoid a leak.
415 if key not in (0, _overlapped.INVALID_HANDLE_VALUE):
416 _winapi.CloseHandle(key)
417 ms = 0
418 continue
419 if obj in self._stopped_serving:
420 f.cancel()
421 elif not f.cancelled():
422 try:
423 value = callback(transferred, key, ov)
424 except OSError as e:
425 f.set_exception(e)
426 self._results.append(f)
427 else:
428 f.set_result(value)
429 self._results.append(f)
430 ms = 0
431
432 def _stop_serving(self, obj):
433 # obj is a socket or pipe handle. It will be closed in
434 # BaseProactorEventLoop._stop_serving() which will make any
435 # pending operations fail quickly.
436 self._stopped_serving.add(obj)
437
438 def close(self):
439 # Cancel remaining registered operations.
440 for address, (f, ov, obj, callback) in list(self._cache.items()):
441 if obj is None:
442 # The operation was started with connect_pipe() which
443 # queues a task to Windows' thread pool. This cannot
444 # be cancelled, so just forget it.
445 del self._cache[address]
446 else:
447 try:
448 ov.cancel()
449 except OSError:
450 pass
451
452 while self._cache:
453 if not self._poll(1):
Guido van Rossumfc29e0f2013-10-17 15:39:45 -0700454 logger.debug('taking long time to close proactor')
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700455
456 self._results = []
457 if self._iocp is not None:
458 _winapi.CloseHandle(self._iocp)
459 self._iocp = None
Guido van Rossum59691282013-10-30 14:52:03 -0700460
461
462class _WindowsSubprocessTransport(base_subprocess.BaseSubprocessTransport):
463
464 def _start(self, args, shell, stdin, stdout, stderr, bufsize, **kwargs):
465 self._proc = windows_utils.Popen(
466 args, shell=shell, stdin=stdin, stdout=stdout, stderr=stderr,
467 bufsize=bufsize, **kwargs)
Guido van Rossuma8d630a2013-11-01 14:20:55 -0700468
Guido van Rossum59691282013-10-30 14:52:03 -0700469 def callback(f):
470 returncode = self._proc.poll()
471 self._process_exited(returncode)
Guido van Rossuma8d630a2013-11-01 14:20:55 -0700472
Guido van Rossum59691282013-10-30 14:52:03 -0700473 f = self._loop._proactor.wait_for_handle(int(self._proc._handle))
474 f.add_done_callback(callback)
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800475
476
477SelectorEventLoop = _WindowsSelectorEventLoop
478
479
480class _WindowsDefaultEventLoopPolicy(events.BaseDefaultEventLoopPolicy):
481 _loop_factory = SelectorEventLoop
482
483
484DefaultEventLoopPolicy = _WindowsDefaultEventLoopPolicy