blob: b16f946ae4bc84b4b1afcebe8399cf4539fd2850 [file] [log] [blame]
Yury Selivanovdec1a452014-02-18 22:27:48 -05001"""Selector event loop for Unix with signal handling."""
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07002
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07003import errno
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07004import os
5import signal
6import socket
7import stat
8import subprocess
9import sys
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -080010import threading
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070011
12
Yury Selivanovb057c522014-02-18 12:15:06 -050013from . import base_events
Guido van Rossum59691282013-10-30 14:52:03 -070014from . import base_subprocess
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070015from . import constants
16from . import events
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070017from . import selector_events
Victor Stinnere912e652014-07-12 03:11:53 +020018from . import selectors
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070019from . import transports
Victor Stinnerf951d282014-06-29 00:46:45 +020020from .coroutines import coroutine
Guido van Rossumfc29e0f2013-10-17 15:39:45 -070021from .log import logger
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070022
23
Victor Stinner915bcb02014-02-01 22:49:59 +010024__all__ = ['SelectorEventLoop',
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -080025 'AbstractChildWatcher', 'SafeChildWatcher',
26 'FastChildWatcher', 'DefaultEventLoopPolicy',
27 ]
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070028
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070029if sys.platform == 'win32': # pragma: no cover
30 raise ImportError('Signals are not really supported on Windows')
31
32
Victor Stinnerfe5649c2014-07-17 22:43:40 +020033def _sighandler_noop(signum, frame):
34 """Dummy signal handler."""
35 pass
36
37
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -080038class _UnixSelectorEventLoop(selector_events.BaseSelectorEventLoop):
Yury Selivanovb057c522014-02-18 12:15:06 -050039 """Unix event loop.
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070040
Yury Selivanovb057c522014-02-18 12:15:06 -050041 Adds signal handling and UNIX Domain Socket support to SelectorEventLoop.
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070042 """
43
44 def __init__(self, selector=None):
45 super().__init__(selector)
46 self._signal_handlers = {}
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070047
48 def _socketpair(self):
49 return socket.socketpair()
50
Guido van Rossum0b69fbc2013-11-06 20:25:50 -080051 def close(self):
Victor Stinnerf328c7d2014-06-23 01:02:37 +020052 super().close()
Guido van Rossum0b69fbc2013-11-06 20:25:50 -080053 for sig in list(self._signal_handlers):
54 self.remove_signal_handler(sig)
Guido van Rossum0b69fbc2013-11-06 20:25:50 -080055
Victor Stinnerfe5649c2014-07-17 22:43:40 +020056 def _process_self_data(self, data):
57 for signum in data:
58 if not signum:
59 # ignore null bytes written by _write_to_self()
60 continue
61 self._handle_signal(signum)
62
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070063 def add_signal_handler(self, sig, callback, *args):
64 """Add a handler for a signal. UNIX only.
65
66 Raise ValueError if the signal number is invalid or uncatchable.
67 Raise RuntimeError if there is a problem setting up the handler.
68 """
69 self._check_signal(sig)
70 try:
71 # set_wakeup_fd() raises ValueError if this is not the
72 # main thread. By calling it early we ensure that an
73 # event loop running in another thread cannot add a signal
74 # handler.
75 signal.set_wakeup_fd(self._csock.fileno())
Victor Stinnerc4c46492014-07-23 18:21:45 +020076 except (ValueError, OSError) as exc:
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070077 raise RuntimeError(str(exc))
78
Yury Selivanov569efa22014-02-18 18:02:19 -050079 handle = events.Handle(callback, args, self)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070080 self._signal_handlers[sig] = handle
81
82 try:
Victor Stinnerfe5649c2014-07-17 22:43:40 +020083 # Register a dummy signal handler to ask Python to write the signal
84 # number in the wakup file descriptor. _process_self_data() will
85 # read signal numbers from this file descriptor to handle signals.
86 signal.signal(sig, _sighandler_noop)
87
Charles-François Natali74e7cf32013-12-05 22:47:19 +010088 # Set SA_RESTART to limit EINTR occurrences.
89 signal.siginterrupt(sig, False)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070090 except OSError as exc:
91 del self._signal_handlers[sig]
92 if not self._signal_handlers:
93 try:
94 signal.set_wakeup_fd(-1)
Victor Stinnerc4c46492014-07-23 18:21:45 +020095 except (ValueError, OSError) as nexc:
Guido van Rossumfc29e0f2013-10-17 15:39:45 -070096 logger.info('set_wakeup_fd(-1) failed: %s', nexc)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070097
98 if exc.errno == errno.EINVAL:
99 raise RuntimeError('sig {} cannot be caught'.format(sig))
100 else:
101 raise
102
Victor Stinnerfe5649c2014-07-17 22:43:40 +0200103 def _handle_signal(self, sig):
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700104 """Internal helper that is the actual signal handler."""
105 handle = self._signal_handlers.get(sig)
106 if handle is None:
107 return # Assume it's some race condition.
108 if handle._cancelled:
109 self.remove_signal_handler(sig) # Remove it properly.
110 else:
111 self._add_callback_signalsafe(handle)
112
113 def remove_signal_handler(self, sig):
114 """Remove a handler for a signal. UNIX only.
115
116 Return True if a signal handler was removed, False if not.
117 """
118 self._check_signal(sig)
119 try:
120 del self._signal_handlers[sig]
121 except KeyError:
122 return False
123
124 if sig == signal.SIGINT:
125 handler = signal.default_int_handler
126 else:
127 handler = signal.SIG_DFL
128
129 try:
130 signal.signal(sig, handler)
131 except OSError as exc:
132 if exc.errno == errno.EINVAL:
133 raise RuntimeError('sig {} cannot be caught'.format(sig))
134 else:
135 raise
136
137 if not self._signal_handlers:
138 try:
139 signal.set_wakeup_fd(-1)
Victor Stinnerc4c46492014-07-23 18:21:45 +0200140 except (ValueError, OSError) as exc:
Guido van Rossumfc29e0f2013-10-17 15:39:45 -0700141 logger.info('set_wakeup_fd(-1) failed: %s', exc)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700142
143 return True
144
145 def _check_signal(self, sig):
146 """Internal helper to validate a signal.
147
148 Raise ValueError if the signal number is invalid or uncatchable.
149 Raise RuntimeError if there is a problem setting up the handler.
150 """
151 if not isinstance(sig, int):
152 raise TypeError('sig must be an int, not {!r}'.format(sig))
153
154 if not (1 <= sig < signal.NSIG):
155 raise ValueError(
156 'sig {} out of range(1, {})'.format(sig, signal.NSIG))
157
158 def _make_read_pipe_transport(self, pipe, protocol, waiter=None,
159 extra=None):
160 return _UnixReadPipeTransport(self, pipe, protocol, waiter, extra)
161
162 def _make_write_pipe_transport(self, pipe, protocol, waiter=None,
163 extra=None):
164 return _UnixWritePipeTransport(self, pipe, protocol, waiter, extra)
165
Victor Stinnerf951d282014-06-29 00:46:45 +0200166 @coroutine
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700167 def _make_subprocess_transport(self, protocol, args, shell,
168 stdin, stdout, stderr, bufsize,
169 extra=None, **kwargs):
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800170 with events.get_child_watcher() as watcher:
171 transp = _UnixSubprocessTransport(self, protocol, args, shell,
172 stdin, stdout, stderr, bufsize,
Victor Stinner73f10fd2014-01-29 14:32:20 -0800173 extra=extra, **kwargs)
Guido van Rossum4835f172014-01-10 13:28:59 -0800174 yield from transp._post_init()
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800175 watcher.add_child_handler(transp.get_pid(),
176 self._child_watcher_callback, transp)
Guido van Rossum4835f172014-01-10 13:28:59 -0800177
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700178 return transp
179
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800180 def _child_watcher_callback(self, pid, returncode, transp):
181 self.call_soon_threadsafe(transp._process_exited, returncode)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700182
Victor Stinnerf951d282014-06-29 00:46:45 +0200183 @coroutine
Yury Selivanovb057c522014-02-18 12:15:06 -0500184 def create_unix_connection(self, protocol_factory, path, *,
185 ssl=None, sock=None,
186 server_hostname=None):
187 assert server_hostname is None or isinstance(server_hostname, str)
188 if ssl:
189 if server_hostname is None:
190 raise ValueError(
191 'you have to pass server_hostname when using ssl')
192 else:
193 if server_hostname is not None:
194 raise ValueError('server_hostname is only meaningful with ssl')
195
196 if path is not None:
197 if sock is not None:
198 raise ValueError(
199 'path and sock can not be specified at the same time')
200
Victor Stinner79a29522014-02-19 01:45:59 +0100201 sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM, 0)
Yury Selivanovb057c522014-02-18 12:15:06 -0500202 try:
Yury Selivanovb057c522014-02-18 12:15:06 -0500203 sock.setblocking(False)
204 yield from self.sock_connect(sock, path)
Victor Stinner79a29522014-02-19 01:45:59 +0100205 except:
206 sock.close()
Yury Selivanovb057c522014-02-18 12:15:06 -0500207 raise
208
209 else:
210 if sock is None:
211 raise ValueError('no path and sock were specified')
212 sock.setblocking(False)
213
214 transport, protocol = yield from self._create_connection_transport(
215 sock, protocol_factory, ssl, server_hostname)
216 return transport, protocol
217
Victor Stinnerf951d282014-06-29 00:46:45 +0200218 @coroutine
Yury Selivanovb057c522014-02-18 12:15:06 -0500219 def create_unix_server(self, protocol_factory, path=None, *,
220 sock=None, backlog=100, ssl=None):
221 if isinstance(ssl, bool):
222 raise TypeError('ssl argument must be an SSLContext or None')
223
224 if path is not None:
Victor Stinner1fd03a42014-04-07 11:18:54 +0200225 if sock is not None:
226 raise ValueError(
227 'path and sock can not be specified at the same time')
228
Yury Selivanovb057c522014-02-18 12:15:06 -0500229 sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
230
231 try:
232 sock.bind(path)
233 except OSError as exc:
Victor Stinner79a29522014-02-19 01:45:59 +0100234 sock.close()
Yury Selivanovb057c522014-02-18 12:15:06 -0500235 if exc.errno == errno.EADDRINUSE:
236 # Let's improve the error message by adding
237 # with what exact address it occurs.
238 msg = 'Address {!r} is already in use'.format(path)
239 raise OSError(errno.EADDRINUSE, msg) from None
240 else:
241 raise
Victor Stinner223a6242014-06-04 00:11:52 +0200242 except:
243 sock.close()
244 raise
Yury Selivanovb057c522014-02-18 12:15:06 -0500245 else:
246 if sock is None:
247 raise ValueError(
248 'path was not specified, and no sock specified')
249
250 if sock.family != socket.AF_UNIX:
251 raise ValueError(
252 'A UNIX Domain Socket was expected, got {!r}'.format(sock))
253
254 server = base_events.Server(self, [sock])
255 sock.listen(backlog)
256 sock.setblocking(False)
257 self._start_serving(protocol_factory, sock, ssl, server)
258 return server
259
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700260
Victor Stinnerf2ed8892014-07-29 23:08:00 +0200261if hasattr(os, 'set_blocking'):
262 def _set_nonblocking(fd):
263 os.set_blocking(fd, False)
264else:
Yury Selivanov8c0e0ab2014-09-24 23:21:39 -0400265 import fcntl
266
Victor Stinnerf2ed8892014-07-29 23:08:00 +0200267 def _set_nonblocking(fd):
268 flags = fcntl.fcntl(fd, fcntl.F_GETFL)
269 flags = flags | os.O_NONBLOCK
270 fcntl.fcntl(fd, fcntl.F_SETFL, flags)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700271
272
273class _UnixReadPipeTransport(transports.ReadTransport):
274
Yury Selivanovdec1a452014-02-18 22:27:48 -0500275 max_size = 256 * 1024 # max bytes we read in one event loop iteration
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700276
277 def __init__(self, loop, pipe, protocol, waiter=None, extra=None):
278 super().__init__(extra)
279 self._extra['pipe'] = pipe
280 self._loop = loop
281 self._pipe = pipe
282 self._fileno = pipe.fileno()
Guido van Rossum934f6ea2013-10-21 20:37:14 -0700283 mode = os.fstat(self._fileno).st_mode
Guido van Rossum02757ea2014-01-10 13:30:04 -0800284 if not (stat.S_ISFIFO(mode) or
285 stat.S_ISSOCK(mode) or
286 stat.S_ISCHR(mode)):
Guido van Rossum934f6ea2013-10-21 20:37:14 -0700287 raise ValueError("Pipe transport is for pipes/sockets only.")
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700288 _set_nonblocking(self._fileno)
289 self._protocol = protocol
290 self._closing = False
291 self._loop.add_reader(self._fileno, self._read_ready)
292 self._loop.call_soon(self._protocol.connection_made, self)
293 if waiter is not None:
Victor Stinnerbfff45d2014-07-08 23:57:31 +0200294 # wait until protocol.connection_made() has been called
Victor Stinnera9acbe82014-07-05 15:29:41 +0200295 self._loop.call_soon(waiter._set_result_unless_cancelled, None)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700296
Victor Stinnere912e652014-07-12 03:11:53 +0200297 def __repr__(self):
298 info = [self.__class__.__name__, 'fd=%s' % self._fileno]
299 if self._pipe is not None:
300 polling = selector_events._test_selector_event(
301 self._loop._selector,
302 self._fileno, selectors.EVENT_READ)
303 if polling:
304 info.append('polling')
305 else:
306 info.append('idle')
307 else:
308 info.append('closed')
309 return '<%s>' % ' '.join(info)
310
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700311 def _read_ready(self):
312 try:
313 data = os.read(self._fileno, self.max_size)
314 except (BlockingIOError, InterruptedError):
315 pass
316 except OSError as exc:
Victor Stinner0ee29c22014-02-19 01:40:41 +0100317 self._fatal_error(exc, 'Fatal read error on pipe transport')
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700318 else:
319 if data:
320 self._protocol.data_received(data)
321 else:
Victor Stinnere912e652014-07-12 03:11:53 +0200322 if self._loop.get_debug():
323 logger.info("%r was closed by peer", self)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700324 self._closing = True
325 self._loop.remove_reader(self._fileno)
326 self._loop.call_soon(self._protocol.eof_received)
327 self._loop.call_soon(self._call_connection_lost, None)
328
Guido van Rossum57497ad2013-10-18 07:58:20 -0700329 def pause_reading(self):
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700330 self._loop.remove_reader(self._fileno)
331
Guido van Rossum57497ad2013-10-18 07:58:20 -0700332 def resume_reading(self):
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700333 self._loop.add_reader(self._fileno, self._read_ready)
334
335 def close(self):
336 if not self._closing:
337 self._close(None)
338
Victor Stinner0ee29c22014-02-19 01:40:41 +0100339 def _fatal_error(self, exc, message='Fatal error on pipe transport'):
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700340 # should be called by exception handler only
Victor Stinnerb2614752014-08-25 23:20:52 +0200341 if (isinstance(exc, OSError) and exc.errno == errno.EIO):
342 if self._loop.get_debug():
343 logger.debug("%r: %s", self, message, exc_info=True)
344 else:
Yury Selivanov569efa22014-02-18 18:02:19 -0500345 self._loop.call_exception_handler({
Victor Stinner0ee29c22014-02-19 01:40:41 +0100346 'message': message,
Yury Selivanov569efa22014-02-18 18:02:19 -0500347 'exception': exc,
348 'transport': self,
349 'protocol': self._protocol,
350 })
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700351 self._close(exc)
352
353 def _close(self, exc):
354 self._closing = True
355 self._loop.remove_reader(self._fileno)
356 self._loop.call_soon(self._call_connection_lost, exc)
357
358 def _call_connection_lost(self, exc):
359 try:
360 self._protocol.connection_lost(exc)
361 finally:
362 self._pipe.close()
363 self._pipe = None
364 self._protocol = None
365 self._loop = None
366
367
Yury Selivanov3cb99142014-02-18 18:41:13 -0500368class _UnixWritePipeTransport(transports._FlowControlMixin,
Guido van Rossum47fb97e2014-01-29 13:20:39 -0800369 transports.WriteTransport):
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700370
371 def __init__(self, loop, pipe, protocol, waiter=None, extra=None):
Victor Stinner004adb92014-11-05 15:27:41 +0100372 super().__init__(extra, loop)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700373 self._extra['pipe'] = pipe
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700374 self._pipe = pipe
375 self._fileno = pipe.fileno()
Guido van Rossum934f6ea2013-10-21 20:37:14 -0700376 mode = os.fstat(self._fileno).st_mode
377 is_socket = stat.S_ISSOCK(mode)
Victor Stinner8dffc452014-01-25 15:32:06 +0100378 if not (is_socket or
379 stat.S_ISFIFO(mode) or
380 stat.S_ISCHR(mode)):
381 raise ValueError("Pipe transport is only for "
382 "pipes, sockets and character devices")
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700383 _set_nonblocking(self._fileno)
384 self._protocol = protocol
385 self._buffer = []
386 self._conn_lost = 0
387 self._closing = False # Set when close() or write_eof() called.
Guido van Rossum934f6ea2013-10-21 20:37:14 -0700388
389 # On AIX, the reader trick only works for sockets.
390 # On other platforms it works for pipes and sockets.
391 # (Exception: OS X 10.4? Issue #19294.)
392 if is_socket or not sys.platform.startswith("aix"):
393 self._loop.add_reader(self._fileno, self._read_ready)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700394
395 self._loop.call_soon(self._protocol.connection_made, self)
396 if waiter is not None:
Victor Stinnerbfff45d2014-07-08 23:57:31 +0200397 # wait until protocol.connection_made() has been called
Victor Stinnera9acbe82014-07-05 15:29:41 +0200398 self._loop.call_soon(waiter._set_result_unless_cancelled, None)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700399
Victor Stinnere912e652014-07-12 03:11:53 +0200400 def __repr__(self):
401 info = [self.__class__.__name__, 'fd=%s' % self._fileno]
402 if self._pipe is not None:
403 polling = selector_events._test_selector_event(
404 self._loop._selector,
405 self._fileno, selectors.EVENT_WRITE)
406 if polling:
407 info.append('polling')
408 else:
409 info.append('idle')
410
411 bufsize = self.get_write_buffer_size()
412 info.append('bufsize=%s' % bufsize)
413 else:
414 info.append('closed')
415 return '<%s>' % ' '.join(info)
416
Guido van Rossum47fb97e2014-01-29 13:20:39 -0800417 def get_write_buffer_size(self):
418 return sum(len(data) for data in self._buffer)
419
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700420 def _read_ready(self):
Guido van Rossum934f6ea2013-10-21 20:37:14 -0700421 # Pipe was closed by peer.
Victor Stinnere912e652014-07-12 03:11:53 +0200422 if self._loop.get_debug():
423 logger.info("%r was closed by peer", self)
Victor Stinner61b3c9b2014-01-31 13:04:28 +0100424 if self._buffer:
425 self._close(BrokenPipeError())
426 else:
427 self._close()
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700428
429 def write(self, data):
Guido van Rossum47fb97e2014-01-29 13:20:39 -0800430 assert isinstance(data, (bytes, bytearray, memoryview)), repr(data)
431 if isinstance(data, bytearray):
432 data = memoryview(data)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700433 if not data:
434 return
435
436 if self._conn_lost or self._closing:
437 if self._conn_lost >= constants.LOG_THRESHOLD_FOR_CONNLOST_WRITES:
Guido van Rossumfc29e0f2013-10-17 15:39:45 -0700438 logger.warning('pipe closed by peer or '
439 'os.write(pipe, data) raised exception.')
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700440 self._conn_lost += 1
441 return
442
443 if not self._buffer:
444 # Attempt to send it right away first.
445 try:
446 n = os.write(self._fileno, data)
447 except (BlockingIOError, InterruptedError):
448 n = 0
449 except Exception as exc:
450 self._conn_lost += 1
Victor Stinner0ee29c22014-02-19 01:40:41 +0100451 self._fatal_error(exc, 'Fatal write error on pipe transport')
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700452 return
453 if n == len(data):
454 return
455 elif n > 0:
456 data = data[n:]
457 self._loop.add_writer(self._fileno, self._write_ready)
458
459 self._buffer.append(data)
Guido van Rossum47fb97e2014-01-29 13:20:39 -0800460 self._maybe_pause_protocol()
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700461
462 def _write_ready(self):
463 data = b''.join(self._buffer)
464 assert data, 'Data should not be empty'
465
466 self._buffer.clear()
467 try:
468 n = os.write(self._fileno, data)
469 except (BlockingIOError, InterruptedError):
470 self._buffer.append(data)
471 except Exception as exc:
472 self._conn_lost += 1
473 # Remove writer here, _fatal_error() doesn't it
474 # because _buffer is empty.
475 self._loop.remove_writer(self._fileno)
Victor Stinner0ee29c22014-02-19 01:40:41 +0100476 self._fatal_error(exc, 'Fatal write error on pipe transport')
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700477 else:
478 if n == len(data):
479 self._loop.remove_writer(self._fileno)
Guido van Rossum47fb97e2014-01-29 13:20:39 -0800480 self._maybe_resume_protocol() # May append to buffer.
481 if not self._buffer and self._closing:
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700482 self._loop.remove_reader(self._fileno)
483 self._call_connection_lost(None)
484 return
485 elif n > 0:
486 data = data[n:]
487
488 self._buffer.append(data) # Try again later.
489
490 def can_write_eof(self):
491 return True
492
493 # TODO: Make the relationships between write_eof(), close(),
494 # abort(), _fatal_error() and _close() more straightforward.
495
496 def write_eof(self):
497 if self._closing:
498 return
499 assert self._pipe
500 self._closing = True
501 if not self._buffer:
502 self._loop.remove_reader(self._fileno)
503 self._loop.call_soon(self._call_connection_lost, None)
504
505 def close(self):
506 if not self._closing:
507 # write_eof is all what we needed to close the write pipe
508 self.write_eof()
509
510 def abort(self):
511 self._close(None)
512
Victor Stinner0ee29c22014-02-19 01:40:41 +0100513 def _fatal_error(self, exc, message='Fatal error on pipe transport'):
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700514 # should be called by exception handler only
Victor Stinnerb2614752014-08-25 23:20:52 +0200515 if isinstance(exc, (BrokenPipeError, ConnectionResetError)):
516 if self._loop.get_debug():
517 logger.debug("%r: %s", self, message, exc_info=True)
518 else:
Yury Selivanov569efa22014-02-18 18:02:19 -0500519 self._loop.call_exception_handler({
Victor Stinner0ee29c22014-02-19 01:40:41 +0100520 'message': message,
Yury Selivanov569efa22014-02-18 18:02:19 -0500521 'exception': exc,
522 'transport': self,
523 'protocol': self._protocol,
524 })
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700525 self._close(exc)
526
527 def _close(self, exc=None):
528 self._closing = True
529 if self._buffer:
530 self._loop.remove_writer(self._fileno)
531 self._buffer.clear()
532 self._loop.remove_reader(self._fileno)
533 self._loop.call_soon(self._call_connection_lost, exc)
534
535 def _call_connection_lost(self, exc):
536 try:
537 self._protocol.connection_lost(exc)
538 finally:
539 self._pipe.close()
540 self._pipe = None
541 self._protocol = None
542 self._loop = None
543
544
Guido van Rossum59691282013-10-30 14:52:03 -0700545class _UnixSubprocessTransport(base_subprocess.BaseSubprocessTransport):
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700546
Guido van Rossum59691282013-10-30 14:52:03 -0700547 def _start(self, args, shell, stdin, stdout, stderr, bufsize, **kwargs):
Guido van Rossum934f6ea2013-10-21 20:37:14 -0700548 stdin_w = None
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700549 if stdin == subprocess.PIPE:
Guido van Rossum934f6ea2013-10-21 20:37:14 -0700550 # Use a socket pair for stdin, since not all platforms
551 # support selecting read events on the write end of a
552 # socket (which we use in order to detect closing of the
553 # other end). Notably this is needed on AIX, and works
554 # just fine on other platforms.
555 stdin, stdin_w = self._loop._socketpair()
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700556 self._proc = subprocess.Popen(
557 args, shell=shell, stdin=stdin, stdout=stdout, stderr=stderr,
558 universal_newlines=False, bufsize=bufsize, **kwargs)
Guido van Rossum934f6ea2013-10-21 20:37:14 -0700559 if stdin_w is not None:
560 stdin.close()
Victor Stinner2dba23a2014-07-03 00:59:00 +0200561 self._proc.stdin = open(stdin_w.detach(), 'wb', buffering=bufsize)
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800562
563
564class AbstractChildWatcher:
565 """Abstract base class for monitoring child processes.
566
567 Objects derived from this class monitor a collection of subprocesses and
568 report their termination or interruption by a signal.
569
570 New callbacks are registered with .add_child_handler(). Starting a new
571 process must be done within a 'with' block to allow the watcher to suspend
572 its activity until the new process if fully registered (this is needed to
573 prevent a race condition in some implementations).
574
575 Example:
576 with watcher:
577 proc = subprocess.Popen("sleep 1")
578 watcher.add_child_handler(proc.pid, callback)
579
580 Notes:
581 Implementations of this class must be thread-safe.
582
583 Since child watcher objects may catch the SIGCHLD signal and call
584 waitpid(-1), there should be only one active object per process.
585 """
586
587 def add_child_handler(self, pid, callback, *args):
588 """Register a new child handler.
589
590 Arrange for callback(pid, returncode, *args) to be called when
591 process 'pid' terminates. Specifying another callback for the same
592 process replaces the previous handler.
593
Victor Stinneracdb7822014-07-14 18:33:40 +0200594 Note: callback() must be thread-safe.
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800595 """
596 raise NotImplementedError()
597
598 def remove_child_handler(self, pid):
599 """Removes the handler for process 'pid'.
600
601 The function returns True if the handler was successfully removed,
602 False if there was nothing to remove."""
603
604 raise NotImplementedError()
605
Guido van Rossum2bcae702013-11-13 15:50:08 -0800606 def attach_loop(self, loop):
607 """Attach the watcher to an event loop.
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800608
Guido van Rossum2bcae702013-11-13 15:50:08 -0800609 If the watcher was previously attached to an event loop, then it is
610 first detached before attaching to the new loop.
611
612 Note: loop may be None.
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800613 """
614 raise NotImplementedError()
615
616 def close(self):
617 """Close the watcher.
618
619 This must be called to make sure that any underlying resource is freed.
620 """
621 raise NotImplementedError()
622
623 def __enter__(self):
624 """Enter the watcher's context and allow starting new processes
625
626 This function must return self"""
627 raise NotImplementedError()
628
629 def __exit__(self, a, b, c):
630 """Exit the watcher's context"""
631 raise NotImplementedError()
632
633
634class BaseChildWatcher(AbstractChildWatcher):
635
Guido van Rossum2bcae702013-11-13 15:50:08 -0800636 def __init__(self):
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800637 self._loop = None
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800638
639 def close(self):
Guido van Rossum2bcae702013-11-13 15:50:08 -0800640 self.attach_loop(None)
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800641
642 def _do_waitpid(self, expected_pid):
643 raise NotImplementedError()
644
645 def _do_waitpid_all(self):
646 raise NotImplementedError()
647
Guido van Rossum2bcae702013-11-13 15:50:08 -0800648 def attach_loop(self, loop):
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800649 assert loop is None or isinstance(loop, events.AbstractEventLoop)
650
651 if self._loop is not None:
652 self._loop.remove_signal_handler(signal.SIGCHLD)
653
654 self._loop = loop
655 if loop is not None:
656 loop.add_signal_handler(signal.SIGCHLD, self._sig_chld)
657
658 # Prevent a race condition in case a child terminated
659 # during the switch.
660 self._do_waitpid_all()
661
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800662 def _sig_chld(self):
663 try:
664 self._do_waitpid_all()
Yury Selivanov569efa22014-02-18 18:02:19 -0500665 except Exception as exc:
666 # self._loop should always be available here
667 # as '_sig_chld' is added as a signal handler
668 # in 'attach_loop'
669 self._loop.call_exception_handler({
670 'message': 'Unknown exception in SIGCHLD handler',
671 'exception': exc,
672 })
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800673
674 def _compute_returncode(self, status):
675 if os.WIFSIGNALED(status):
676 # The child process died because of a signal.
677 return -os.WTERMSIG(status)
678 elif os.WIFEXITED(status):
679 # The child process exited (e.g sys.exit()).
680 return os.WEXITSTATUS(status)
681 else:
682 # The child exited, but we don't understand its status.
683 # This shouldn't happen, but if it does, let's just
684 # return that status; perhaps that helps debug it.
685 return status
686
687
688class SafeChildWatcher(BaseChildWatcher):
689 """'Safe' child watcher implementation.
690
691 This implementation avoids disrupting other code spawning processes by
692 polling explicitly each process in the SIGCHLD handler instead of calling
693 os.waitpid(-1).
694
695 This is a safe solution but it has a significant overhead when handling a
696 big number of children (O(n) each time SIGCHLD is raised)
697 """
698
Guido van Rossum2bcae702013-11-13 15:50:08 -0800699 def __init__(self):
700 super().__init__()
701 self._callbacks = {}
702
703 def close(self):
704 self._callbacks.clear()
705 super().close()
706
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800707 def __enter__(self):
708 return self
709
710 def __exit__(self, a, b, c):
711 pass
712
713 def add_child_handler(self, pid, callback, *args):
714 self._callbacks[pid] = callback, args
715
716 # Prevent a race condition in case the child is already terminated.
717 self._do_waitpid(pid)
718
Guido van Rossum2bcae702013-11-13 15:50:08 -0800719 def remove_child_handler(self, pid):
720 try:
721 del self._callbacks[pid]
722 return True
723 except KeyError:
724 return False
725
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800726 def _do_waitpid_all(self):
727
728 for pid in list(self._callbacks):
729 self._do_waitpid(pid)
730
731 def _do_waitpid(self, expected_pid):
732 assert expected_pid > 0
733
734 try:
735 pid, status = os.waitpid(expected_pid, os.WNOHANG)
736 except ChildProcessError:
737 # The child process is already reaped
738 # (may happen if waitpid() is called elsewhere).
739 pid = expected_pid
740 returncode = 255
741 logger.warning(
742 "Unknown child process pid %d, will report returncode 255",
743 pid)
744 else:
745 if pid == 0:
746 # The child process is still alive.
747 return
748
749 returncode = self._compute_returncode(status)
Victor Stinneracdb7822014-07-14 18:33:40 +0200750 if self._loop.get_debug():
751 logger.debug('process %s exited with returncode %s',
752 expected_pid, returncode)
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800753
754 try:
755 callback, args = self._callbacks.pop(pid)
756 except KeyError: # pragma: no cover
757 # May happen if .remove_child_handler() is called
758 # after os.waitpid() returns.
Victor Stinnerb2614752014-08-25 23:20:52 +0200759 if self._loop.get_debug():
760 logger.warning("Child watcher got an unexpected pid: %r",
761 pid, exc_info=True)
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800762 else:
763 callback(pid, returncode, *args)
764
765
766class FastChildWatcher(BaseChildWatcher):
767 """'Fast' child watcher implementation.
768
769 This implementation reaps every terminated processes by calling
770 os.waitpid(-1) directly, possibly breaking other code spawning processes
771 and waiting for their termination.
772
773 There is no noticeable overhead when handling a big number of children
774 (O(1) each time a child terminates).
775 """
Guido van Rossum2bcae702013-11-13 15:50:08 -0800776 def __init__(self):
777 super().__init__()
778 self._callbacks = {}
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800779 self._lock = threading.Lock()
780 self._zombies = {}
781 self._forks = 0
782
783 def close(self):
Guido van Rossum2bcae702013-11-13 15:50:08 -0800784 self._callbacks.clear()
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800785 self._zombies.clear()
Guido van Rossum2bcae702013-11-13 15:50:08 -0800786 super().close()
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800787
788 def __enter__(self):
789 with self._lock:
790 self._forks += 1
791
792 return self
793
794 def __exit__(self, a, b, c):
795 with self._lock:
796 self._forks -= 1
797
798 if self._forks or not self._zombies:
799 return
800
801 collateral_victims = str(self._zombies)
802 self._zombies.clear()
803
804 logger.warning(
805 "Caught subprocesses termination from unknown pids: %s",
806 collateral_victims)
807
808 def add_child_handler(self, pid, callback, *args):
809 assert self._forks, "Must use the context manager"
Guido van Rossumab27a9f2014-01-25 16:32:17 -0800810 with self._lock:
811 try:
812 returncode = self._zombies.pop(pid)
813 except KeyError:
814 # The child is running.
815 self._callbacks[pid] = callback, args
816 return
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800817
Guido van Rossumab27a9f2014-01-25 16:32:17 -0800818 # The child is dead already. We can fire the callback.
819 callback(pid, returncode, *args)
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800820
Guido van Rossum2bcae702013-11-13 15:50:08 -0800821 def remove_child_handler(self, pid):
822 try:
823 del self._callbacks[pid]
824 return True
825 except KeyError:
826 return False
827
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800828 def _do_waitpid_all(self):
829 # Because of signal coalescing, we must keep calling waitpid() as
830 # long as we're able to reap a child.
831 while True:
832 try:
833 pid, status = os.waitpid(-1, os.WNOHANG)
834 except ChildProcessError:
835 # No more child processes exist.
836 return
837 else:
838 if pid == 0:
839 # A child process is still alive.
840 return
841
842 returncode = self._compute_returncode(status)
843
Guido van Rossumab27a9f2014-01-25 16:32:17 -0800844 with self._lock:
845 try:
846 callback, args = self._callbacks.pop(pid)
847 except KeyError:
848 # unknown child
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800849 if self._forks:
850 # It may not be registered yet.
851 self._zombies[pid] = returncode
Victor Stinneracdb7822014-07-14 18:33:40 +0200852 if self._loop.get_debug():
853 logger.debug('unknown process %s exited '
854 'with returncode %s',
855 pid, returncode)
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800856 continue
Guido van Rossumab27a9f2014-01-25 16:32:17 -0800857 callback = None
Victor Stinneracdb7822014-07-14 18:33:40 +0200858 else:
859 if self._loop.get_debug():
860 logger.debug('process %s exited with returncode %s',
861 pid, returncode)
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800862
Guido van Rossumab27a9f2014-01-25 16:32:17 -0800863 if callback is None:
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800864 logger.warning(
865 "Caught subprocess termination from unknown pid: "
866 "%d -> %d", pid, returncode)
867 else:
868 callback(pid, returncode, *args)
869
870
871class _UnixDefaultEventLoopPolicy(events.BaseDefaultEventLoopPolicy):
872 """XXX"""
873 _loop_factory = _UnixSelectorEventLoop
874
875 def __init__(self):
876 super().__init__()
877 self._watcher = None
878
879 def _init_watcher(self):
880 with events._lock:
881 if self._watcher is None: # pragma: no branch
Guido van Rossum2bcae702013-11-13 15:50:08 -0800882 self._watcher = SafeChildWatcher()
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800883 if isinstance(threading.current_thread(),
884 threading._MainThread):
Guido van Rossum2bcae702013-11-13 15:50:08 -0800885 self._watcher.attach_loop(self._local._loop)
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800886
887 def set_event_loop(self, loop):
888 """Set the event loop.
889
890 As a side effect, if a child watcher was set before, then calling
Guido van Rossum2bcae702013-11-13 15:50:08 -0800891 .set_event_loop() from the main thread will call .attach_loop(loop) on
892 the child watcher.
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800893 """
894
895 super().set_event_loop(loop)
896
897 if self._watcher is not None and \
898 isinstance(threading.current_thread(), threading._MainThread):
Guido van Rossum2bcae702013-11-13 15:50:08 -0800899 self._watcher.attach_loop(loop)
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800900
901 def get_child_watcher(self):
Victor Stinnerf9e49dd2014-06-05 12:06:44 +0200902 """Get the watcher for child processes.
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800903
904 If not yet set, a SafeChildWatcher object is automatically created.
905 """
906 if self._watcher is None:
907 self._init_watcher()
908
909 return self._watcher
910
911 def set_child_watcher(self, watcher):
Victor Stinnerf9e49dd2014-06-05 12:06:44 +0200912 """Set the watcher for child processes."""
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800913
914 assert watcher is None or isinstance(watcher, AbstractChildWatcher)
915
916 if self._watcher is not None:
917 self._watcher.close()
918
919 self._watcher = watcher
920
921SelectorEventLoop = _UnixSelectorEventLoop
922DefaultEventLoopPolicy = _UnixDefaultEventLoopPolicy