blob: 335a77d1bd5d01f60ac09cea1553be1079dc7749 [file] [log] [blame]
Yury Selivanovdec1a452014-02-18 22:27:48 -05001"""Selector event loop for Unix with signal handling."""
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07002
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07003import errno
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07004import os
5import signal
6import socket
7import stat
8import subprocess
9import sys
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -080010import threading
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070011
12
Yury Selivanovb057c522014-02-18 12:15:06 -050013from . import base_events
Guido van Rossum59691282013-10-30 14:52:03 -070014from . import base_subprocess
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070015from . import constants
16from . import events
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070017from . import selector_events
Victor Stinnere912e652014-07-12 03:11:53 +020018from . import selectors
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070019from . import transports
Victor Stinnerf951d282014-06-29 00:46:45 +020020from .coroutines import coroutine
Guido van Rossumfc29e0f2013-10-17 15:39:45 -070021from .log import logger
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070022
23
# Names exported by ``from <this module> import *``.
# NOTE(review): SelectorEventLoop and DefaultEventLoopPolicy are presumably
# defined further down in this file -- confirm.
__all__ = ['SelectorEventLoop',
           'AbstractChildWatcher', 'SafeChildWatcher',
           'FastChildWatcher', 'DefaultEventLoopPolicy',
           ]

# Refuse to be imported on Windows: the signal handling this module
# provides is not usable there.
if sys.platform == 'win32':  # pragma: no cover
    raise ImportError('Signals are not really supported on Windows')
