blob: 9f4005cb13a690c77be775669f41af4f2c0b65d9 [file] [log] [blame]
Yury Selivanovdec1a452014-02-18 22:27:48 -05001"""Selector event loop for Unix with signal handling."""
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07002
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07003import errno
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07004import os
5import signal
6import socket
7import stat
8import subprocess
9import sys
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -080010import threading
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070011
12
Yury Selivanovb057c522014-02-18 12:15:06 -050013from . import base_events
Guido van Rossum59691282013-10-30 14:52:03 -070014from . import base_subprocess
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070015from . import constants
Guido van Rossume36fcde2014-11-14 11:45:47 -080016from . import coroutines
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070017from . import events
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070018from . import selector_events
Victor Stinnere912e652014-07-12 03:11:53 +020019from . import selectors
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070020from . import transports
Victor Stinnerf951d282014-06-29 00:46:45 +020021from .coroutines import coroutine
Guido van Rossumfc29e0f2013-10-17 15:39:45 -070022from .log import logger
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070023
24
Victor Stinner915bcb02014-02-01 22:49:59 +010025__all__ = ['SelectorEventLoop',
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -080026 'AbstractChildWatcher', 'SafeChildWatcher',
27 'FastChildWatcher', 'DefaultEventLoopPolicy',
28 ]
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070029
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070030if sys.platform == 'win32': # pragma: no cover
31 raise ImportError('Signals are not really supported on Windows')
32
33
Victor Stinnerfe5649c2014-07-17 22:43:40 +020034def _sighandler_noop(signum, frame):
35 """Dummy signal handler."""
36 pass
37
38
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -080039class _UnixSelectorEventLoop(selector_events.BaseSelectorEventLoop):
Yury Selivanovb057c522014-02-18 12:15:06 -050040 """Unix event loop.
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070041
Yury Selivanovb057c522014-02-18 12:15:06 -050042 Adds signal handling and UNIX Domain Socket support to SelectorEventLoop.
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070043 """
44
45 def __init__(self, selector=None):
46 super().__init__(selector)
47 self._signal_handlers = {}
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070048
49 def _socketpair(self):
50 return socket.socketpair()
51
Guido van Rossum0b69fbc2013-11-06 20:25:50 -080052 def close(self):
Victor Stinnerf328c7d2014-06-23 01:02:37 +020053 super().close()
Guido van Rossum0b69fbc2013-11-06 20:25:50 -080054 for sig in list(self._signal_handlers):
55 self.remove_signal_handler(sig)
Guido van Rossum0b69fbc2013-11-06 20:25:50 -080056
Victor Stinnerfe5649c2014-07-17 22:43:40 +020057 def _process_self_data(self, data):
58 for signum in data:
59 if not signum:
60 # ignore null bytes written by _write_to_self()
61 continue
62 self._handle_signal(signum)
63
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070064 def add_signal_handler(self, sig, callback, *args):
65 """Add a handler for a signal. UNIX only.
66
67 Raise ValueError if the signal number is invalid or uncatchable.
68 Raise RuntimeError if there is a problem setting up the handler.
69 """
Victor Stinner2d99d932014-11-20 15:03:52 +010070 if (coroutines.iscoroutine(callback)
71 or coroutines.iscoroutinefunction(callback)):
Victor Stinner15cc6782015-01-09 00:09:10 +010072 raise TypeError("coroutines cannot be used "
73 "with add_signal_handler()")
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070074 self._check_signal(sig)
Victor Stinnere80bf0d2014-12-04 23:07:47 +010075 self._check_closed()
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070076 try:
77 # set_wakeup_fd() raises ValueError if this is not the
78 # main thread. By calling it early we ensure that an
79 # event loop running in another thread cannot add a signal
80 # handler.
81 signal.set_wakeup_fd(self._csock.fileno())
Victor Stinnerc4c46492014-07-23 18:21:45 +020082 except (ValueError, OSError) as exc:
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070083 raise RuntimeError(str(exc))
84
Yury Selivanov569efa22014-02-18 18:02:19 -050085 handle = events.Handle(callback, args, self)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070086 self._signal_handlers[sig] = handle
87
88 try:
Victor Stinnerfe5649c2014-07-17 22:43:40 +020089 # Register a dummy signal handler to ask Python to write the signal
90 # number in the wakup file descriptor. _process_self_data() will
91 # read signal numbers from this file descriptor to handle signals.
92 signal.signal(sig, _sighandler_noop)
93
Charles-François Natali74e7cf32013-12-05 22:47:19 +010094 # Set SA_RESTART to limit EINTR occurrences.
95 signal.siginterrupt(sig, False)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070096 except OSError as exc:
97 del self._signal_handlers[sig]
98 if not self._signal_handlers:
99 try:
100 signal.set_wakeup_fd(-1)
Victor Stinnerc4c46492014-07-23 18:21:45 +0200101 except (ValueError, OSError) as nexc:
Guido van Rossumfc29e0f2013-10-17 15:39:45 -0700102 logger.info('set_wakeup_fd(-1) failed: %s', nexc)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700103
104 if exc.errno == errno.EINVAL:
105 raise RuntimeError('sig {} cannot be caught'.format(sig))
106 else:
107 raise
108
Victor Stinnerfe5649c2014-07-17 22:43:40 +0200109 def _handle_signal(self, sig):
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700110 """Internal helper that is the actual signal handler."""
111 handle = self._signal_handlers.get(sig)
112 if handle is None:
113 return # Assume it's some race condition.
114 if handle._cancelled:
115 self.remove_signal_handler(sig) # Remove it properly.
116 else:
117 self._add_callback_signalsafe(handle)
118
119 def remove_signal_handler(self, sig):
120 """Remove a handler for a signal. UNIX only.
121
122 Return True if a signal handler was removed, False if not.
123 """
124 self._check_signal(sig)
125 try:
126 del self._signal_handlers[sig]
127 except KeyError:
128 return False
129
130 if sig == signal.SIGINT:
131 handler = signal.default_int_handler
132 else:
133 handler = signal.SIG_DFL
134
135 try:
136 signal.signal(sig, handler)
137 except OSError as exc:
138 if exc.errno == errno.EINVAL:
139 raise RuntimeError('sig {} cannot be caught'.format(sig))
140 else:
141 raise
142
143 if not self._signal_handlers:
144 try:
145 signal.set_wakeup_fd(-1)
Victor Stinnerc4c46492014-07-23 18:21:45 +0200146 except (ValueError, OSError) as exc:
Guido van Rossumfc29e0f2013-10-17 15:39:45 -0700147 logger.info('set_wakeup_fd(-1) failed: %s', exc)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700148
149 return True
150
151 def _check_signal(self, sig):
152 """Internal helper to validate a signal.
153
154 Raise ValueError if the signal number is invalid or uncatchable.
155 Raise RuntimeError if there is a problem setting up the handler.
156 """
157 if not isinstance(sig, int):
158 raise TypeError('sig must be an int, not {!r}'.format(sig))
159
160 if not (1 <= sig < signal.NSIG):
161 raise ValueError(
162 'sig {} out of range(1, {})'.format(sig, signal.NSIG))
163
164 def _make_read_pipe_transport(self, pipe, protocol, waiter=None,
165 extra=None):
166 return _UnixReadPipeTransport(self, pipe, protocol, waiter, extra)
167
168 def _make_write_pipe_transport(self, pipe, protocol, waiter=None,
169 extra=None):
170 return _UnixWritePipeTransport(self, pipe, protocol, waiter, extra)
171
Victor Stinnerf951d282014-06-29 00:46:45 +0200172 @coroutine
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700173 def _make_subprocess_transport(self, protocol, args, shell,
174 stdin, stdout, stderr, bufsize,
175 extra=None, **kwargs):
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800176 with events.get_child_watcher() as watcher:
177 transp = _UnixSubprocessTransport(self, protocol, args, shell,
178 stdin, stdout, stderr, bufsize,
Victor Stinner73f10fd2014-01-29 14:32:20 -0800179 extra=extra, **kwargs)
Guido van Rossum4835f172014-01-10 13:28:59 -0800180 yield from transp._post_init()
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800181 watcher.add_child_handler(transp.get_pid(),
182 self._child_watcher_callback, transp)
Guido van Rossum4835f172014-01-10 13:28:59 -0800183
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700184 return transp
185
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800186 def _child_watcher_callback(self, pid, returncode, transp):
187 self.call_soon_threadsafe(transp._process_exited, returncode)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700188
Victor Stinnerf951d282014-06-29 00:46:45 +0200189 @coroutine
Yury Selivanovb057c522014-02-18 12:15:06 -0500190 def create_unix_connection(self, protocol_factory, path, *,
191 ssl=None, sock=None,
192 server_hostname=None):
193 assert server_hostname is None or isinstance(server_hostname, str)
194 if ssl:
195 if server_hostname is None:
196 raise ValueError(
197 'you have to pass server_hostname when using ssl')
198 else:
199 if server_hostname is not None:
200 raise ValueError('server_hostname is only meaningful with ssl')
201
202 if path is not None:
203 if sock is not None:
204 raise ValueError(
205 'path and sock can not be specified at the same time')
206
Victor Stinner79a29522014-02-19 01:45:59 +0100207 sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM, 0)
Yury Selivanovb057c522014-02-18 12:15:06 -0500208 try:
Yury Selivanovb057c522014-02-18 12:15:06 -0500209 sock.setblocking(False)
210 yield from self.sock_connect(sock, path)
Victor Stinner79a29522014-02-19 01:45:59 +0100211 except:
212 sock.close()
Yury Selivanovb057c522014-02-18 12:15:06 -0500213 raise
214
215 else:
216 if sock is None:
217 raise ValueError('no path and sock were specified')
218 sock.setblocking(False)
219
220 transport, protocol = yield from self._create_connection_transport(
221 sock, protocol_factory, ssl, server_hostname)
222 return transport, protocol
223
Victor Stinnerf951d282014-06-29 00:46:45 +0200224 @coroutine
Yury Selivanovb057c522014-02-18 12:15:06 -0500225 def create_unix_server(self, protocol_factory, path=None, *,
226 sock=None, backlog=100, ssl=None):
227 if isinstance(ssl, bool):
228 raise TypeError('ssl argument must be an SSLContext or None')
229
230 if path is not None:
Victor Stinner1fd03a42014-04-07 11:18:54 +0200231 if sock is not None:
232 raise ValueError(
233 'path and sock can not be specified at the same time')
234
Yury Selivanovb057c522014-02-18 12:15:06 -0500235 sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
236
237 try:
238 sock.bind(path)
239 except OSError as exc:
Victor Stinner79a29522014-02-19 01:45:59 +0100240 sock.close()
Yury Selivanovb057c522014-02-18 12:15:06 -0500241 if exc.errno == errno.EADDRINUSE:
242 # Let's improve the error message by adding
243 # with what exact address it occurs.
244 msg = 'Address {!r} is already in use'.format(path)
245 raise OSError(errno.EADDRINUSE, msg) from None
246 else:
247 raise
Victor Stinner223a6242014-06-04 00:11:52 +0200248 except:
249 sock.close()
250 raise
Yury Selivanovb057c522014-02-18 12:15:06 -0500251 else:
252 if sock is None:
253 raise ValueError(
254 'path was not specified, and no sock specified')
255
256 if sock.family != socket.AF_UNIX:
257 raise ValueError(
258 'A UNIX Domain Socket was expected, got {!r}'.format(sock))
259
260 server = base_events.Server(self, [sock])
261 sock.listen(backlog)
262 sock.setblocking(False)
263 self._start_serving(protocol_factory, sock, ssl, server)
264 return server
265
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700266
Victor Stinnerf2ed8892014-07-29 23:08:00 +0200267if hasattr(os, 'set_blocking'):
268 def _set_nonblocking(fd):
269 os.set_blocking(fd, False)
270else:
Yury Selivanov8c0e0ab2014-09-24 23:21:39 -0400271 import fcntl
272
Victor Stinnerf2ed8892014-07-29 23:08:00 +0200273 def _set_nonblocking(fd):
274 flags = fcntl.fcntl(fd, fcntl.F_GETFL)
275 flags = flags | os.O_NONBLOCK
276 fcntl.fcntl(fd, fcntl.F_SETFL, flags)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700277
278
279class _UnixReadPipeTransport(transports.ReadTransport):
280
Yury Selivanovdec1a452014-02-18 22:27:48 -0500281 max_size = 256 * 1024 # max bytes we read in one event loop iteration
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700282
283 def __init__(self, loop, pipe, protocol, waiter=None, extra=None):
284 super().__init__(extra)
285 self._extra['pipe'] = pipe
286 self._loop = loop
287 self._pipe = pipe
288 self._fileno = pipe.fileno()
Guido van Rossum934f6ea2013-10-21 20:37:14 -0700289 mode = os.fstat(self._fileno).st_mode
Guido van Rossum02757ea2014-01-10 13:30:04 -0800290 if not (stat.S_ISFIFO(mode) or
291 stat.S_ISSOCK(mode) or
292 stat.S_ISCHR(mode)):
Guido van Rossum934f6ea2013-10-21 20:37:14 -0700293 raise ValueError("Pipe transport is for pipes/sockets only.")
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700294 _set_nonblocking(self._fileno)
295 self._protocol = protocol
296 self._closing = False
297 self._loop.add_reader(self._fileno, self._read_ready)
298 self._loop.call_soon(self._protocol.connection_made, self)
299 if waiter is not None:
Victor Stinnerbfff45d2014-07-08 23:57:31 +0200300 # wait until protocol.connection_made() has been called
Victor Stinnera9acbe82014-07-05 15:29:41 +0200301 self._loop.call_soon(waiter._set_result_unless_cancelled, None)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700302
Victor Stinnere912e652014-07-12 03:11:53 +0200303 def __repr__(self):
Victor Stinner29ad0112015-01-15 00:04:21 +0100304 info = [self.__class__.__name__]
305 if self._pipe is None:
306 info.append('closed')
307 elif self._closing:
308 info.append('closing')
309 info.append('fd=%s' % self._fileno)
Victor Stinnere912e652014-07-12 03:11:53 +0200310 if self._pipe is not None:
311 polling = selector_events._test_selector_event(
312 self._loop._selector,
313 self._fileno, selectors.EVENT_READ)
314 if polling:
315 info.append('polling')
316 else:
317 info.append('idle')
318 else:
319 info.append('closed')
320 return '<%s>' % ' '.join(info)
321
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700322 def _read_ready(self):
323 try:
324 data = os.read(self._fileno, self.max_size)
325 except (BlockingIOError, InterruptedError):
326 pass
327 except OSError as exc:
Victor Stinner0ee29c22014-02-19 01:40:41 +0100328 self._fatal_error(exc, 'Fatal read error on pipe transport')
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700329 else:
330 if data:
331 self._protocol.data_received(data)
332 else:
Victor Stinnere912e652014-07-12 03:11:53 +0200333 if self._loop.get_debug():
334 logger.info("%r was closed by peer", self)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700335 self._closing = True
336 self._loop.remove_reader(self._fileno)
337 self._loop.call_soon(self._protocol.eof_received)
338 self._loop.call_soon(self._call_connection_lost, None)
339
Guido van Rossum57497ad2013-10-18 07:58:20 -0700340 def pause_reading(self):
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700341 self._loop.remove_reader(self._fileno)
342
Guido van Rossum57497ad2013-10-18 07:58:20 -0700343 def resume_reading(self):
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700344 self._loop.add_reader(self._fileno, self._read_ready)
345
346 def close(self):
347 if not self._closing:
348 self._close(None)
349
Victor Stinner0ee29c22014-02-19 01:40:41 +0100350 def _fatal_error(self, exc, message='Fatal error on pipe transport'):
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700351 # should be called by exception handler only
Victor Stinnerb2614752014-08-25 23:20:52 +0200352 if (isinstance(exc, OSError) and exc.errno == errno.EIO):
353 if self._loop.get_debug():
354 logger.debug("%r: %s", self, message, exc_info=True)
355 else:
Yury Selivanov569efa22014-02-18 18:02:19 -0500356 self._loop.call_exception_handler({
Victor Stinner0ee29c22014-02-19 01:40:41 +0100357 'message': message,
Yury Selivanov569efa22014-02-18 18:02:19 -0500358 'exception': exc,
359 'transport': self,
360 'protocol': self._protocol,
361 })
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700362 self._close(exc)
363
364 def _close(self, exc):
365 self._closing = True
366 self._loop.remove_reader(self._fileno)
367 self._loop.call_soon(self._call_connection_lost, exc)
368
369 def _call_connection_lost(self, exc):
370 try:
371 self._protocol.connection_lost(exc)
372 finally:
373 self._pipe.close()
374 self._pipe = None
375 self._protocol = None
376 self._loop = None
377
378
Yury Selivanov3cb99142014-02-18 18:41:13 -0500379class _UnixWritePipeTransport(transports._FlowControlMixin,
Guido van Rossum47fb97e2014-01-29 13:20:39 -0800380 transports.WriteTransport):
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700381
382 def __init__(self, loop, pipe, protocol, waiter=None, extra=None):
Victor Stinner004adb92014-11-05 15:27:41 +0100383 super().__init__(extra, loop)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700384 self._extra['pipe'] = pipe
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700385 self._pipe = pipe
386 self._fileno = pipe.fileno()
Guido van Rossum934f6ea2013-10-21 20:37:14 -0700387 mode = os.fstat(self._fileno).st_mode
388 is_socket = stat.S_ISSOCK(mode)
Victor Stinner8dffc452014-01-25 15:32:06 +0100389 if not (is_socket or
390 stat.S_ISFIFO(mode) or
391 stat.S_ISCHR(mode)):
392 raise ValueError("Pipe transport is only for "
393 "pipes, sockets and character devices")
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700394 _set_nonblocking(self._fileno)
395 self._protocol = protocol
396 self._buffer = []
397 self._conn_lost = 0
398 self._closing = False # Set when close() or write_eof() called.
Guido van Rossum934f6ea2013-10-21 20:37:14 -0700399
400 # On AIX, the reader trick only works for sockets.
401 # On other platforms it works for pipes and sockets.
402 # (Exception: OS X 10.4? Issue #19294.)
403 if is_socket or not sys.platform.startswith("aix"):
404 self._loop.add_reader(self._fileno, self._read_ready)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700405
406 self._loop.call_soon(self._protocol.connection_made, self)
407 if waiter is not None:
Victor Stinnerbfff45d2014-07-08 23:57:31 +0200408 # wait until protocol.connection_made() has been called
Victor Stinnera9acbe82014-07-05 15:29:41 +0200409 self._loop.call_soon(waiter._set_result_unless_cancelled, None)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700410
Victor Stinnere912e652014-07-12 03:11:53 +0200411 def __repr__(self):
Victor Stinner29ad0112015-01-15 00:04:21 +0100412 info = [self.__class__.__name__]
413 if self._pipe is None:
414 info.append('closed')
415 elif self._closing:
416 info.append('closing')
417 info.append('fd=%s' % self._fileno)
Victor Stinnere912e652014-07-12 03:11:53 +0200418 if self._pipe is not None:
419 polling = selector_events._test_selector_event(
420 self._loop._selector,
421 self._fileno, selectors.EVENT_WRITE)
422 if polling:
423 info.append('polling')
424 else:
425 info.append('idle')
426
427 bufsize = self.get_write_buffer_size()
428 info.append('bufsize=%s' % bufsize)
429 else:
430 info.append('closed')
431 return '<%s>' % ' '.join(info)
432
Guido van Rossum47fb97e2014-01-29 13:20:39 -0800433 def get_write_buffer_size(self):
434 return sum(len(data) for data in self._buffer)
435
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700436 def _read_ready(self):
Guido van Rossum934f6ea2013-10-21 20:37:14 -0700437 # Pipe was closed by peer.
Victor Stinnere912e652014-07-12 03:11:53 +0200438 if self._loop.get_debug():
439 logger.info("%r was closed by peer", self)
Victor Stinner61b3c9b2014-01-31 13:04:28 +0100440 if self._buffer:
441 self._close(BrokenPipeError())
442 else:
443 self._close()
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700444
445 def write(self, data):
Guido van Rossum47fb97e2014-01-29 13:20:39 -0800446 assert isinstance(data, (bytes, bytearray, memoryview)), repr(data)
447 if isinstance(data, bytearray):
448 data = memoryview(data)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700449 if not data:
450 return
451
452 if self._conn_lost or self._closing:
453 if self._conn_lost >= constants.LOG_THRESHOLD_FOR_CONNLOST_WRITES:
Guido van Rossumfc29e0f2013-10-17 15:39:45 -0700454 logger.warning('pipe closed by peer or '
455 'os.write(pipe, data) raised exception.')
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700456 self._conn_lost += 1
457 return
458
459 if not self._buffer:
460 # Attempt to send it right away first.
461 try:
462 n = os.write(self._fileno, data)
463 except (BlockingIOError, InterruptedError):
464 n = 0
465 except Exception as exc:
466 self._conn_lost += 1
Victor Stinner0ee29c22014-02-19 01:40:41 +0100467 self._fatal_error(exc, 'Fatal write error on pipe transport')
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700468 return
469 if n == len(data):
470 return
471 elif n > 0:
472 data = data[n:]
473 self._loop.add_writer(self._fileno, self._write_ready)
474
475 self._buffer.append(data)
Guido van Rossum47fb97e2014-01-29 13:20:39 -0800476 self._maybe_pause_protocol()
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700477
478 def _write_ready(self):
479 data = b''.join(self._buffer)
480 assert data, 'Data should not be empty'
481
482 self._buffer.clear()
483 try:
484 n = os.write(self._fileno, data)
485 except (BlockingIOError, InterruptedError):
486 self._buffer.append(data)
487 except Exception as exc:
488 self._conn_lost += 1
489 # Remove writer here, _fatal_error() doesn't it
490 # because _buffer is empty.
491 self._loop.remove_writer(self._fileno)
Victor Stinner0ee29c22014-02-19 01:40:41 +0100492 self._fatal_error(exc, 'Fatal write error on pipe transport')
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700493 else:
494 if n == len(data):
495 self._loop.remove_writer(self._fileno)
Guido van Rossum47fb97e2014-01-29 13:20:39 -0800496 self._maybe_resume_protocol() # May append to buffer.
497 if not self._buffer and self._closing:
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700498 self._loop.remove_reader(self._fileno)
499 self._call_connection_lost(None)
500 return
501 elif n > 0:
502 data = data[n:]
503
504 self._buffer.append(data) # Try again later.
505
506 def can_write_eof(self):
507 return True
508
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700509 def write_eof(self):
510 if self._closing:
511 return
512 assert self._pipe
513 self._closing = True
514 if not self._buffer:
515 self._loop.remove_reader(self._fileno)
516 self._loop.call_soon(self._call_connection_lost, None)
517
518 def close(self):
Victor Stinner41ed9582015-01-15 13:16:50 +0100519 if self._pipe is not None and not self._closing:
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700520 # write_eof is all what we needed to close the write pipe
521 self.write_eof()
522
523 def abort(self):
524 self._close(None)
525
Victor Stinner0ee29c22014-02-19 01:40:41 +0100526 def _fatal_error(self, exc, message='Fatal error on pipe transport'):
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700527 # should be called by exception handler only
Victor Stinnerb2614752014-08-25 23:20:52 +0200528 if isinstance(exc, (BrokenPipeError, ConnectionResetError)):
529 if self._loop.get_debug():
530 logger.debug("%r: %s", self, message, exc_info=True)
531 else:
Yury Selivanov569efa22014-02-18 18:02:19 -0500532 self._loop.call_exception_handler({
Victor Stinner0ee29c22014-02-19 01:40:41 +0100533 'message': message,
Yury Selivanov569efa22014-02-18 18:02:19 -0500534 'exception': exc,
535 'transport': self,
536 'protocol': self._protocol,
537 })
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700538 self._close(exc)
539
540 def _close(self, exc=None):
541 self._closing = True
542 if self._buffer:
543 self._loop.remove_writer(self._fileno)
544 self._buffer.clear()
545 self._loop.remove_reader(self._fileno)
546 self._loop.call_soon(self._call_connection_lost, exc)
547
548 def _call_connection_lost(self, exc):
549 try:
550 self._protocol.connection_lost(exc)
551 finally:
552 self._pipe.close()
553 self._pipe = None
554 self._protocol = None
555 self._loop = None
556
557
Victor Stinner1e40f102014-12-11 23:30:17 +0100558if hasattr(os, 'set_inheritable'):
559 # Python 3.4 and newer
560 _set_inheritable = os.set_inheritable
561else:
562 import fcntl
563
564 def _set_inheritable(fd, inheritable):
565 cloexec_flag = getattr(fcntl, 'FD_CLOEXEC', 1)
566
567 old = fcntl.fcntl(fd, fcntl.F_GETFD)
568 if not inheritable:
569 fcntl.fcntl(fd, fcntl.F_SETFD, old | cloexec_flag)
570 else:
571 fcntl.fcntl(fd, fcntl.F_SETFD, old & ~cloexec_flag)
572
573
Guido van Rossum59691282013-10-30 14:52:03 -0700574class _UnixSubprocessTransport(base_subprocess.BaseSubprocessTransport):
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700575
Guido van Rossum59691282013-10-30 14:52:03 -0700576 def _start(self, args, shell, stdin, stdout, stderr, bufsize, **kwargs):
Guido van Rossum934f6ea2013-10-21 20:37:14 -0700577 stdin_w = None
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700578 if stdin == subprocess.PIPE:
Guido van Rossum934f6ea2013-10-21 20:37:14 -0700579 # Use a socket pair for stdin, since not all platforms
580 # support selecting read events on the write end of a
581 # socket (which we use in order to detect closing of the
582 # other end). Notably this is needed on AIX, and works
583 # just fine on other platforms.
584 stdin, stdin_w = self._loop._socketpair()
Victor Stinner1e40f102014-12-11 23:30:17 +0100585
586 # Mark the write end of the stdin pipe as non-inheritable,
587 # needed by close_fds=False on Python 3.3 and older
588 # (Python 3.4 implements the PEP 446, socketpair returns
589 # non-inheritable sockets)
590 _set_inheritable(stdin_w.fileno(), False)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700591 self._proc = subprocess.Popen(
592 args, shell=shell, stdin=stdin, stdout=stdout, stderr=stderr,
593 universal_newlines=False, bufsize=bufsize, **kwargs)
Guido van Rossum934f6ea2013-10-21 20:37:14 -0700594 if stdin_w is not None:
595 stdin.close()
Victor Stinner2dba23a2014-07-03 00:59:00 +0200596 self._proc.stdin = open(stdin_w.detach(), 'wb', buffering=bufsize)
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800597
598
599class AbstractChildWatcher:
600 """Abstract base class for monitoring child processes.
601
602 Objects derived from this class monitor a collection of subprocesses and
603 report their termination or interruption by a signal.
604
605 New callbacks are registered with .add_child_handler(). Starting a new
606 process must be done within a 'with' block to allow the watcher to suspend
607 its activity until the new process if fully registered (this is needed to
608 prevent a race condition in some implementations).
609
610 Example:
611 with watcher:
612 proc = subprocess.Popen("sleep 1")
613 watcher.add_child_handler(proc.pid, callback)
614
615 Notes:
616 Implementations of this class must be thread-safe.
617
618 Since child watcher objects may catch the SIGCHLD signal and call
619 waitpid(-1), there should be only one active object per process.
620 """
621
622 def add_child_handler(self, pid, callback, *args):
623 """Register a new child handler.
624
625 Arrange for callback(pid, returncode, *args) to be called when
626 process 'pid' terminates. Specifying another callback for the same
627 process replaces the previous handler.
628
Victor Stinneracdb7822014-07-14 18:33:40 +0200629 Note: callback() must be thread-safe.
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800630 """
631 raise NotImplementedError()
632
633 def remove_child_handler(self, pid):
634 """Removes the handler for process 'pid'.
635
636 The function returns True if the handler was successfully removed,
637 False if there was nothing to remove."""
638
639 raise NotImplementedError()
640
Guido van Rossum2bcae702013-11-13 15:50:08 -0800641 def attach_loop(self, loop):
642 """Attach the watcher to an event loop.
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800643
Guido van Rossum2bcae702013-11-13 15:50:08 -0800644 If the watcher was previously attached to an event loop, then it is
645 first detached before attaching to the new loop.
646
647 Note: loop may be None.
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800648 """
649 raise NotImplementedError()
650
651 def close(self):
652 """Close the watcher.
653
654 This must be called to make sure that any underlying resource is freed.
655 """
656 raise NotImplementedError()
657
658 def __enter__(self):
659 """Enter the watcher's context and allow starting new processes
660
661 This function must return self"""
662 raise NotImplementedError()
663
664 def __exit__(self, a, b, c):
665 """Exit the watcher's context"""
666 raise NotImplementedError()
667
668
669class BaseChildWatcher(AbstractChildWatcher):
670
Guido van Rossum2bcae702013-11-13 15:50:08 -0800671 def __init__(self):
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800672 self._loop = None
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800673
674 def close(self):
Guido van Rossum2bcae702013-11-13 15:50:08 -0800675 self.attach_loop(None)
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800676
677 def _do_waitpid(self, expected_pid):
678 raise NotImplementedError()
679
680 def _do_waitpid_all(self):
681 raise NotImplementedError()
682
Guido van Rossum2bcae702013-11-13 15:50:08 -0800683 def attach_loop(self, loop):
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800684 assert loop is None or isinstance(loop, events.AbstractEventLoop)
685
686 if self._loop is not None:
687 self._loop.remove_signal_handler(signal.SIGCHLD)
688
689 self._loop = loop
690 if loop is not None:
691 loop.add_signal_handler(signal.SIGCHLD, self._sig_chld)
692
693 # Prevent a race condition in case a child terminated
694 # during the switch.
695 self._do_waitpid_all()
696
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800697 def _sig_chld(self):
698 try:
699 self._do_waitpid_all()
Yury Selivanov569efa22014-02-18 18:02:19 -0500700 except Exception as exc:
701 # self._loop should always be available here
702 # as '_sig_chld' is added as a signal handler
703 # in 'attach_loop'
704 self._loop.call_exception_handler({
705 'message': 'Unknown exception in SIGCHLD handler',
706 'exception': exc,
707 })
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800708
709 def _compute_returncode(self, status):
710 if os.WIFSIGNALED(status):
711 # The child process died because of a signal.
712 return -os.WTERMSIG(status)
713 elif os.WIFEXITED(status):
714 # The child process exited (e.g sys.exit()).
715 return os.WEXITSTATUS(status)
716 else:
717 # The child exited, but we don't understand its status.
718 # This shouldn't happen, but if it does, let's just
719 # return that status; perhaps that helps debug it.
720 return status
721
722
723class SafeChildWatcher(BaseChildWatcher):
724 """'Safe' child watcher implementation.
725
726 This implementation avoids disrupting other code spawning processes by
727 polling explicitly each process in the SIGCHLD handler instead of calling
728 os.waitpid(-1).
729
730 This is a safe solution but it has a significant overhead when handling a
731 big number of children (O(n) each time SIGCHLD is raised)
732 """
733
Guido van Rossum2bcae702013-11-13 15:50:08 -0800734 def __init__(self):
735 super().__init__()
736 self._callbacks = {}
737
738 def close(self):
739 self._callbacks.clear()
740 super().close()
741
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800742 def __enter__(self):
743 return self
744
745 def __exit__(self, a, b, c):
746 pass
747
748 def add_child_handler(self, pid, callback, *args):
749 self._callbacks[pid] = callback, args
750
751 # Prevent a race condition in case the child is already terminated.
752 self._do_waitpid(pid)
753
Guido van Rossum2bcae702013-11-13 15:50:08 -0800754 def remove_child_handler(self, pid):
755 try:
756 del self._callbacks[pid]
757 return True
758 except KeyError:
759 return False
760
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800761 def _do_waitpid_all(self):
762
763 for pid in list(self._callbacks):
764 self._do_waitpid(pid)
765
766 def _do_waitpid(self, expected_pid):
767 assert expected_pid > 0
768
769 try:
770 pid, status = os.waitpid(expected_pid, os.WNOHANG)
771 except ChildProcessError:
772 # The child process is already reaped
773 # (may happen if waitpid() is called elsewhere).
774 pid = expected_pid
775 returncode = 255
776 logger.warning(
777 "Unknown child process pid %d, will report returncode 255",
778 pid)
779 else:
780 if pid == 0:
781 # The child process is still alive.
782 return
783
784 returncode = self._compute_returncode(status)
Victor Stinneracdb7822014-07-14 18:33:40 +0200785 if self._loop.get_debug():
786 logger.debug('process %s exited with returncode %s',
787 expected_pid, returncode)
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800788
789 try:
790 callback, args = self._callbacks.pop(pid)
791 except KeyError: # pragma: no cover
792 # May happen if .remove_child_handler() is called
793 # after os.waitpid() returns.
Victor Stinnerb2614752014-08-25 23:20:52 +0200794 if self._loop.get_debug():
795 logger.warning("Child watcher got an unexpected pid: %r",
796 pid, exc_info=True)
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800797 else:
798 callback(pid, returncode, *args)
799
800
801class FastChildWatcher(BaseChildWatcher):
802 """'Fast' child watcher implementation.
803
804 This implementation reaps every terminated processes by calling
805 os.waitpid(-1) directly, possibly breaking other code spawning processes
806 and waiting for their termination.
807
808 There is no noticeable overhead when handling a big number of children
809 (O(1) each time a child terminates).
810 """
Guido van Rossum2bcae702013-11-13 15:50:08 -0800811 def __init__(self):
812 super().__init__()
813 self._callbacks = {}
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800814 self._lock = threading.Lock()
815 self._zombies = {}
816 self._forks = 0
817
818 def close(self):
Guido van Rossum2bcae702013-11-13 15:50:08 -0800819 self._callbacks.clear()
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800820 self._zombies.clear()
Guido van Rossum2bcae702013-11-13 15:50:08 -0800821 super().close()
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800822
823 def __enter__(self):
824 with self._lock:
825 self._forks += 1
826
827 return self
828
829 def __exit__(self, a, b, c):
830 with self._lock:
831 self._forks -= 1
832
833 if self._forks or not self._zombies:
834 return
835
836 collateral_victims = str(self._zombies)
837 self._zombies.clear()
838
839 logger.warning(
840 "Caught subprocesses termination from unknown pids: %s",
841 collateral_victims)
842
843 def add_child_handler(self, pid, callback, *args):
844 assert self._forks, "Must use the context manager"
Guido van Rossumab27a9f2014-01-25 16:32:17 -0800845 with self._lock:
846 try:
847 returncode = self._zombies.pop(pid)
848 except KeyError:
849 # The child is running.
850 self._callbacks[pid] = callback, args
851 return
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800852
Guido van Rossumab27a9f2014-01-25 16:32:17 -0800853 # The child is dead already. We can fire the callback.
854 callback(pid, returncode, *args)
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800855
Guido van Rossum2bcae702013-11-13 15:50:08 -0800856 def remove_child_handler(self, pid):
857 try:
858 del self._callbacks[pid]
859 return True
860 except KeyError:
861 return False
862
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800863 def _do_waitpid_all(self):
864 # Because of signal coalescing, we must keep calling waitpid() as
865 # long as we're able to reap a child.
866 while True:
867 try:
868 pid, status = os.waitpid(-1, os.WNOHANG)
869 except ChildProcessError:
870 # No more child processes exist.
871 return
872 else:
873 if pid == 0:
874 # A child process is still alive.
875 return
876
877 returncode = self._compute_returncode(status)
878
Guido van Rossumab27a9f2014-01-25 16:32:17 -0800879 with self._lock:
880 try:
881 callback, args = self._callbacks.pop(pid)
882 except KeyError:
883 # unknown child
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800884 if self._forks:
885 # It may not be registered yet.
886 self._zombies[pid] = returncode
Victor Stinneracdb7822014-07-14 18:33:40 +0200887 if self._loop.get_debug():
888 logger.debug('unknown process %s exited '
889 'with returncode %s',
890 pid, returncode)
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800891 continue
Guido van Rossumab27a9f2014-01-25 16:32:17 -0800892 callback = None
Victor Stinneracdb7822014-07-14 18:33:40 +0200893 else:
894 if self._loop.get_debug():
895 logger.debug('process %s exited with returncode %s',
896 pid, returncode)
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800897
Guido van Rossumab27a9f2014-01-25 16:32:17 -0800898 if callback is None:
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800899 logger.warning(
900 "Caught subprocess termination from unknown pid: "
901 "%d -> %d", pid, returncode)
902 else:
903 callback(pid, returncode, *args)
904
905
906class _UnixDefaultEventLoopPolicy(events.BaseDefaultEventLoopPolicy):
Victor Stinner70db9e42015-01-09 21:32:05 +0100907 """UNIX event loop policy with a watcher for child processes."""
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800908 _loop_factory = _UnixSelectorEventLoop
909
910 def __init__(self):
911 super().__init__()
912 self._watcher = None
913
914 def _init_watcher(self):
915 with events._lock:
916 if self._watcher is None: # pragma: no branch
Guido van Rossum2bcae702013-11-13 15:50:08 -0800917 self._watcher = SafeChildWatcher()
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800918 if isinstance(threading.current_thread(),
919 threading._MainThread):
Guido van Rossum2bcae702013-11-13 15:50:08 -0800920 self._watcher.attach_loop(self._local._loop)
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800921
922 def set_event_loop(self, loop):
923 """Set the event loop.
924
925 As a side effect, if a child watcher was set before, then calling
Guido van Rossum2bcae702013-11-13 15:50:08 -0800926 .set_event_loop() from the main thread will call .attach_loop(loop) on
927 the child watcher.
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800928 """
929
930 super().set_event_loop(loop)
931
932 if self._watcher is not None and \
933 isinstance(threading.current_thread(), threading._MainThread):
Guido van Rossum2bcae702013-11-13 15:50:08 -0800934 self._watcher.attach_loop(loop)
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800935
936 def get_child_watcher(self):
Victor Stinnerf9e49dd2014-06-05 12:06:44 +0200937 """Get the watcher for child processes.
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800938
939 If not yet set, a SafeChildWatcher object is automatically created.
940 """
941 if self._watcher is None:
942 self._init_watcher()
943
944 return self._watcher
945
946 def set_child_watcher(self, watcher):
Victor Stinnerf9e49dd2014-06-05 12:06:44 +0200947 """Set the watcher for child processes."""
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800948
949 assert watcher is None or isinstance(watcher, AbstractChildWatcher)
950
951 if self._watcher is not None:
952 self._watcher.close()
953
954 self._watcher = watcher
955
956SelectorEventLoop = _UnixSelectorEventLoop
957DefaultEventLoopPolicy = _UnixDefaultEventLoopPolicy