blob: d7444bdf655e23a179264256d34e9340f4173a4c [file] [log] [blame]
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07001"""Selector and proactor eventloops for Windows."""
2
3import errno
4import socket
Guido van Rossum59691282013-10-30 14:52:03 -07005import subprocess
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07006import weakref
7import struct
8import _winapi
9
Guido van Rossum59691282013-10-30 14:52:03 -070010from . import base_subprocess
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070011from . import futures
12from . import proactor_events
13from . import selector_events
14from . import tasks
15from . import windows_utils
Guido van Rossumfc29e0f2013-10-17 15:39:45 -070016from .log import logger
Guido van Rossum59691282013-10-30 14:52:03 -070017from . import _overlapped
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070018
19
20__all__ = ['SelectorEventLoop', 'ProactorEventLoop', 'IocpProactor']
21
22
23NULL = 0
24INFINITE = 0xffffffff
25ERROR_CONNECTION_REFUSED = 1225
26ERROR_CONNECTION_ABORTED = 1236
27
28
29class _OverlappedFuture(futures.Future):
30 """Subclass of Future which represents an overlapped operation.
31
32 Cancelling it will immediately cancel the overlapped operation.
33 """
34
35 def __init__(self, ov, *, loop=None):
36 super().__init__(loop=loop)
37 self.ov = ov
38
39 def cancel(self):
40 try:
41 self.ov.cancel()
42 except OSError:
43 pass
44 return super().cancel()
45
46
Guido van Rossum90fb9142013-10-30 14:44:05 -070047class _WaitHandleFuture(futures.Future):
48 """Subclass of Future which represents a wait handle."""
49
50 def __init__(self, wait_handle, *, loop=None):
51 super().__init__(loop=loop)
52 self._wait_handle = wait_handle
53
54 def cancel(self):
55 super().cancel()
56 try:
57 _overlapped.UnregisterWait(self._wait_handle)
58 except OSError as e:
59 if e.winerror != _overlapped.ERROR_IO_PENDING:
60 raise
61
62
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070063class PipeServer(object):
64 """Class representing a pipe server.
65
66 This is much like a bound, listening socket.
67 """
68 def __init__(self, address):
69 self._address = address
70 self._free_instances = weakref.WeakSet()
71 self._pipe = self._server_pipe_handle(True)
72
73 def _get_unconnected_pipe(self):
74 # Create new instance and return previous one. This ensures
75 # that (until the server is closed) there is always at least
76 # one pipe handle for address. Therefore if a client attempt
77 # to connect it will not fail with FileNotFoundError.
78 tmp, self._pipe = self._pipe, self._server_pipe_handle(False)
79 return tmp
80
81 def _server_pipe_handle(self, first):
82 # Return a wrapper for a new pipe handle.
83 if self._address is None:
84 return None
85 flags = _winapi.PIPE_ACCESS_DUPLEX | _winapi.FILE_FLAG_OVERLAPPED
86 if first:
87 flags |= _winapi.FILE_FLAG_FIRST_PIPE_INSTANCE
88 h = _winapi.CreateNamedPipe(
89 self._address, flags,
90 _winapi.PIPE_TYPE_MESSAGE | _winapi.PIPE_READMODE_MESSAGE |
91 _winapi.PIPE_WAIT,
92 _winapi.PIPE_UNLIMITED_INSTANCES,
93 windows_utils.BUFSIZE, windows_utils.BUFSIZE,
94 _winapi.NMPWAIT_WAIT_FOREVER, _winapi.NULL)
95 pipe = windows_utils.PipeHandle(h)
96 self._free_instances.add(pipe)
97 return pipe
98
99 def close(self):
100 # Close all instances which have not been connected to by a client.
101 if self._address is not None:
102 for pipe in self._free_instances:
103 pipe.close()
104 self._pipe = None
105 self._address = None
106 self._free_instances.clear()
107
108 __del__ = close
109
110
111class SelectorEventLoop(selector_events.BaseSelectorEventLoop):
112 """Windows version of selector event loop."""
113
114 def _socketpair(self):
115 return windows_utils.socketpair()
116
117
118class ProactorEventLoop(proactor_events.BaseProactorEventLoop):
119 """Windows version of proactor event loop using IOCP."""
120
121 def __init__(self, proactor=None):
122 if proactor is None:
123 proactor = IocpProactor()
124 super().__init__(proactor)
125
126 def _socketpair(self):
127 return windows_utils.socketpair()
128
129 @tasks.coroutine
130 def create_pipe_connection(self, protocol_factory, address):
131 f = self._proactor.connect_pipe(address)
132 pipe = yield from f
133 protocol = protocol_factory()
134 trans = self._make_duplex_pipe_transport(pipe, protocol,
135 extra={'addr': address})
136 return trans, protocol
137
138 @tasks.coroutine
139 def start_serving_pipe(self, protocol_factory, address):
140 server = PipeServer(address)
Guido van Rossuma8d630a2013-11-01 14:20:55 -0700141
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700142 def loop(f=None):
143 pipe = None
144 try:
145 if f:
146 pipe = f.result()
147 server._free_instances.discard(pipe)
148 protocol = protocol_factory()
149 self._make_duplex_pipe_transport(
150 pipe, protocol, extra={'addr': address})
151 pipe = server._get_unconnected_pipe()
152 if pipe is None:
153 return
154 f = self._proactor.accept_pipe(pipe)
155 except OSError:
156 if pipe and pipe.fileno() != -1:
Guido van Rossumfc29e0f2013-10-17 15:39:45 -0700157 logger.exception('Pipe accept failed')
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700158 pipe.close()
159 except futures.CancelledError:
160 if pipe:
161 pipe.close()
162 else:
163 f.add_done_callback(loop)
Guido van Rossuma8d630a2013-11-01 14:20:55 -0700164
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700165 self.call_soon(loop)
166 return [server]
167
168 def _stop_serving(self, server):
169 server.close()
170
Guido van Rossum59691282013-10-30 14:52:03 -0700171 @tasks.coroutine
172 def _make_subprocess_transport(self, protocol, args, shell,
173 stdin, stdout, stderr, bufsize,
174 extra=None, **kwargs):
175 transp = _WindowsSubprocessTransport(self, protocol, args, shell,
176 stdin, stdout, stderr, bufsize,
177 extra=None, **kwargs)
178 yield from transp._post_init()
179 return transp
180
181 def _subprocess_closed(self, transport):
182 pass
183
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700184
185class IocpProactor:
186 """Proactor implementation using IOCP."""
187
188 def __init__(self, concurrency=0xffffffff):
189 self._loop = None
190 self._results = []
191 self._iocp = _overlapped.CreateIoCompletionPort(
192 _overlapped.INVALID_HANDLE_VALUE, NULL, 0, concurrency)
193 self._cache = {}
194 self._registered = weakref.WeakSet()
195 self._stopped_serving = weakref.WeakSet()
196
197 def set_loop(self, loop):
198 self._loop = loop
199
200 def select(self, timeout=None):
201 if not self._results:
202 self._poll(timeout)
203 tmp = self._results
204 self._results = []
205 return tmp
206
207 def recv(self, conn, nbytes, flags=0):
208 self._register_with_iocp(conn)
209 ov = _overlapped.Overlapped(NULL)
210 if isinstance(conn, socket.socket):
211 ov.WSARecv(conn.fileno(), nbytes, flags)
212 else:
213 ov.ReadFile(conn.fileno(), nbytes)
Guido van Rossuma8d630a2013-11-01 14:20:55 -0700214
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700215 def finish(trans, key, ov):
216 try:
217 return ov.getresult()
218 except OSError as exc:
219 if exc.winerror == _overlapped.ERROR_NETNAME_DELETED:
220 raise ConnectionResetError(*exc.args)
221 else:
222 raise
Guido van Rossuma8d630a2013-11-01 14:20:55 -0700223
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700224 return self._register(ov, conn, finish)
225
226 def send(self, conn, buf, flags=0):
227 self._register_with_iocp(conn)
228 ov = _overlapped.Overlapped(NULL)
229 if isinstance(conn, socket.socket):
230 ov.WSASend(conn.fileno(), buf, flags)
231 else:
232 ov.WriteFile(conn.fileno(), buf)
Guido van Rossuma8d630a2013-11-01 14:20:55 -0700233
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700234 def finish(trans, key, ov):
235 try:
236 return ov.getresult()
237 except OSError as exc:
238 if exc.winerror == _overlapped.ERROR_NETNAME_DELETED:
239 raise ConnectionResetError(*exc.args)
240 else:
241 raise
Guido van Rossuma8d630a2013-11-01 14:20:55 -0700242
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700243 return self._register(ov, conn, finish)
244
245 def accept(self, listener):
246 self._register_with_iocp(listener)
247 conn = self._get_accept_socket(listener.family)
248 ov = _overlapped.Overlapped(NULL)
249 ov.AcceptEx(listener.fileno(), conn.fileno())
Guido van Rossuma8d630a2013-11-01 14:20:55 -0700250
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700251 def finish_accept(trans, key, ov):
252 ov.getresult()
253 # Use SO_UPDATE_ACCEPT_CONTEXT so getsockname() etc work.
254 buf = struct.pack('@P', listener.fileno())
255 conn.setsockopt(socket.SOL_SOCKET,
256 _overlapped.SO_UPDATE_ACCEPT_CONTEXT, buf)
257 conn.settimeout(listener.gettimeout())
258 return conn, conn.getpeername()
Guido van Rossuma8d630a2013-11-01 14:20:55 -0700259
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700260 return self._register(ov, listener, finish_accept)
261
262 def connect(self, conn, address):
263 self._register_with_iocp(conn)
264 # The socket needs to be locally bound before we call ConnectEx().
265 try:
266 _overlapped.BindLocal(conn.fileno(), conn.family)
267 except OSError as e:
268 if e.winerror != errno.WSAEINVAL:
269 raise
270 # Probably already locally bound; check using getsockname().
271 if conn.getsockname()[1] == 0:
272 raise
273 ov = _overlapped.Overlapped(NULL)
274 ov.ConnectEx(conn.fileno(), address)
Guido van Rossuma8d630a2013-11-01 14:20:55 -0700275
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700276 def finish_connect(trans, key, ov):
277 ov.getresult()
278 # Use SO_UPDATE_CONNECT_CONTEXT so getsockname() etc work.
279 conn.setsockopt(socket.SOL_SOCKET,
280 _overlapped.SO_UPDATE_CONNECT_CONTEXT, 0)
281 return conn
Guido van Rossuma8d630a2013-11-01 14:20:55 -0700282
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700283 return self._register(ov, conn, finish_connect)
284
285 def accept_pipe(self, pipe):
286 self._register_with_iocp(pipe)
287 ov = _overlapped.Overlapped(NULL)
288 ov.ConnectNamedPipe(pipe.fileno())
Guido van Rossuma8d630a2013-11-01 14:20:55 -0700289
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700290 def finish(trans, key, ov):
291 ov.getresult()
292 return pipe
Guido van Rossuma8d630a2013-11-01 14:20:55 -0700293
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700294 return self._register(ov, pipe, finish)
295
296 def connect_pipe(self, address):
297 ov = _overlapped.Overlapped(NULL)
298 ov.WaitNamedPipeAndConnect(address, self._iocp, ov.address)
Guido van Rossuma8d630a2013-11-01 14:20:55 -0700299
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700300 def finish(err, handle, ov):
301 # err, handle were arguments passed to PostQueuedCompletionStatus()
302 # in a function run in a thread pool.
303 if err == _overlapped.ERROR_SEM_TIMEOUT:
304 # Connection did not succeed within time limit.
305 msg = _overlapped.FormatMessage(err)
306 raise ConnectionRefusedError(0, msg, None, err)
307 elif err != 0:
308 msg = _overlapped.FormatMessage(err)
309 raise OSError(0, msg, None, err)
310 else:
311 return windows_utils.PipeHandle(handle)
Guido van Rossuma8d630a2013-11-01 14:20:55 -0700312
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700313 return self._register(ov, None, finish, wait_for_post=True)
314
Guido van Rossum90fb9142013-10-30 14:44:05 -0700315 def wait_for_handle(self, handle, timeout=None):
316 if timeout is None:
317 ms = _winapi.INFINITE
318 else:
319 ms = int(timeout * 1000 + 0.5)
320
321 # We only create ov so we can use ov.address as a key for the cache.
322 ov = _overlapped.Overlapped(NULL)
323 wh = _overlapped.RegisterWaitWithQueue(
324 handle, self._iocp, ov.address, ms)
325 f = _WaitHandleFuture(wh, loop=self._loop)
326
327 def finish(timed_out, _, ov):
328 if not f.cancelled():
329 try:
330 _overlapped.UnregisterWait(wh)
331 except OSError as e:
332 if e.winerror != _overlapped.ERROR_IO_PENDING:
333 raise
334 return not timed_out
335
336 self._cache[ov.address] = (f, ov, None, finish)
337 return f
338
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700339 def _register_with_iocp(self, obj):
340 # To get notifications of finished ops on this objects sent to the
341 # completion port, were must register the handle.
342 if obj not in self._registered:
343 self._registered.add(obj)
344 _overlapped.CreateIoCompletionPort(obj.fileno(), self._iocp, 0, 0)
345 # XXX We could also use SetFileCompletionNotificationModes()
346 # to avoid sending notifications to completion port of ops
347 # that succeed immediately.
348
349 def _register(self, ov, obj, callback, wait_for_post=False):
350 # Return a future which will be set with the result of the
351 # operation when it completes. The future's value is actually
352 # the value returned by callback().
353 f = _OverlappedFuture(ov, loop=self._loop)
354 if ov.pending or wait_for_post:
355 # Register the overlapped operation for later. Note that
356 # we only store obj to prevent it from being garbage
357 # collected too early.
358 self._cache[ov.address] = (f, ov, obj, callback)
359 else:
360 # The operation has completed, so no need to postpone the
361 # work. We cannot take this short cut if we need the
362 # NumberOfBytes, CompletionKey values returned by
363 # PostQueuedCompletionStatus().
364 try:
365 value = callback(None, None, ov)
366 except OSError as e:
367 f.set_exception(e)
368 else:
369 f.set_result(value)
370 return f
371
372 def _get_accept_socket(self, family):
373 s = socket.socket(family)
374 s.settimeout(0)
375 return s
376
377 def _poll(self, timeout=None):
378 if timeout is None:
379 ms = INFINITE
380 elif timeout < 0:
381 raise ValueError("negative timeout")
382 else:
383 ms = int(timeout * 1000 + 0.5)
384 if ms >= INFINITE:
385 raise ValueError("timeout too big")
386 while True:
387 status = _overlapped.GetQueuedCompletionStatus(self._iocp, ms)
388 if status is None:
389 return
390 err, transferred, key, address = status
391 try:
392 f, ov, obj, callback = self._cache.pop(address)
393 except KeyError:
394 # key is either zero, or it is used to return a pipe
395 # handle which should be closed to avoid a leak.
396 if key not in (0, _overlapped.INVALID_HANDLE_VALUE):
397 _winapi.CloseHandle(key)
398 ms = 0
399 continue
400 if obj in self._stopped_serving:
401 f.cancel()
402 elif not f.cancelled():
403 try:
404 value = callback(transferred, key, ov)
405 except OSError as e:
406 f.set_exception(e)
407 self._results.append(f)
408 else:
409 f.set_result(value)
410 self._results.append(f)
411 ms = 0
412
413 def _stop_serving(self, obj):
414 # obj is a socket or pipe handle. It will be closed in
415 # BaseProactorEventLoop._stop_serving() which will make any
416 # pending operations fail quickly.
417 self._stopped_serving.add(obj)
418
419 def close(self):
420 # Cancel remaining registered operations.
421 for address, (f, ov, obj, callback) in list(self._cache.items()):
422 if obj is None:
423 # The operation was started with connect_pipe() which
424 # queues a task to Windows' thread pool. This cannot
425 # be cancelled, so just forget it.
426 del self._cache[address]
427 else:
428 try:
429 ov.cancel()
430 except OSError:
431 pass
432
433 while self._cache:
434 if not self._poll(1):
Guido van Rossumfc29e0f2013-10-17 15:39:45 -0700435 logger.debug('taking long time to close proactor')
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700436
437 self._results = []
438 if self._iocp is not None:
439 _winapi.CloseHandle(self._iocp)
440 self._iocp = None
Guido van Rossum59691282013-10-30 14:52:03 -0700441
442
443class _WindowsSubprocessTransport(base_subprocess.BaseSubprocessTransport):
444
445 def _start(self, args, shell, stdin, stdout, stderr, bufsize, **kwargs):
446 self._proc = windows_utils.Popen(
447 args, shell=shell, stdin=stdin, stdout=stdout, stderr=stderr,
448 bufsize=bufsize, **kwargs)
Guido van Rossuma8d630a2013-11-01 14:20:55 -0700449
Guido van Rossum59691282013-10-30 14:52:03 -0700450 def callback(f):
451 returncode = self._proc.poll()
452 self._process_exited(returncode)
Guido van Rossuma8d630a2013-11-01 14:20:55 -0700453
Guido van Rossum59691282013-10-30 14:52:03 -0700454 f = self._loop._proactor.wait_for_handle(int(self._proc._handle))
455 f.add_done_callback(callback)