31
32
Victor Stinnerfe5649c2014-07-17 22:43:40 +020033def _sighandler_noop(signum, frame):
34 """Dummy signal handler."""
35 pass
36
37
class _UnixSelectorEventLoop(selector_events.BaseSelectorEventLoop):
    """Selector event loop for Unix.

    Extends the base selector loop with signal handlers, pipe and
    subprocess transports, and UNIX Domain Socket connections/servers.
    """

    def __init__(self, selector=None):
        super().__init__(selector)
        # Maps a signal number to the events.Handle wrapping its callback.
        self._signal_handlers = {}

    def _socketpair(self):
        """Return a connected pair of sockets."""
        return socket.socketpair()

    def close(self):
        """Close the loop, then uninstall every registered signal handler."""
        super().close()
        # Iterate over a snapshot: remove_signal_handler() mutates the dict.
        for signum in tuple(self._signal_handlers):
            self.remove_signal_handler(signum)

    def _process_self_data(self, data):
        """Dispatch every signal number found in *data*."""
        for signum in data:
            # Null bytes are written by _write_to_self(), not by a signal.
            if signum:
                self._handle_signal(signum)

    def add_signal_handler(self, sig, callback, *args):
        """Add a handler for a signal.  UNIX only.

        Raise TypeError if sig is not an int.
        Raise ValueError if the signal number is out of range.
        Raise RuntimeError if there is a problem setting up the handler
        (for example when the signal cannot be caught).
        """
        self._check_signal(sig)
        try:
            # set_wakeup_fd() raises ValueError if this is not the
            # main thread.  Calling it first guarantees that an event
            # loop running in another thread cannot add a signal handler.
            signal.set_wakeup_fd(self._csock.fileno())
        except (ValueError, OSError) as exc:
            raise RuntimeError(str(exc))

        self._signal_handlers[sig] = events.Handle(callback, args, self)

        try:
            # A dummy handler is enough: Python writes the signal number
            # to the wakeup file descriptor, and _process_self_data()
            # reads it back from there to run the real callback.
            signal.signal(sig, _sighandler_noop)

            # Set SA_RESTART to limit EINTR occurrences.
            signal.siginterrupt(sig, False)
        except OSError as exc:
            # Roll back the registration made above.
            del self._signal_handlers[sig]
            if not self._signal_handlers:
                try:
                    signal.set_wakeup_fd(-1)
                except (ValueError, OSError) as nexc:
                    logger.info('set_wakeup_fd(-1) failed: %s', nexc)

            if exc.errno != errno.EINVAL:
                raise
            raise RuntimeError('sig {} cannot be caught'.format(sig))

    def _handle_signal(self, sig):
        """Internal helper that is the actual signal handler."""
        handle = self._signal_handlers.get(sig)
        if handle is None:
            # Assume it's some race condition.
            return
        if handle._cancelled:
            # Remove it properly.
            self.remove_signal_handler(sig)
            return
        self._add_callback_signalsafe(handle)

    def remove_signal_handler(self, sig):
        """Remove a handler for a signal.  UNIX only.

        Return True if a signal handler was removed, False if not.
        """
        self._check_signal(sig)
        if sig not in self._signal_handlers:
            return False
        del self._signal_handlers[sig]

        # SIGINT gets Python's default handler back; everything else
        # reverts to the OS default disposition.
        restored = (signal.default_int_handler if sig == signal.SIGINT
                    else signal.SIG_DFL)

        try:
            signal.signal(sig, restored)
        except OSError as exc:
            if exc.errno != errno.EINVAL:
                raise
            raise RuntimeError('sig {} cannot be caught'.format(sig))

        if not self._signal_handlers:
            # Last handler gone: stop routing signals to the self-pipe.
            try:
                signal.set_wakeup_fd(-1)
            except (ValueError, OSError) as exc:
                logger.info('set_wakeup_fd(-1) failed: %s', exc)

        return True

    def _check_signal(self, sig):
        """Internal helper to validate a signal.

        Raise TypeError if sig is not an int.
        Raise ValueError if sig is outside range(1, signal.NSIG).
        """
        if not isinstance(sig, int):
            raise TypeError('sig must be an int, not {!r}'.format(sig))

        if sig < 1 or sig >= signal.NSIG:
            raise ValueError(
                'sig {} out of range(1, {})'.format(sig, signal.NSIG))

    def _make_read_pipe_transport(self, pipe, protocol, waiter=None,
                                  extra=None):
        return _UnixReadPipeTransport(self, pipe, protocol, waiter, extra)

    def _make_write_pipe_transport(self, pipe, protocol, waiter=None,
                                   extra=None):
        return _UnixWritePipeTransport(self, pipe, protocol, waiter, extra)

    @coroutine
    def _make_subprocess_transport(self, protocol, args, shell,
                                   stdin, stdout, stderr, bufsize,
                                   extra=None, **kwargs):
        """Start a subprocess and register it with the child watcher."""
        # The process is created inside the watcher's context so that the
        # watcher can cope with a child exiting before it is registered.
        with events.get_child_watcher() as watcher:
            transp = _UnixSubprocessTransport(self, protocol, args, shell,
                                              stdin, stdout, stderr, bufsize,
                                              extra=extra, **kwargs)
            yield from transp._post_init()
            watcher.add_child_handler(transp.get_pid(),
                                      self._child_watcher_callback, transp)

        return transp

    def _child_watcher_callback(self, pid, returncode, transp):
        # Hand the exit status over to the loop via call_soon_threadsafe().
        self.call_soon_threadsafe(transp._process_exited, returncode)

    @coroutine
    def create_unix_connection(self, protocol_factory, path, *,
                               ssl=None, sock=None,
                               server_hostname=None):
        """Connect to a UNIX Domain Socket.

        Exactly one of path and sock must be given.  Return a
        (transport, protocol) pair.
        """
        assert server_hostname is None or isinstance(server_hostname, str)
        use_ssl = bool(ssl)
        if use_ssl and server_hostname is None:
            raise ValueError(
                'you have to pass server_hostname when using ssl')
        if not use_ssl and server_hostname is not None:
            raise ValueError('server_hostname is only meaningful with ssl')

        if path is None:
            if sock is None:
                raise ValueError('no path and sock were specified')
            sock.setblocking(False)
        else:
            if sock is not None:
                raise ValueError(
                    'path and sock can not be specified at the same time')

            sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM, 0)
            try:
                sock.setblocking(False)
                yield from self.sock_connect(sock, path)
            except BaseException:
                # Whatever went wrong, do not leak the socket we created.
                sock.close()
                raise

        transport, protocol = yield from self._create_connection_transport(
            sock, protocol_factory, ssl, server_hostname)
        return transport, protocol

    @coroutine
    def create_unix_server(self, protocol_factory, path=None, *,
                           sock=None, backlog=100, ssl=None):
        """Create a server listening on a UNIX Domain Socket.

        Exactly one of path and sock must be given.  Return the
        base_events.Server object.
        """
        if isinstance(ssl, bool):
            raise TypeError('ssl argument must be an SSLContext or None')

        if path is None:
            if sock is None:
                raise ValueError(
                    'path was not specified, and no sock specified')

            if sock.family != socket.AF_UNIX:
                raise ValueError(
                    'A UNIX Domain Socket was expected, got {!r}'.format(sock))
        else:
            if sock is not None:
                raise ValueError(
                    'path and sock can not be specified at the same time')

            sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)

            try:
                sock.bind(path)
            except OSError as exc:
                sock.close()
                if exc.errno != errno.EADDRINUSE:
                    raise
                # Improve the error message by naming the exact address
                # that is already taken.
                msg = 'Address {!r} is already in use'.format(path)
                raise OSError(errno.EADDRINUSE, msg) from None
            except BaseException:
                sock.close()
                raise

        server = base_events.Server(self, [sock])
        sock.listen(backlog)
        sock.setblocking(False)
        self._start_serving(protocol_factory, sock, ssl, server)
        return server
