blob: 4ba4f49cb9950e787e16c779742b382e95f9d32e [file] [log] [blame]
Yury Selivanovdec1a452014-02-18 22:27:48 -05001"""Selector event loop for Unix with signal handling."""
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07002
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07003import errno
4import fcntl
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07005import os
6import signal
7import socket
8import stat
9import subprocess
10import sys
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -080011import threading
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070012
13
Yury Selivanovb057c522014-02-18 12:15:06 -050014from . import base_events
Guido van Rossum59691282013-10-30 14:52:03 -070015from . import base_subprocess
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070016from . import constants
17from . import events
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070018from . import selector_events
Victor Stinnere912e652014-07-12 03:11:53 +020019from . import selectors
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070020from . import transports
Victor Stinnerf951d282014-06-29 00:46:45 +020021from .coroutines import coroutine
Guido van Rossumfc29e0f2013-10-17 15:39:45 -070022from .log import logger
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070023
24
Victor Stinner915bcb02014-02-01 22:49:59 +010025__all__ = ['SelectorEventLoop',
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -080026 'AbstractChildWatcher', 'SafeChildWatcher',
27 'FastChildWatcher', 'DefaultEventLoopPolicy',
28 ]
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070029
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070030if sys.platform == 'win32': # pragma: no cover
31 raise ImportError('Signals are not really supported on Windows')
32
33
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -080034class _UnixSelectorEventLoop(selector_events.BaseSelectorEventLoop):
Yury Selivanovb057c522014-02-18 12:15:06 -050035 """Unix event loop.
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070036
Yury Selivanovb057c522014-02-18 12:15:06 -050037 Adds signal handling and UNIX Domain Socket support to SelectorEventLoop.
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070038 """
39
40 def __init__(self, selector=None):
41 super().__init__(selector)
42 self._signal_handlers = {}
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070043
44 def _socketpair(self):
45 return socket.socketpair()
46
Guido van Rossum0b69fbc2013-11-06 20:25:50 -080047 def close(self):
Victor Stinnerf328c7d2014-06-23 01:02:37 +020048 super().close()
Guido van Rossum0b69fbc2013-11-06 20:25:50 -080049 for sig in list(self._signal_handlers):
50 self.remove_signal_handler(sig)
Guido van Rossum0b69fbc2013-11-06 20:25:50 -080051
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070052 def add_signal_handler(self, sig, callback, *args):
53 """Add a handler for a signal. UNIX only.
54
55 Raise ValueError if the signal number is invalid or uncatchable.
56 Raise RuntimeError if there is a problem setting up the handler.
57 """
58 self._check_signal(sig)
59 try:
60 # set_wakeup_fd() raises ValueError if this is not the
61 # main thread. By calling it early we ensure that an
62 # event loop running in another thread cannot add a signal
63 # handler.
64 signal.set_wakeup_fd(self._csock.fileno())
65 except ValueError as exc:
66 raise RuntimeError(str(exc))
67
Yury Selivanov569efa22014-02-18 18:02:19 -050068 handle = events.Handle(callback, args, self)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070069 self._signal_handlers[sig] = handle
70
71 try:
72 signal.signal(sig, self._handle_signal)
Charles-François Natali74e7cf32013-12-05 22:47:19 +010073 # Set SA_RESTART to limit EINTR occurrences.
74 signal.siginterrupt(sig, False)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070075 except OSError as exc:
76 del self._signal_handlers[sig]
77 if not self._signal_handlers:
78 try:
79 signal.set_wakeup_fd(-1)
80 except ValueError as nexc:
Guido van Rossumfc29e0f2013-10-17 15:39:45 -070081 logger.info('set_wakeup_fd(-1) failed: %s', nexc)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070082
83 if exc.errno == errno.EINVAL:
84 raise RuntimeError('sig {} cannot be caught'.format(sig))
85 else:
86 raise
87
88 def _handle_signal(self, sig, arg):
89 """Internal helper that is the actual signal handler."""
90 handle = self._signal_handlers.get(sig)
91 if handle is None:
92 return # Assume it's some race condition.
93 if handle._cancelled:
94 self.remove_signal_handler(sig) # Remove it properly.
95 else:
96 self._add_callback_signalsafe(handle)
97
98 def remove_signal_handler(self, sig):
99 """Remove a handler for a signal. UNIX only.
100
101 Return True if a signal handler was removed, False if not.
102 """
103 self._check_signal(sig)
104 try:
105 del self._signal_handlers[sig]
106 except KeyError:
107 return False
108
109 if sig == signal.SIGINT:
110 handler = signal.default_int_handler
111 else:
112 handler = signal.SIG_DFL
113
114 try:
115 signal.signal(sig, handler)
116 except OSError as exc:
117 if exc.errno == errno.EINVAL:
118 raise RuntimeError('sig {} cannot be caught'.format(sig))
119 else:
120 raise
121
122 if not self._signal_handlers:
123 try:
124 signal.set_wakeup_fd(-1)
125 except ValueError as exc:
Guido van Rossumfc29e0f2013-10-17 15:39:45 -0700126 logger.info('set_wakeup_fd(-1) failed: %s', exc)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700127
128 return True
129
130 def _check_signal(self, sig):
131 """Internal helper to validate a signal.
132
133 Raise ValueError if the signal number is invalid or uncatchable.
134 Raise RuntimeError if there is a problem setting up the handler.
135 """
136 if not isinstance(sig, int):
137 raise TypeError('sig must be an int, not {!r}'.format(sig))
138
139 if not (1 <= sig < signal.NSIG):
140 raise ValueError(
141 'sig {} out of range(1, {})'.format(sig, signal.NSIG))
142
143 def _make_read_pipe_transport(self, pipe, protocol, waiter=None,
144 extra=None):
145 return _UnixReadPipeTransport(self, pipe, protocol, waiter, extra)
146
147 def _make_write_pipe_transport(self, pipe, protocol, waiter=None,
148 extra=None):
149 return _UnixWritePipeTransport(self, pipe, protocol, waiter, extra)
150
Victor Stinnerf951d282014-06-29 00:46:45 +0200151 @coroutine
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700152 def _make_subprocess_transport(self, protocol, args, shell,
153 stdin, stdout, stderr, bufsize,
154 extra=None, **kwargs):
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800155 with events.get_child_watcher() as watcher:
156 transp = _UnixSubprocessTransport(self, protocol, args, shell,
157 stdin, stdout, stderr, bufsize,
Victor Stinner73f10fd2014-01-29 14:32:20 -0800158 extra=extra, **kwargs)
Guido van Rossum4835f172014-01-10 13:28:59 -0800159 yield from transp._post_init()
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800160 watcher.add_child_handler(transp.get_pid(),
161 self._child_watcher_callback, transp)
Guido van Rossum4835f172014-01-10 13:28:59 -0800162
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700163 return transp
164
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800165 def _child_watcher_callback(self, pid, returncode, transp):
166 self.call_soon_threadsafe(transp._process_exited, returncode)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700167
Victor Stinnerf951d282014-06-29 00:46:45 +0200168 @coroutine
Yury Selivanovb057c522014-02-18 12:15:06 -0500169 def create_unix_connection(self, protocol_factory, path, *,
170 ssl=None, sock=None,
171 server_hostname=None):
172 assert server_hostname is None or isinstance(server_hostname, str)
173 if ssl:
174 if server_hostname is None:
175 raise ValueError(
176 'you have to pass server_hostname when using ssl')
177 else:
178 if server_hostname is not None:
179 raise ValueError('server_hostname is only meaningful with ssl')
180
181 if path is not None:
182 if sock is not None:
183 raise ValueError(
184 'path and sock can not be specified at the same time')
185
Victor Stinner79a29522014-02-19 01:45:59 +0100186 sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM, 0)
Yury Selivanovb057c522014-02-18 12:15:06 -0500187 try:
Yury Selivanovb057c522014-02-18 12:15:06 -0500188 sock.setblocking(False)
189 yield from self.sock_connect(sock, path)
Victor Stinner79a29522014-02-19 01:45:59 +0100190 except:
191 sock.close()
Yury Selivanovb057c522014-02-18 12:15:06 -0500192 raise
193
194 else:
195 if sock is None:
196 raise ValueError('no path and sock were specified')
197 sock.setblocking(False)
198
199 transport, protocol = yield from self._create_connection_transport(
200 sock, protocol_factory, ssl, server_hostname)
201 return transport, protocol
202
Victor Stinnerf951d282014-06-29 00:46:45 +0200203 @coroutine
Yury Selivanovb057c522014-02-18 12:15:06 -0500204 def create_unix_server(self, protocol_factory, path=None, *,
205 sock=None, backlog=100, ssl=None):
206 if isinstance(ssl, bool):
207 raise TypeError('ssl argument must be an SSLContext or None')
208
209 if path is not None:
Victor Stinner1fd03a42014-04-07 11:18:54 +0200210 if sock is not None:
211 raise ValueError(
212 'path and sock can not be specified at the same time')
213
Yury Selivanovb057c522014-02-18 12:15:06 -0500214 sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
215
216 try:
217 sock.bind(path)
218 except OSError as exc:
Victor Stinner79a29522014-02-19 01:45:59 +0100219 sock.close()
Yury Selivanovb057c522014-02-18 12:15:06 -0500220 if exc.errno == errno.EADDRINUSE:
221 # Let's improve the error message by adding
222 # with what exact address it occurs.
223 msg = 'Address {!r} is already in use'.format(path)
224 raise OSError(errno.EADDRINUSE, msg) from None
225 else:
226 raise
Victor Stinner223a6242014-06-04 00:11:52 +0200227 except:
228 sock.close()
229 raise
Yury Selivanovb057c522014-02-18 12:15:06 -0500230 else:
231 if sock is None:
232 raise ValueError(
233 'path was not specified, and no sock specified')
234
235 if sock.family != socket.AF_UNIX:
236 raise ValueError(
237 'A UNIX Domain Socket was expected, got {!r}'.format(sock))
238
239 server = base_events.Server(self, [sock])
240 sock.listen(backlog)
241 sock.setblocking(False)
242 self._start_serving(protocol_factory, sock, ssl, server)
243 return server
244
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700245
246def _set_nonblocking(fd):
247 flags = fcntl.fcntl(fd, fcntl.F_GETFL)
248 flags = flags | os.O_NONBLOCK
249 fcntl.fcntl(fd, fcntl.F_SETFL, flags)
250
251
252class _UnixReadPipeTransport(transports.ReadTransport):
253
Yury Selivanovdec1a452014-02-18 22:27:48 -0500254 max_size = 256 * 1024 # max bytes we read in one event loop iteration
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700255
256 def __init__(self, loop, pipe, protocol, waiter=None, extra=None):
257 super().__init__(extra)
258 self._extra['pipe'] = pipe
259 self._loop = loop
260 self._pipe = pipe
261 self._fileno = pipe.fileno()
Guido van Rossum934f6ea2013-10-21 20:37:14 -0700262 mode = os.fstat(self._fileno).st_mode
Guido van Rossum02757ea2014-01-10 13:30:04 -0800263 if not (stat.S_ISFIFO(mode) or
264 stat.S_ISSOCK(mode) or
265 stat.S_ISCHR(mode)):
Guido van Rossum934f6ea2013-10-21 20:37:14 -0700266 raise ValueError("Pipe transport is for pipes/sockets only.")
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700267 _set_nonblocking(self._fileno)
268 self._protocol = protocol
269 self._closing = False
270 self._loop.add_reader(self._fileno, self._read_ready)
271 self._loop.call_soon(self._protocol.connection_made, self)
272 if waiter is not None:
Victor Stinnerbfff45d2014-07-08 23:57:31 +0200273 # wait until protocol.connection_made() has been called
Victor Stinnera9acbe82014-07-05 15:29:41 +0200274 self._loop.call_soon(waiter._set_result_unless_cancelled, None)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700275
Victor Stinnere912e652014-07-12 03:11:53 +0200276 def __repr__(self):
277 info = [self.__class__.__name__, 'fd=%s' % self._fileno]
278 if self._pipe is not None:
279 polling = selector_events._test_selector_event(
280 self._loop._selector,
281 self._fileno, selectors.EVENT_READ)
282 if polling:
283 info.append('polling')
284 else:
285 info.append('idle')
286 else:
287 info.append('closed')
288 return '<%s>' % ' '.join(info)
289
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700290 def _read_ready(self):
291 try:
292 data = os.read(self._fileno, self.max_size)
293 except (BlockingIOError, InterruptedError):
294 pass
295 except OSError as exc:
Victor Stinner0ee29c22014-02-19 01:40:41 +0100296 self._fatal_error(exc, 'Fatal read error on pipe transport')
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700297 else:
298 if data:
299 self._protocol.data_received(data)
300 else:
Victor Stinnere912e652014-07-12 03:11:53 +0200301 if self._loop.get_debug():
302 logger.info("%r was closed by peer", self)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700303 self._closing = True
304 self._loop.remove_reader(self._fileno)
305 self._loop.call_soon(self._protocol.eof_received)
306 self._loop.call_soon(self._call_connection_lost, None)
307
Guido van Rossum57497ad2013-10-18 07:58:20 -0700308 def pause_reading(self):
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700309 self._loop.remove_reader(self._fileno)
310
Guido van Rossum57497ad2013-10-18 07:58:20 -0700311 def resume_reading(self):
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700312 self._loop.add_reader(self._fileno, self._read_ready)
313
314 def close(self):
315 if not self._closing:
316 self._close(None)
317
Victor Stinner0ee29c22014-02-19 01:40:41 +0100318 def _fatal_error(self, exc, message='Fatal error on pipe transport'):
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700319 # should be called by exception handler only
Guido van Rossum02757ea2014-01-10 13:30:04 -0800320 if not (isinstance(exc, OSError) and exc.errno == errno.EIO):
Yury Selivanov569efa22014-02-18 18:02:19 -0500321 self._loop.call_exception_handler({
Victor Stinner0ee29c22014-02-19 01:40:41 +0100322 'message': message,
Yury Selivanov569efa22014-02-18 18:02:19 -0500323 'exception': exc,
324 'transport': self,
325 'protocol': self._protocol,
326 })
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700327 self._close(exc)
328
329 def _close(self, exc):
330 self._closing = True
331 self._loop.remove_reader(self._fileno)
332 self._loop.call_soon(self._call_connection_lost, exc)
333
334 def _call_connection_lost(self, exc):
335 try:
336 self._protocol.connection_lost(exc)
337 finally:
338 self._pipe.close()
339 self._pipe = None
340 self._protocol = None
341 self._loop = None
342
343
Yury Selivanov3cb99142014-02-18 18:41:13 -0500344class _UnixWritePipeTransport(transports._FlowControlMixin,
Guido van Rossum47fb97e2014-01-29 13:20:39 -0800345 transports.WriteTransport):
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700346
347 def __init__(self, loop, pipe, protocol, waiter=None, extra=None):
348 super().__init__(extra)
349 self._extra['pipe'] = pipe
350 self._loop = loop
351 self._pipe = pipe
352 self._fileno = pipe.fileno()
Guido van Rossum934f6ea2013-10-21 20:37:14 -0700353 mode = os.fstat(self._fileno).st_mode
354 is_socket = stat.S_ISSOCK(mode)
Victor Stinner8dffc452014-01-25 15:32:06 +0100355 if not (is_socket or
356 stat.S_ISFIFO(mode) or
357 stat.S_ISCHR(mode)):
358 raise ValueError("Pipe transport is only for "
359 "pipes, sockets and character devices")
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700360 _set_nonblocking(self._fileno)
361 self._protocol = protocol
362 self._buffer = []
363 self._conn_lost = 0
364 self._closing = False # Set when close() or write_eof() called.
Guido van Rossum934f6ea2013-10-21 20:37:14 -0700365
366 # On AIX, the reader trick only works for sockets.
367 # On other platforms it works for pipes and sockets.
368 # (Exception: OS X 10.4? Issue #19294.)
369 if is_socket or not sys.platform.startswith("aix"):
370 self._loop.add_reader(self._fileno, self._read_ready)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700371
372 self._loop.call_soon(self._protocol.connection_made, self)
373 if waiter is not None:
Victor Stinnerbfff45d2014-07-08 23:57:31 +0200374 # wait until protocol.connection_made() has been called
Victor Stinnera9acbe82014-07-05 15:29:41 +0200375 self._loop.call_soon(waiter._set_result_unless_cancelled, None)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700376
Victor Stinnere912e652014-07-12 03:11:53 +0200377 def __repr__(self):
378 info = [self.__class__.__name__, 'fd=%s' % self._fileno]
379 if self._pipe is not None:
380 polling = selector_events._test_selector_event(
381 self._loop._selector,
382 self._fileno, selectors.EVENT_WRITE)
383 if polling:
384 info.append('polling')
385 else:
386 info.append('idle')
387
388 bufsize = self.get_write_buffer_size()
389 info.append('bufsize=%s' % bufsize)
390 else:
391 info.append('closed')
392 return '<%s>' % ' '.join(info)
393
Guido van Rossum47fb97e2014-01-29 13:20:39 -0800394 def get_write_buffer_size(self):
395 return sum(len(data) for data in self._buffer)
396
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700397 def _read_ready(self):
Guido van Rossum934f6ea2013-10-21 20:37:14 -0700398 # Pipe was closed by peer.
Victor Stinnere912e652014-07-12 03:11:53 +0200399 if self._loop.get_debug():
400 logger.info("%r was closed by peer", self)
Victor Stinner61b3c9b2014-01-31 13:04:28 +0100401 if self._buffer:
402 self._close(BrokenPipeError())
403 else:
404 self._close()
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700405
406 def write(self, data):
Guido van Rossum47fb97e2014-01-29 13:20:39 -0800407 assert isinstance(data, (bytes, bytearray, memoryview)), repr(data)
408 if isinstance(data, bytearray):
409 data = memoryview(data)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700410 if not data:
411 return
412
413 if self._conn_lost or self._closing:
414 if self._conn_lost >= constants.LOG_THRESHOLD_FOR_CONNLOST_WRITES:
Guido van Rossumfc29e0f2013-10-17 15:39:45 -0700415 logger.warning('pipe closed by peer or '
416 'os.write(pipe, data) raised exception.')
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700417 self._conn_lost += 1
418 return
419
420 if not self._buffer:
421 # Attempt to send it right away first.
422 try:
423 n = os.write(self._fileno, data)
424 except (BlockingIOError, InterruptedError):
425 n = 0
426 except Exception as exc:
427 self._conn_lost += 1
Victor Stinner0ee29c22014-02-19 01:40:41 +0100428 self._fatal_error(exc, 'Fatal write error on pipe transport')
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700429 return
430 if n == len(data):
431 return
432 elif n > 0:
433 data = data[n:]
434 self._loop.add_writer(self._fileno, self._write_ready)
435
436 self._buffer.append(data)
Guido van Rossum47fb97e2014-01-29 13:20:39 -0800437 self._maybe_pause_protocol()
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700438
439 def _write_ready(self):
440 data = b''.join(self._buffer)
441 assert data, 'Data should not be empty'
442
443 self._buffer.clear()
444 try:
445 n = os.write(self._fileno, data)
446 except (BlockingIOError, InterruptedError):
447 self._buffer.append(data)
448 except Exception as exc:
449 self._conn_lost += 1
450 # Remove writer here, _fatal_error() doesn't it
451 # because _buffer is empty.
452 self._loop.remove_writer(self._fileno)
Victor Stinner0ee29c22014-02-19 01:40:41 +0100453 self._fatal_error(exc, 'Fatal write error on pipe transport')
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700454 else:
455 if n == len(data):
456 self._loop.remove_writer(self._fileno)
Guido van Rossum47fb97e2014-01-29 13:20:39 -0800457 self._maybe_resume_protocol() # May append to buffer.
458 if not self._buffer and self._closing:
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700459 self._loop.remove_reader(self._fileno)
460 self._call_connection_lost(None)
461 return
462 elif n > 0:
463 data = data[n:]
464
465 self._buffer.append(data) # Try again later.
466
467 def can_write_eof(self):
468 return True
469
470 # TODO: Make the relationships between write_eof(), close(),
471 # abort(), _fatal_error() and _close() more straightforward.
472
473 def write_eof(self):
474 if self._closing:
475 return
476 assert self._pipe
477 self._closing = True
478 if not self._buffer:
479 self._loop.remove_reader(self._fileno)
480 self._loop.call_soon(self._call_connection_lost, None)
481
482 def close(self):
483 if not self._closing:
484 # write_eof is all what we needed to close the write pipe
485 self.write_eof()
486
487 def abort(self):
488 self._close(None)
489
Victor Stinner0ee29c22014-02-19 01:40:41 +0100490 def _fatal_error(self, exc, message='Fatal error on pipe transport'):
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700491 # should be called by exception handler only
Victor Stinner63b4d4b2014-01-29 13:12:03 -0800492 if not isinstance(exc, (BrokenPipeError, ConnectionResetError)):
Yury Selivanov569efa22014-02-18 18:02:19 -0500493 self._loop.call_exception_handler({
Victor Stinner0ee29c22014-02-19 01:40:41 +0100494 'message': message,
Yury Selivanov569efa22014-02-18 18:02:19 -0500495 'exception': exc,
496 'transport': self,
497 'protocol': self._protocol,
498 })
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700499 self._close(exc)
500
501 def _close(self, exc=None):
502 self._closing = True
503 if self._buffer:
504 self._loop.remove_writer(self._fileno)
505 self._buffer.clear()
506 self._loop.remove_reader(self._fileno)
507 self._loop.call_soon(self._call_connection_lost, exc)
508
509 def _call_connection_lost(self, exc):
510 try:
511 self._protocol.connection_lost(exc)
512 finally:
513 self._pipe.close()
514 self._pipe = None
515 self._protocol = None
516 self._loop = None
517
518
Guido van Rossum59691282013-10-30 14:52:03 -0700519class _UnixSubprocessTransport(base_subprocess.BaseSubprocessTransport):
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700520
Guido van Rossum59691282013-10-30 14:52:03 -0700521 def _start(self, args, shell, stdin, stdout, stderr, bufsize, **kwargs):
Guido van Rossum934f6ea2013-10-21 20:37:14 -0700522 stdin_w = None
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700523 if stdin == subprocess.PIPE:
Guido van Rossum934f6ea2013-10-21 20:37:14 -0700524 # Use a socket pair for stdin, since not all platforms
525 # support selecting read events on the write end of a
526 # socket (which we use in order to detect closing of the
527 # other end). Notably this is needed on AIX, and works
528 # just fine on other platforms.
529 stdin, stdin_w = self._loop._socketpair()
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700530 self._proc = subprocess.Popen(
531 args, shell=shell, stdin=stdin, stdout=stdout, stderr=stderr,
532 universal_newlines=False, bufsize=bufsize, **kwargs)
Guido van Rossum934f6ea2013-10-21 20:37:14 -0700533 if stdin_w is not None:
534 stdin.close()
Victor Stinner2dba23a2014-07-03 00:59:00 +0200535 self._proc.stdin = open(stdin_w.detach(), 'wb', buffering=bufsize)
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800536
537
538class AbstractChildWatcher:
539 """Abstract base class for monitoring child processes.
540
541 Objects derived from this class monitor a collection of subprocesses and
542 report their termination or interruption by a signal.
543
544 New callbacks are registered with .add_child_handler(). Starting a new
545 process must be done within a 'with' block to allow the watcher to suspend
546 its activity until the new process if fully registered (this is needed to
547 prevent a race condition in some implementations).
548
549 Example:
550 with watcher:
551 proc = subprocess.Popen("sleep 1")
552 watcher.add_child_handler(proc.pid, callback)
553
554 Notes:
555 Implementations of this class must be thread-safe.
556
557 Since child watcher objects may catch the SIGCHLD signal and call
558 waitpid(-1), there should be only one active object per process.
559 """
560
561 def add_child_handler(self, pid, callback, *args):
562 """Register a new child handler.
563
564 Arrange for callback(pid, returncode, *args) to be called when
565 process 'pid' terminates. Specifying another callback for the same
566 process replaces the previous handler.
567
Victor Stinneracdb7822014-07-14 18:33:40 +0200568 Note: callback() must be thread-safe.
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800569 """
570 raise NotImplementedError()
571
572 def remove_child_handler(self, pid):
573 """Removes the handler for process 'pid'.
574
575 The function returns True if the handler was successfully removed,
576 False if there was nothing to remove."""
577
578 raise NotImplementedError()
579
Guido van Rossum2bcae702013-11-13 15:50:08 -0800580 def attach_loop(self, loop):
581 """Attach the watcher to an event loop.
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800582
Guido van Rossum2bcae702013-11-13 15:50:08 -0800583 If the watcher was previously attached to an event loop, then it is
584 first detached before attaching to the new loop.
585
586 Note: loop may be None.
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800587 """
588 raise NotImplementedError()
589
590 def close(self):
591 """Close the watcher.
592
593 This must be called to make sure that any underlying resource is freed.
594 """
595 raise NotImplementedError()
596
597 def __enter__(self):
598 """Enter the watcher's context and allow starting new processes
599
600 This function must return self"""
601 raise NotImplementedError()
602
603 def __exit__(self, a, b, c):
604 """Exit the watcher's context"""
605 raise NotImplementedError()
606
607
608class BaseChildWatcher(AbstractChildWatcher):
609
Guido van Rossum2bcae702013-11-13 15:50:08 -0800610 def __init__(self):
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800611 self._loop = None
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800612
613 def close(self):
Guido van Rossum2bcae702013-11-13 15:50:08 -0800614 self.attach_loop(None)
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800615
616 def _do_waitpid(self, expected_pid):
617 raise NotImplementedError()
618
619 def _do_waitpid_all(self):
620 raise NotImplementedError()
621
Guido van Rossum2bcae702013-11-13 15:50:08 -0800622 def attach_loop(self, loop):
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800623 assert loop is None or isinstance(loop, events.AbstractEventLoop)
624
625 if self._loop is not None:
626 self._loop.remove_signal_handler(signal.SIGCHLD)
627
628 self._loop = loop
629 if loop is not None:
630 loop.add_signal_handler(signal.SIGCHLD, self._sig_chld)
631
632 # Prevent a race condition in case a child terminated
633 # during the switch.
634 self._do_waitpid_all()
635
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800636 def _sig_chld(self):
637 try:
638 self._do_waitpid_all()
Yury Selivanov569efa22014-02-18 18:02:19 -0500639 except Exception as exc:
640 # self._loop should always be available here
641 # as '_sig_chld' is added as a signal handler
642 # in 'attach_loop'
643 self._loop.call_exception_handler({
644 'message': 'Unknown exception in SIGCHLD handler',
645 'exception': exc,
646 })
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800647
648 def _compute_returncode(self, status):
649 if os.WIFSIGNALED(status):
650 # The child process died because of a signal.
651 return -os.WTERMSIG(status)
652 elif os.WIFEXITED(status):
653 # The child process exited (e.g sys.exit()).
654 return os.WEXITSTATUS(status)
655 else:
656 # The child exited, but we don't understand its status.
657 # This shouldn't happen, but if it does, let's just
658 # return that status; perhaps that helps debug it.
659 return status
660
661
662class SafeChildWatcher(BaseChildWatcher):
663 """'Safe' child watcher implementation.
664
665 This implementation avoids disrupting other code spawning processes by
666 polling explicitly each process in the SIGCHLD handler instead of calling
667 os.waitpid(-1).
668
669 This is a safe solution but it has a significant overhead when handling a
670 big number of children (O(n) each time SIGCHLD is raised)
671 """
672
Guido van Rossum2bcae702013-11-13 15:50:08 -0800673 def __init__(self):
674 super().__init__()
675 self._callbacks = {}
676
677 def close(self):
678 self._callbacks.clear()
679 super().close()
680
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800681 def __enter__(self):
682 return self
683
684 def __exit__(self, a, b, c):
685 pass
686
687 def add_child_handler(self, pid, callback, *args):
688 self._callbacks[pid] = callback, args
689
690 # Prevent a race condition in case the child is already terminated.
691 self._do_waitpid(pid)
692
Guido van Rossum2bcae702013-11-13 15:50:08 -0800693 def remove_child_handler(self, pid):
694 try:
695 del self._callbacks[pid]
696 return True
697 except KeyError:
698 return False
699
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800700 def _do_waitpid_all(self):
701
702 for pid in list(self._callbacks):
703 self._do_waitpid(pid)
704
705 def _do_waitpid(self, expected_pid):
706 assert expected_pid > 0
707
708 try:
709 pid, status = os.waitpid(expected_pid, os.WNOHANG)
710 except ChildProcessError:
711 # The child process is already reaped
712 # (may happen if waitpid() is called elsewhere).
713 pid = expected_pid
714 returncode = 255
715 logger.warning(
716 "Unknown child process pid %d, will report returncode 255",
717 pid)
718 else:
719 if pid == 0:
720 # The child process is still alive.
721 return
722
723 returncode = self._compute_returncode(status)
Victor Stinneracdb7822014-07-14 18:33:40 +0200724 if self._loop.get_debug():
725 logger.debug('process %s exited with returncode %s',
726 expected_pid, returncode)
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800727
728 try:
729 callback, args = self._callbacks.pop(pid)
730 except KeyError: # pragma: no cover
731 # May happen if .remove_child_handler() is called
732 # after os.waitpid() returns.
733 pass
734 else:
735 callback(pid, returncode, *args)
736
737
738class FastChildWatcher(BaseChildWatcher):
739 """'Fast' child watcher implementation.
740
741 This implementation reaps every terminated processes by calling
742 os.waitpid(-1) directly, possibly breaking other code spawning processes
743 and waiting for their termination.
744
745 There is no noticeable overhead when handling a big number of children
746 (O(1) each time a child terminates).
747 """
Guido van Rossum2bcae702013-11-13 15:50:08 -0800748 def __init__(self):
749 super().__init__()
750 self._callbacks = {}
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800751 self._lock = threading.Lock()
752 self._zombies = {}
753 self._forks = 0
754
755 def close(self):
Guido van Rossum2bcae702013-11-13 15:50:08 -0800756 self._callbacks.clear()
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800757 self._zombies.clear()
Guido van Rossum2bcae702013-11-13 15:50:08 -0800758 super().close()
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800759
760 def __enter__(self):
761 with self._lock:
762 self._forks += 1
763
764 return self
765
766 def __exit__(self, a, b, c):
767 with self._lock:
768 self._forks -= 1
769
770 if self._forks or not self._zombies:
771 return
772
773 collateral_victims = str(self._zombies)
774 self._zombies.clear()
775
776 logger.warning(
777 "Caught subprocesses termination from unknown pids: %s",
778 collateral_victims)
779
780 def add_child_handler(self, pid, callback, *args):
781 assert self._forks, "Must use the context manager"
Guido van Rossumab27a9f2014-01-25 16:32:17 -0800782 with self._lock:
783 try:
784 returncode = self._zombies.pop(pid)
785 except KeyError:
786 # The child is running.
787 self._callbacks[pid] = callback, args
788 return
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800789
Guido van Rossumab27a9f2014-01-25 16:32:17 -0800790 # The child is dead already. We can fire the callback.
791 callback(pid, returncode, *args)
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800792
Guido van Rossum2bcae702013-11-13 15:50:08 -0800793 def remove_child_handler(self, pid):
794 try:
795 del self._callbacks[pid]
796 return True
797 except KeyError:
798 return False
799
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800800 def _do_waitpid_all(self):
801 # Because of signal coalescing, we must keep calling waitpid() as
802 # long as we're able to reap a child.
803 while True:
804 try:
805 pid, status = os.waitpid(-1, os.WNOHANG)
806 except ChildProcessError:
807 # No more child processes exist.
808 return
809 else:
810 if pid == 0:
811 # A child process is still alive.
812 return
813
814 returncode = self._compute_returncode(status)
815
Guido van Rossumab27a9f2014-01-25 16:32:17 -0800816 with self._lock:
817 try:
818 callback, args = self._callbacks.pop(pid)
819 except KeyError:
820 # unknown child
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800821 if self._forks:
822 # It may not be registered yet.
823 self._zombies[pid] = returncode
Victor Stinneracdb7822014-07-14 18:33:40 +0200824 if self._loop.get_debug():
825 logger.debug('unknown process %s exited '
826 'with returncode %s',
827 pid, returncode)
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800828 continue
Guido van Rossumab27a9f2014-01-25 16:32:17 -0800829 callback = None
Victor Stinneracdb7822014-07-14 18:33:40 +0200830 else:
831 if self._loop.get_debug():
832 logger.debug('process %s exited with returncode %s',
833 pid, returncode)
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800834
Guido van Rossumab27a9f2014-01-25 16:32:17 -0800835 if callback is None:
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800836 logger.warning(
837 "Caught subprocess termination from unknown pid: "
838 "%d -> %d", pid, returncode)
839 else:
840 callback(pid, returncode, *args)
841
842
843class _UnixDefaultEventLoopPolicy(events.BaseDefaultEventLoopPolicy):
844 """XXX"""
845 _loop_factory = _UnixSelectorEventLoop
846
847 def __init__(self):
848 super().__init__()
849 self._watcher = None
850
851 def _init_watcher(self):
852 with events._lock:
853 if self._watcher is None: # pragma: no branch
Guido van Rossum2bcae702013-11-13 15:50:08 -0800854 self._watcher = SafeChildWatcher()
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800855 if isinstance(threading.current_thread(),
856 threading._MainThread):
Guido van Rossum2bcae702013-11-13 15:50:08 -0800857 self._watcher.attach_loop(self._local._loop)
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800858
859 def set_event_loop(self, loop):
860 """Set the event loop.
861
862 As a side effect, if a child watcher was set before, then calling
Guido van Rossum2bcae702013-11-13 15:50:08 -0800863 .set_event_loop() from the main thread will call .attach_loop(loop) on
864 the child watcher.
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800865 """
866
867 super().set_event_loop(loop)
868
869 if self._watcher is not None and \
870 isinstance(threading.current_thread(), threading._MainThread):
Guido van Rossum2bcae702013-11-13 15:50:08 -0800871 self._watcher.attach_loop(loop)
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800872
873 def get_child_watcher(self):
Victor Stinnerf9e49dd2014-06-05 12:06:44 +0200874 """Get the watcher for child processes.
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800875
876 If not yet set, a SafeChildWatcher object is automatically created.
877 """
878 if self._watcher is None:
879 self._init_watcher()
880
881 return self._watcher
882
883 def set_child_watcher(self, watcher):
Victor Stinnerf9e49dd2014-06-05 12:06:44 +0200884 """Set the watcher for child processes."""
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800885
886 assert watcher is None or isinstance(watcher, AbstractChildWatcher)
887
888 if self._watcher is not None:
889 self._watcher.close()
890
891 self._watcher = watcher
892
893SelectorEventLoop = _UnixSelectorEventLoop
894DefaultEventLoopPolicy = _UnixDefaultEventLoopPolicy