259
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700260
Victor Stinnerf2ed8892014-07-29 23:08:00 +0200261if hasattr(os, 'set_blocking'):
262 def _set_nonblocking(fd):
263 os.set_blocking(fd, False)
264else:
265 def _set_nonblocking(fd):
266 flags = fcntl.fcntl(fd, fcntl.F_GETFL)
267 flags = flags | os.O_NONBLOCK
268 fcntl.fcntl(fd, fcntl.F_SETFL, flags)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700269
270
class _UnixReadPipeTransport(transports.ReadTransport):
    """Read transport over a FIFO, socket or character device."""

    max_size = 256 * 1024  # max bytes we read in one event loop iteration

    def __init__(self, loop, pipe, protocol, waiter=None, extra=None):
        super().__init__(extra)
        self._extra['pipe'] = pipe
        self._loop = loop
        self._pipe = pipe
        self._fileno = pipe.fileno()
        mode = os.fstat(self._fileno).st_mode
        accepted = any(is_kind(mode) for is_kind in
                       (stat.S_ISFIFO, stat.S_ISSOCK, stat.S_ISCHR))
        if not accepted:
            raise ValueError("Pipe transport is for pipes/sockets only.")
        _set_nonblocking(self._fileno)
        self._protocol = protocol
        self._closing = False
        self._loop.add_reader(self._fileno, self._read_ready)
        self._loop.call_soon(self._protocol.connection_made, self)
        if waiter is not None:
            # wait until protocol.connection_made() has been called
            self._loop.call_soon(waiter._set_result_unless_cancelled, None)

    def __repr__(self):
        info = [self.__class__.__name__, 'fd=%s' % self._fileno]
        if self._pipe is None:
            info.append('closed')
        else:
            polling = selector_events._test_selector_event(
                self._loop._selector,
                self._fileno, selectors.EVENT_READ)
            info.append('polling' if polling else 'idle')
        return '<%s>' % ' '.join(info)

    def _read_ready(self):
        """Reader callback: forward available data, or handle EOF."""
        try:
            data = os.read(self._fileno, self.max_size)
        except (BlockingIOError, InterruptedError):
            # Nothing to read right now; the reader stays registered.
            return
        except OSError as exc:
            self._fatal_error(exc, 'Fatal read error on pipe transport')
            return

        if data:
            self._protocol.data_received(data)
            return

        # An empty read means the other end closed the pipe.
        if self._loop.get_debug():
            logger.info("%r was closed by peer", self)
        self._closing = True
        self._loop.remove_reader(self._fileno)
        self._loop.call_soon(self._protocol.eof_received)
        self._loop.call_soon(self._call_connection_lost, None)

    def pause_reading(self):
        self._loop.remove_reader(self._fileno)

    def resume_reading(self):
        self._loop.add_reader(self._fileno, self._read_ready)

    def close(self):
        if self._closing:
            return
        self._close(None)

    def _fatal_error(self, exc, message='Fatal error on pipe transport'):
        # should be called by exception handler only
        is_eio = isinstance(exc, OSError) and exc.errno == errno.EIO
        if not is_eio:
            self._loop.call_exception_handler({
                'message': message,
                'exception': exc,
                'transport': self,
                'protocol': self._protocol,
            })
        elif self._loop.get_debug():
            # EIO is only logged, and only in debug mode.
            logger.debug("%r: %s", self, message, exc_info=True)
        self._close(exc)

    def _close(self, exc):
        """Stop reading and schedule connection_lost(exc)."""
        self._closing = True
        self._loop.remove_reader(self._fileno)
        self._loop.call_soon(self._call_connection_lost, exc)

    def _call_connection_lost(self, exc):
        """Notify the protocol, then close the pipe and drop references."""
        try:
            self._protocol.connection_lost(exc)
        finally:
            self._pipe.close()
            self._pipe = None
            self._protocol = None
            self._loop = None
364
365
class _UnixWritePipeTransport(transports._FlowControlMixin,
                              transports.WriteTransport):
    """Write transport over a FIFO, socket or character device."""

    def __init__(self, loop, pipe, protocol, waiter=None, extra=None):
        super().__init__(extra)
        self._extra['pipe'] = pipe
        self._loop = loop
        self._pipe = pipe
        self._fileno = pipe.fileno()
        mode = os.fstat(self._fileno).st_mode
        is_socket = stat.S_ISSOCK(mode)
        accepted = is_socket or stat.S_ISFIFO(mode) or stat.S_ISCHR(mode)
        if not accepted:
            raise ValueError("Pipe transport is only for "
                             "pipes, sockets and character devices")
        _set_nonblocking(self._fileno)
        self._protocol = protocol
        self._buffer = []  # chunks still waiting to be written
        self._conn_lost = 0
        self._closing = False  # Set when close() or write_eof() called.

        # On AIX, the reader trick only works for sockets.
        # On other platforms it works for pipes and sockets.
        # (Exception: OS X 10.4?  Issue #19294.)
        if is_socket or not sys.platform.startswith("aix"):
            self._loop.add_reader(self._fileno, self._read_ready)

        self._loop.call_soon(self._protocol.connection_made, self)
        if waiter is not None:
            # wait until protocol.connection_made() has been called
            self._loop.call_soon(waiter._set_result_unless_cancelled, None)

    def __repr__(self):
        info = [self.__class__.__name__, 'fd=%s' % self._fileno]
        if self._pipe is None:
            info.append('closed')
        else:
            polling = selector_events._test_selector_event(
                self._loop._selector,
                self._fileno, selectors.EVENT_WRITE)
            info.append('polling' if polling else 'idle')
            info.append('bufsize=%s' % self.get_write_buffer_size())
        return '<%s>' % ' '.join(info)

    def get_write_buffer_size(self):
        """Return the total number of buffered bytes."""
        return sum(map(len, self._buffer))

    def _read_ready(self):
        # Pipe was closed by peer.
        if self._loop.get_debug():
            logger.info("%r was closed by peer", self)
        # Pending data can no longer be delivered: report a broken pipe.
        self._close(BrokenPipeError() if self._buffer else None)

    def write(self, data):
        """Write *data*, buffering whatever cannot be sent immediately."""
        assert isinstance(data, (bytes, bytearray, memoryview)), repr(data)
        if isinstance(data, bytearray):
            data = memoryview(data)
        if not data:
            return

        if self._conn_lost or self._closing:
            if self._conn_lost >= constants.LOG_THRESHOLD_FOR_CONNLOST_WRITES:
                logger.warning('pipe closed by peer or '
                               'os.write(pipe, data) raised exception.')
            self._conn_lost += 1
            return

        if not self._buffer:
            # Attempt to send it right away first.
            try:
                written = os.write(self._fileno, data)
            except (BlockingIOError, InterruptedError):
                written = 0
            except Exception as exc:
                self._conn_lost += 1
                self._fatal_error(exc, 'Fatal write error on pipe transport')
                return
            if written == len(data):
                return
            if written > 0:
                data = data[written:]
            # The rest is flushed by _write_ready() once the fd is writable.
            self._loop.add_writer(self._fileno, self._write_ready)

        self._buffer.append(data)
        self._maybe_pause_protocol()

    def _write_ready(self):
        """Writer callback: try to flush the whole buffer in one write."""
        data = b''.join(self._buffer)
        assert data, 'Data should not be empty'

        self._buffer.clear()
        try:
            written = os.write(self._fileno, data)
        except (BlockingIOError, InterruptedError):
            self._buffer.append(data)
            return
        except Exception as exc:
            self._conn_lost += 1
            # Remove the writer here: _fatal_error() will not do it
            # because _buffer is empty at this point.
            self._loop.remove_writer(self._fileno)
            self._fatal_error(exc, 'Fatal write error on pipe transport')
            return

        if written == len(data):
            self._loop.remove_writer(self._fileno)
            self._maybe_resume_protocol()  # May append to buffer.
            if not self._buffer and self._closing:
                self._loop.remove_reader(self._fileno)
                self._call_connection_lost(None)
            return

        if written > 0:
            data = data[written:]
        self._buffer.append(data)  # Try again later.

    def can_write_eof(self):
        return True

    # TODO: Make the relationships between write_eof(), close(),
    # abort(), _fatal_error() and _close() more straightforward.

    def write_eof(self):
        """Close the transport once the buffered data has been written."""
        if self._closing:
            return
        assert self._pipe
        self._closing = True
        if self._buffer:
            # _write_ready() finishes the shutdown after flushing.
            return
        self._loop.remove_reader(self._fileno)
        self._loop.call_soon(self._call_connection_lost, None)

    def close(self):
        if self._closing:
            return
        # write_eof() is all that is needed to close the write pipe.
        self.write_eof()

    def abort(self):
        """Close immediately, discarding any buffered data."""
        self._close(None)

    def _fatal_error(self, exc, message='Fatal error on pipe transport'):
        # should be called by exception handler only
        peer_gone = isinstance(exc, (BrokenPipeError, ConnectionResetError))
        if not peer_gone:
            self._loop.call_exception_handler({
                'message': message,
                'exception': exc,
                'transport': self,
                'protocol': self._protocol,
            })
        elif self._loop.get_debug():
            # These two errors are only logged, and only in debug mode.
            logger.debug("%r: %s", self, message, exc_info=True)
        self._close(exc)

    def _close(self, exc=None):
        """Drop buffered data and schedule connection_lost(exc)."""
        self._closing = True
        if self._buffer:
            self._loop.remove_writer(self._fileno)
        self._buffer.clear()
        self._loop.remove_reader(self._fileno)
        self._loop.call_soon(self._call_connection_lost, exc)

    def _call_connection_lost(self, exc):
        """Notify the protocol, then close the pipe and drop references."""
        try:
            self._protocol.connection_lost(exc)
        finally:
            self._pipe.close()
            self._pipe = None
            self._protocol = None
            self._loop = None
542
543
class _UnixSubprocessTransport(base_subprocess.BaseSubprocessTransport):
    """Subprocess transport that spawns the child with subprocess.Popen."""

    def _start(self, args, shell, stdin, stdout, stderr, bufsize, **kwargs):
        """Spawn the child process and store it in self._proc.

        When stdin is subprocess.PIPE, the child's stdin is one end of a
        socket pair and the other end becomes self._proc.stdin.  Both ends
        are closed if spawning the child or wrapping the socket fails.
        """
        stdin_w = None
        if stdin == subprocess.PIPE:
            # Use a socket pair for stdin, since not all platforms
            # support selecting read events on the write end of a
            # socket (which we use in order to detect closing of the
            # other end).  Notably this is needed on AIX, and works
            # just fine on other platforms.
            stdin, stdin_w = self._loop._socketpair()
        try:
            self._proc = subprocess.Popen(
                args, shell=shell, stdin=stdin, stdout=stdout, stderr=stderr,
                universal_newlines=False, bufsize=bufsize, **kwargs)
            if stdin_w is not None:
                # The child holds its own copy of this end now.
                stdin.close()
                self._proc.stdin = open(stdin_w.detach(), 'wb',
                                        buffering=bufsize)
                # Ownership moved to the file object: nothing to clean up.
                stdin_w = None
        finally:
            if stdin_w is not None:
                # Popen() or open() failed: do not leak the socket pair.
                stdin.close()
                stdin_w.close()
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800561
562
class AbstractChildWatcher:
    """Abstract base class for monitoring child processes.

    Objects derived from this class monitor a collection of subprocesses and
    report their termination or interruption by a signal.

    New callbacks are registered with .add_child_handler(). Starting a new
    process must be done within a 'with' block to allow the watcher to suspend
    its activity until the new process is fully registered (this is needed to
    prevent a race condition in some implementations).

    Example:
        with watcher:
            proc = subprocess.Popen("sleep 1")
            watcher.add_child_handler(proc.pid, callback)

    Notes:
        Implementations of this class must be thread-safe.

        Since child watcher objects may catch the SIGCHLD signal and call
        waitpid(-1), there should be only one active object per process.
    """

    def add_child_handler(self, pid, callback, *args):
        """Register a new child handler.

        Arrange for callback(pid, returncode, *args) to be called when
        process 'pid' terminates. Specifying another callback for the same
        process replaces the previous handler.

        Note: callback() must be thread-safe.
        """
        raise NotImplementedError

    def remove_child_handler(self, pid):
        """Remove the handler for process 'pid'.

        Return True if the handler was successfully removed, False if
        there was nothing to remove.
        """
        raise NotImplementedError

    def attach_loop(self, loop):
        """Attach the watcher to an event loop.

        If the watcher was previously attached to an event loop, then it is
        first detached before attaching to the new loop.

        Note: loop may be None.
        """
        raise NotImplementedError

    def close(self):
        """Close the watcher.

        This must be called to make sure that any underlying resource is freed.
        """
        raise NotImplementedError

    def __enter__(self):
        """Enter the watcher's context and allow starting new processes.

        This function must return self.
        """
        raise NotImplementedError

    def __exit__(self, a, b, c):
        """Exit the watcher's context."""
        raise NotImplementedError
631
632
class BaseChildWatcher(AbstractChildWatcher):
    """Shared SIGCHLD plumbing for the concrete child watchers.

    Subclasses supply _do_waitpid() and _do_waitpid_all().
    """

    def __init__(self):
        # Event loop the SIGCHLD handler is installed on, if any.
        self._loop = None

    def close(self):
        # Detaching from the loop removes the SIGCHLD handler.
        self.attach_loop(None)

    def _do_waitpid(self, expected_pid):
        raise NotImplementedError()

    def _do_waitpid_all(self):
        raise NotImplementedError()

    def attach_loop(self, loop):
        """Move the SIGCHLD handler from the current loop to *loop*.

        loop may be None, in which case the watcher is only detached.
        """
        assert loop is None or isinstance(loop, events.AbstractEventLoop)

        previous = self._loop
        if previous is not None:
            previous.remove_signal_handler(signal.SIGCHLD)

        self._loop = loop
        if loop is None:
            return

        loop.add_signal_handler(signal.SIGCHLD, self._sig_chld)

        # Prevent a race condition in case a child terminated
        # during the switch.
        self._do_waitpid_all()

    def _sig_chld(self):
        """SIGCHLD callback: reap children, reporting errors to the loop."""
        try:
            self._do_waitpid_all()
        except Exception as exc:
            # self._loop should always be available here
            # as '_sig_chld' is added as a signal handler
            # in 'attach_loop'
            self._loop.call_exception_handler({
                'message': 'Unknown exception in SIGCHLD handler',
                'exception': exc,
            })

    def _compute_returncode(self, status):
        """Convert a waitpid() status into a return code.

        A child killed by a signal yields the negated signal number.
        """
        if os.WIFSIGNALED(status):
            # The child process died because of a signal.
            return -os.WTERMSIG(status)
        if os.WIFEXITED(status):
            # The child process exited (e.g sys.exit()).
            return os.WEXITSTATUS(status)
        # The child exited, but we don't understand its status.
        # This shouldn't happen, but if it does, let's just
        # return that status; perhaps that helps debug it.
        return status
685
686
class SafeChildWatcher(BaseChildWatcher):
    """'Safe' child watcher implementation.

    This implementation avoids disrupting other code spawning processes by
    polling explicitly each process in the SIGCHLD handler instead of calling
    os.waitpid(-1).

    This is a safe solution but it has a significant overhead when handling a
    big number of children (O(n) each time SIGCHLD is raised)
    """

    def __init__(self):
        super().__init__()
        # Maps a watched pid to its (callback, args) pair.
        self._callbacks = {}

    def close(self):
        self._callbacks.clear()
        super().close()

    def __enter__(self):
        return self

    def __exit__(self, a, b, c):
        pass

    def add_child_handler(self, pid, callback, *args):
        """Arrange for callback(pid, returncode, *args) when 'pid' exits.

        A handler already registered for the same pid is replaced.
        """
        self._callbacks[pid] = callback, args

        # Prevent a race condition in case the child is already terminated.
        self._do_waitpid(pid)

    def remove_child_handler(self, pid):
        """Return True if a handler for 'pid' was removed, else False."""
        try:
            del self._callbacks[pid]
            return True
        except KeyError:
            return False

    def _do_waitpid_all(self):
        # Iterate over a snapshot: _do_waitpid() pops reaped pids.
        for pid in list(self._callbacks):
            self._do_waitpid(pid)

    def _debug_enabled(self):
        """Return True if a loop is attached and it is in debug mode."""
        # add_child_handler() polls the child right away, which can
        # happen before attach_loop() has been given a loop.
        loop = self._loop
        return loop is not None and loop.get_debug()

    def _do_waitpid(self, expected_pid):
        """Poll 'expected_pid' and fire its callback if it has exited."""
        assert expected_pid > 0

        try:
            pid, status = os.waitpid(expected_pid, os.WNOHANG)
        except ChildProcessError:
            # The child process is already reaped
            # (may happen if waitpid() is called elsewhere).
            pid = expected_pid
            returncode = 255
            logger.warning(
                "Unknown child process pid %d, will report returncode 255",
                pid)
        else:
            if pid == 0:
                # The child process is still alive.
                return

            returncode = self._compute_returncode(status)
            if self._debug_enabled():
                logger.debug('process %s exited with returncode %s',
                             expected_pid, returncode)

        try:
            callback, args = self._callbacks.pop(pid)
        except KeyError:  # pragma: no cover
            # May happen if .remove_child_handler() is called
            # after os.waitpid() returns.
            if self._debug_enabled():
                logger.warning("Child watcher got an unexpected pid: %r",
                               pid, exc_info=True)
        else:
            callback(pid, returncode, *args)
763
764
class FastChildWatcher(BaseChildWatcher):
    """Child watcher that reaps any terminated child in one sweep.

    Every terminated process is collected by calling os.waitpid(-1)
    directly, which may break other code that spawns processes and waits
    for their termination on its own.

    There is no noticeable overhead when handling a big number of children
    (O(1) each time a child terminates).
    """

    def __init__(self):
        super().__init__()
        # pid -> (callback, args) for children that have a handler.
        self._callbacks = {}
        # pid -> returncode for children reaped while no handler was
        # registered for them yet.
        self._zombies = {}
        # Number of "with watcher:" blocks currently entered.
        self._forks = 0
        self._lock = threading.Lock()

    def close(self):
        """Drop all handlers and unclaimed return codes, then close."""
        self._callbacks.clear()
        self._zombies.clear()
        super().close()

    def __enter__(self):
        """Enter a fork section: bump the counter and return the watcher."""
        with self._lock:
            self._forks = self._forks + 1
        return self

    def __exit__(self, a, b, c):
        """Leave a fork section.

        When the last section is left and some reaped children were never
        claimed by add_child_handler(), forget them and log a warning.
        """
        unclaimed = None
        with self._lock:
            self._forks = self._forks - 1
            if not self._forks and self._zombies:
                unclaimed = str(self._zombies)
                self._zombies.clear()

        # Log outside of the lock.
        if unclaimed is not None:
            logger.warning(
                "Caught subprocesses termination from unknown pids: %s",
                unclaimed)

    def add_child_handler(self, pid, callback, *args):
        """Register callback(pid, returncode, *args) for child *pid*.

        Must be called inside the watcher's context manager.  If the child
        has already been reaped, the callback is invoked immediately
        instead of being stored.
        """
        assert self._forks, "Must use the context manager"
        still_running = object()
        with self._lock:
            returncode = self._zombies.pop(pid, still_running)
            if returncode is still_running:
                # The child is running.
                self._callbacks[pid] = callback, args
                return

        # The child is dead already: fire the callback, lock released.
        callback(pid, returncode, *args)

    def remove_child_handler(self, pid):
        """Unregister the handler for *pid*.

        Return True if a handler was removed, False if none was registered.
        """
        absent = object()
        return self._callbacks.pop(pid, absent) is not absent

    def _do_waitpid_all(self):
        """Reap every child that has terminated and dispatch its handler.

        A child without a handler is remembered in self._zombies while a
        fork section is open (its handler may be added shortly); otherwise
        a warning is logged for it.
        """
        unregistered = object()

        # Because of signal coalescing, we must keep calling waitpid() as
        # long as we're able to reap a child.
        while True:
            try:
                pid, wait_status = os.waitpid(-1, os.WNOHANG)
            except ChildProcessError:
                # No more child processes exist.
                return
            if pid == 0:
                # A child process is still alive.
                return
            returncode = self._compute_returncode(wait_status)

            with self._lock:
                entry = self._callbacks.pop(pid, unregistered)
                if entry is not unregistered:
                    callback, args = entry
                    if self._loop.get_debug():
                        logger.debug('process %s exited with returncode %s',
                                     pid, returncode)
                elif self._forks:
                    # Unknown child, but it may not be registered yet.
                    self._zombies[pid] = returncode
                    if self._loop.get_debug():
                        logger.debug('unknown process %s exited '
                                     'with returncode %s',
                                     pid, returncode)
                    continue
                else:
                    # Unknown child and nobody can still claim it.
                    callback = None

            # Report or dispatch with the lock released.
            if callback is None:
                logger.warning(
                    "Caught subprocess termination from unknown pid: "
                    "%d -> %d", pid, returncode)
            else:
                callback(pid, returncode, *args)
868
869
class _UnixDefaultEventLoopPolicy(events.BaseDefaultEventLoopPolicy):
    """Unix event loop policy that also owns a child process watcher."""

    _loop_factory = _UnixSelectorEventLoop

    def __init__(self):
        super().__init__()
        # Created lazily by get_child_watcher() or set explicitly.
        self._watcher = None

    @staticmethod
    def _in_main_thread():
        """Return True when the calling thread is the main thread."""
        return isinstance(threading.current_thread(), threading._MainThread)

    def _init_watcher(self):
        """Create the default SafeChildWatcher if none is set yet.

        When called from the main thread, the new watcher is attached to
        self._local._loop.
        """
        with events._lock:
            if self._watcher is not None:  # pragma: no cover
                return
            self._watcher = SafeChildWatcher()
            if self._in_main_thread():
                self._watcher.attach_loop(self._local._loop)

    def set_event_loop(self, loop):
        """Set the event loop.

        As a side effect, if a child watcher was set before, then calling
        .set_event_loop() from the main thread will call .attach_loop(loop)
        on the child watcher.
        """
        super().set_event_loop(loop)

        if self._watcher is None:
            return
        if self._in_main_thread():
            self._watcher.attach_loop(loop)

    def get_child_watcher(self):
        """Get the watcher for child processes.

        If not yet set, a SafeChildWatcher object is automatically created.
        """
        if self._watcher is None:
            self._init_watcher()
        return self._watcher

    def set_child_watcher(self, watcher):
        """Set the watcher for child processes.

        Any watcher set previously is closed before being replaced.
        """
        assert watcher is None or isinstance(watcher, AbstractChildWatcher)

        if self._watcher is not None:
            self._watcher.close()
        self._watcher = watcher
919
# Public aliases for the Unix implementations; both names are listed in
# __all__ at the top of this module.
SelectorEventLoop = _UnixSelectorEventLoop
DefaultEventLoopPolicy = _UnixDefaultEventLoopPolicy