blob: 97f9addde88a0ada13eba3e26907918bbe2bcb69 [file] [log] [blame]
Yury Selivanovdec1a452014-02-18 22:27:48 -05001"""Selector event loop for Unix with signal handling."""
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07002
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07003import errno
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07004import os
5import signal
6import socket
7import stat
8import subprocess
9import sys
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -080010import threading
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070011
12
Yury Selivanovb057c522014-02-18 12:15:06 -050013from . import base_events
Guido van Rossum59691282013-10-30 14:52:03 -070014from . import base_subprocess
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070015from . import constants
Guido van Rossume36fcde2014-11-14 11:45:47 -080016from . import coroutines
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070017from . import events
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070018from . import selector_events
Victor Stinnere912e652014-07-12 03:11:53 +020019from . import selectors
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070020from . import transports
Victor Stinnerf951d282014-06-29 00:46:45 +020021from .coroutines import coroutine
Guido van Rossumfc29e0f2013-10-17 15:39:45 -070022from .log import logger
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070023
24
# Names exported by a star-import of this module.
__all__ = [
    'SelectorEventLoop',
    'AbstractChildWatcher',
    'SafeChildWatcher',
    'FastChildWatcher',
    'DefaultEventLoopPolicy',
]

# Refuse to load on Windows: the module is built around POSIX signals.
if sys.platform == 'win32':  # pragma: no cover
    raise ImportError('Signals are not really supported on Windows')
32
33
def _sighandler_noop(signum, frame):
    """Signal handler that deliberately ignores its arguments.

    It takes the (signum, frame) pair that signal.signal() passes to Python
    level handlers and always returns None.
    """
    return None
37
38
class _UnixSelectorEventLoop(selector_events.BaseSelectorEventLoop):
    """Selector event loop specialised for Unix.

    On top of the base selector loop it provides signal handlers, pipe and
    subprocess transports, and UNIX Domain Socket clients and servers.
    """

    def __init__(self, selector=None):
        super().__init__(selector)
        # signal number -> events.Handle wrapping the registered callback
        self._signal_handlers = {}

    def _socketpair(self):
        """Return the pair of connected sockets from socket.socketpair()."""
        return socket.socketpair()

    def close(self):
        """Close the loop, then unregister every installed signal handler."""
        super().close()
        # Iterate over a snapshot: remove_signal_handler() edits the dict.
        for signum in tuple(self._signal_handlers):
            self.remove_signal_handler(signum)

    def _process_self_data(self, data):
        """Treat every non-zero item of *data* as a received signal number."""
        for signum in data:
            # Zero bytes come from _write_to_self() and carry no signal.
            if signum:
                self._handle_signal(signum)

    def add_signal_handler(self, sig, callback, *args):
        """Add a handler for a signal.  UNIX only.

        Raise TypeError if callback is a coroutine (function) or sig is not
        an int.
        Raise ValueError if the signal number is out of range.
        Raise RuntimeError if there is a problem setting up the handler.
        """
        is_coro = (coroutines.iscoroutine(callback)
                   or coroutines.iscoroutinefunction(callback))
        if is_coro:
            raise TypeError("coroutines cannot be used "
                            "with add_signal_handler()")
        self._check_signal(sig)
        self._check_closed()

        # Install the wakeup fd before anything else: set_wakeup_fd() raises
        # ValueError outside the main thread, so a loop running in another
        # thread is rejected before any state is touched.
        wakeup_fd = self._csock.fileno()
        try:
            signal.set_wakeup_fd(wakeup_fd)
        except (ValueError, OSError) as exc:
            raise RuntimeError(str(exc))

        self._signal_handlers[sig] = events.Handle(callback, args, self)

        try:
            # The no-op handler only makes Python write the signal number to
            # the wakeup fd; _process_self_data() reads it back from there.
            signal.signal(sig, _sighandler_noop)
            # Ask for SA_RESTART semantics to limit EINTR occurrences.
            signal.siginterrupt(sig, False)
        except OSError as exc:
            # Roll back the registration made above.
            del self._signal_handlers[sig]
            if not self._signal_handlers:
                try:
                    signal.set_wakeup_fd(-1)
                except (ValueError, OSError) as nexc:
                    logger.info('set_wakeup_fd(-1) failed: %s', nexc)

            if exc.errno != errno.EINVAL:
                raise
            raise RuntimeError('sig {} cannot be caught'.format(sig))

    def _handle_signal(self, sig):
        """Schedule (or clean up) the handle registered for signal *sig*."""
        handle = self._signal_handlers.get(sig)
        if handle is None:
            # Nothing registered any more; assume it's some race condition.
            return
        if not handle._cancelled:
            self._add_callback_signalsafe(handle)
        else:
            # A cancelled handle is unregistered properly instead of run.
            self.remove_signal_handler(sig)

    def remove_signal_handler(self, sig):
        """Remove a handler for a signal.  UNIX only.

        Return True if a signal handler was removed, False if not.
        """
        self._check_signal(sig)
        if sig not in self._signal_handlers:
            return False
        del self._signal_handlers[sig]

        # SIGINT gets Python's default handler back, every other signal
        # gets the system default disposition.
        restored = (signal.default_int_handler if sig == signal.SIGINT
                    else signal.SIG_DFL)
        try:
            signal.signal(sig, restored)
        except OSError as exc:
            if exc.errno != errno.EINVAL:
                raise
            raise RuntimeError('sig {} cannot be caught'.format(sig))

        # Once the last handler is gone the wakeup fd is switched off.
        if not self._signal_handlers:
            try:
                signal.set_wakeup_fd(-1)
            except (ValueError, OSError) as exc:
                logger.info('set_wakeup_fd(-1) failed: %s', exc)

        return True

    def _check_signal(self, sig):
        """Internal helper to validate a signal number.

        Raise TypeError if sig is not an int.
        Raise ValueError if sig is outside range(1, signal.NSIG).
        """
        if not isinstance(sig, int):
            raise TypeError('sig must be an int, not {!r}'.format(sig))

        if sig < 1 or sig >= signal.NSIG:
            raise ValueError(
                'sig {} out of range(1, {})'.format(sig, signal.NSIG))

    def _make_read_pipe_transport(self, pipe, protocol, waiter=None,
                                  extra=None):
        """Build a _UnixReadPipeTransport bound to this loop."""
        return _UnixReadPipeTransport(self, pipe, protocol, waiter, extra)

    def _make_write_pipe_transport(self, pipe, protocol, waiter=None,
                                   extra=None):
        """Build a _UnixWritePipeTransport bound to this loop."""
        return _UnixWritePipeTransport(self, pipe, protocol, waiter, extra)

    @coroutine
    def _make_subprocess_transport(self, protocol, args, shell,
                                   stdin, stdout, stderr, bufsize,
                                   extra=None, **kwargs):
        """Start a subprocess and return its transport.

        The transport is created and registered with the child watcher
        inside the watcher's 'with' block.
        """
        with events.get_child_watcher() as watcher:
            transport = _UnixSubprocessTransport(
                self, protocol, args, shell,
                stdin, stdout, stderr, bufsize,
                extra=extra, **kwargs)
            try:
                yield from transport._post_init()
            except BaseException:
                # Whatever went wrong, close the transport before
                # propagating the error.
                transport.close()
                raise
            watcher.add_child_handler(transport.get_pid(),
                                      self._child_watcher_callback,
                                      transport)

        return transport

    def _child_watcher_callback(self, pid, returncode, transp):
        """Forward a child's return code to its transport via the loop."""
        self.call_soon_threadsafe(transp._process_exited, returncode)

    @coroutine
    def create_unix_connection(self, protocol_factory, path, *,
                               ssl=None, sock=None,
                               server_hostname=None):
        """Connect to a UNIX Domain Socket and return (transport, protocol).

        Either *path* or an already created *sock* must be given, not both.
        *server_hostname* is required with ssl and rejected without it.
        """
        assert server_hostname is None or isinstance(server_hostname, str)
        using_ssl = bool(ssl)
        if using_ssl and server_hostname is None:
            raise ValueError(
                'you have to pass server_hostname when using ssl')
        if not using_ssl and server_hostname is not None:
            raise ValueError('server_hostname is only meaningful with ssl')

        if path is None:
            if sock is None:
                raise ValueError('no path and sock were specified')
            sock.setblocking(False)
        else:
            if sock is not None:
                raise ValueError(
                    'path and sock can not be specified at the same time')

            sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM, 0)
            try:
                sock.setblocking(False)
                yield from self.sock_connect(sock, path)
            except BaseException:
                # Do not leak the socket created just above.
                sock.close()
                raise

        transp, proto = yield from self._create_connection_transport(
            sock, protocol_factory, ssl, server_hostname)
        return transp, proto

    @coroutine
    def create_unix_server(self, protocol_factory, path=None, *,
                           sock=None, backlog=100, ssl=None):
        """Create a server listening on a UNIX Domain Socket.

        Either *path* (a new socket is created and bound to it) or an
        existing AF_UNIX *sock* must be given, not both.
        """
        if isinstance(ssl, bool):
            raise TypeError('ssl argument must be an SSLContext or None')

        if path is None:
            if sock is None:
                raise ValueError(
                    'path was not specified, and no sock specified')

            if sock.family != socket.AF_UNIX:
                raise ValueError(
                    'A UNIX Domain Socket was expected, got {!r}'.format(sock))
        else:
            if sock is not None:
                raise ValueError(
                    'path and sock can not be specified at the same time')

            sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)

            try:
                sock.bind(path)
            except OSError as exc:
                sock.close()
                if exc.errno != errno.EADDRINUSE:
                    raise
                # Improve the error message by naming the exact address.
                msg = 'Address {!r} is already in use'.format(path)
                raise OSError(errno.EADDRINUSE, msg) from None
            except BaseException:
                sock.close()
                raise

        server = base_events.Server(self, [sock])
        sock.listen(backlog)
        sock.setblocking(False)
        self._start_serving(protocol_factory, sock, ssl, server)
        return server
269
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700270
if not hasattr(os, 'set_blocking'):
    # os.set_blocking() is unavailable: fall back to fcntl().
    import fcntl

    def _set_nonblocking(fd):
        """Switch file descriptor *fd* to non-blocking mode using fcntl()."""
        current = fcntl.fcntl(fd, fcntl.F_GETFL)
        fcntl.fcntl(fd, fcntl.F_SETFL, current | os.O_NONBLOCK)
else:
    def _set_nonblocking(fd):
        """Switch file descriptor *fd* to non-blocking mode."""
        os.set_blocking(fd, False)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700281
282
class _UnixReadPipeTransport(transports.ReadTransport):
    """Read transport over the file descriptor of a pipe-like object.

    The descriptor must refer to a FIFO, a socket or a character device.
    Incoming data is passed to protocol.data_received(); end of file is
    reported through protocol.eof_received() followed by
    protocol.connection_lost(None).
    """

    max_size = 256 * 1024  # max bytes we read in one event loop iteration

    def __init__(self, loop, pipe, protocol, waiter=None, extra=None):
        """Register *pipe* for reading on *loop* and notify *protocol*.

        Raise ValueError if the descriptor is not a FIFO, socket or
        character device.  If *waiter* is given, its result is set (unless
        it was cancelled) after connection_made() has been scheduled.
        """
        super().__init__(extra)
        self._extra['pipe'] = pipe
        self._loop = loop
        self._pipe = pipe
        self._fileno = pipe.fileno()
        mode = os.fstat(self._fileno).st_mode
        if not (stat.S_ISFIFO(mode) or
                stat.S_ISSOCK(mode) or
                stat.S_ISCHR(mode)):
            raise ValueError("Pipe transport is for pipes/sockets only.")
        _set_nonblocking(self._fileno)
        self._protocol = protocol
        self._closing = False
        self._loop.add_reader(self._fileno, self._read_ready)
        self._loop.call_soon(self._protocol.connection_made, self)
        if waiter is not None:
            # wait until protocol.connection_made() has been called
            self._loop.call_soon(waiter._set_result_unless_cancelled, None)

    def __repr__(self):
        """Return '<ClassName [closed|closing] fd=N [polling|idle]>'."""
        info = [self.__class__.__name__]
        if self._pipe is None:
            info.append('closed')
        elif self._closing:
            info.append('closing')
        info.append('fd=%s' % self._fileno)
        # The polling state is only reported while the pipe is still open.
        # A closed transport is already tagged 'closed' above, so the tag is
        # not repeated here (self._loop is None at that point anyway).
        if self._pipe is not None:
            polling = selector_events._test_selector_event(
                self._loop._selector,
                self._fileno, selectors.EVENT_READ)
            info.append('polling' if polling else 'idle')
        return '<%s>' % ' '.join(info)

    def _read_ready(self):
        """Reader callback: read up to max_size bytes and dispatch them."""
        try:
            data = os.read(self._fileno, self.max_size)
        except (BlockingIOError, InterruptedError):
            # Nothing to read right now; wait for the next readiness event.
            pass
        except OSError as exc:
            self._fatal_error(exc, 'Fatal read error on pipe transport')
        else:
            if data:
                self._protocol.data_received(data)
            else:
                # An empty read means end of file.
                if self._loop.get_debug():
                    logger.info("%r was closed by peer", self)
                self._closing = True
                self._loop.remove_reader(self._fileno)
                self._loop.call_soon(self._protocol.eof_received)
                self._loop.call_soon(self._call_connection_lost, None)

    def pause_reading(self):
        """Stop watching the descriptor for incoming data.

        Does nothing once the transport is closing or closed: the reader
        has already been removed and the loop reference may be gone.
        """
        if self._closing:
            return
        self._loop.remove_reader(self._fileno)

    def resume_reading(self):
        """Start watching the descriptor for incoming data again.

        Does nothing once the transport is closing or closed, so that a
        reader is never re-registered for a transport being torn down.
        """
        if self._closing:
            return
        self._loop.add_reader(self._fileno, self._read_ready)

    def close(self):
        """Close the transport; calling it again has no effect."""
        if not self._closing:
            self._close(None)

    def _fatal_error(self, exc, message='Fatal error on pipe transport'):
        """Report *exc* and close the transport.

        An OSError with errno EIO is only logged (in debug mode); any other
        exception is passed to the loop's exception handler.
        """
        # should be called by exception handler only
        if (isinstance(exc, OSError) and exc.errno == errno.EIO):
            if self._loop.get_debug():
                logger.debug("%r: %s", self, message, exc_info=True)
        else:
            self._loop.call_exception_handler({
                'message': message,
                'exception': exc,
                'transport': self,
                'protocol': self._protocol,
            })
        self._close(exc)

    def _close(self, exc):
        """Mark the transport closing and schedule connection_lost(exc)."""
        self._closing = True
        self._loop.remove_reader(self._fileno)
        self._loop.call_soon(self._call_connection_lost, exc)

    def _call_connection_lost(self, exc):
        """Notify the protocol, then close the pipe and drop references."""
        try:
            self._protocol.connection_lost(exc)
        finally:
            # Release everything even if connection_lost() raised.
            self._pipe.close()
            self._pipe = None
            self._protocol = None
            self._loop = None
381
382
class _UnixWritePipeTransport(transports._FlowControlMixin,
                              transports.WriteTransport):
    """Write transport over the file descriptor of a pipe-like object.

    The descriptor must refer to a socket, a FIFO or a character device.
    Data that cannot be written immediately is kept in self._buffer and
    flushed from _write_ready() when the descriptor becomes writable.
    """

    def __init__(self, loop, pipe, protocol, waiter=None, extra=None):
        """Prepare *pipe* for writing on *loop* and notify *protocol*.

        Raise ValueError if the descriptor is not a socket, FIFO or
        character device.  If *waiter* is given, its result is set (unless
        it was cancelled) after connection_made() has been scheduled.
        """
        # NOTE(review): self._loop is not assigned in this class; presumably
        # the base initialiser stores *loop* -- confirm in transports.
        super().__init__(extra, loop)
        self._extra['pipe'] = pipe
        self._pipe = pipe
        self._fileno = pipe.fileno()
        mode = os.fstat(self._fileno).st_mode
        is_socket = stat.S_ISSOCK(mode)
        if not (is_socket or
                stat.S_ISFIFO(mode) or
                stat.S_ISCHR(mode)):
            raise ValueError("Pipe transport is only for "
                             "pipes, sockets and character devices")
        _set_nonblocking(self._fileno)
        self._protocol = protocol
        self._buffer = []
        self._conn_lost = 0
        self._closing = False  # Set when close() or write_eof() called.

        # A read event on the write end is used to detect that the peer
        # closed the pipe (see _read_ready()).
        # On AIX, the reader trick only works for sockets.
        # On other platforms it works for pipes and sockets.
        # (Exception: OS X 10.4?  Issue #19294.)
        if is_socket or not sys.platform.startswith("aix"):
            self._loop.add_reader(self._fileno, self._read_ready)

        self._loop.call_soon(self._protocol.connection_made, self)
        if waiter is not None:
            # wait until protocol.connection_made() has been called
            self._loop.call_soon(waiter._set_result_unless_cancelled, None)

    def __repr__(self):
        """Return '<ClassName [closed|closing] fd=N [state bufsize=B]>'."""
        info = [self.__class__.__name__]
        if self._pipe is None:
            info.append('closed')
        elif self._closing:
            info.append('closing')
        info.append('fd=%s' % self._fileno)
        # Polling state and buffer size are only reported while the pipe is
        # still open.  A closed transport is already tagged 'closed' above,
        # so the tag is not repeated here.
        if self._pipe is not None:
            polling = selector_events._test_selector_event(
                self._loop._selector,
                self._fileno, selectors.EVENT_WRITE)
            info.append('polling' if polling else 'idle')

            bufsize = self.get_write_buffer_size()
            info.append('bufsize=%s' % bufsize)
        return '<%s>' % ' '.join(info)

    def get_write_buffer_size(self):
        """Return the summed length of the chunks waiting to be written."""
        return sum(len(data) for data in self._buffer)

    def _read_ready(self):
        """Reader callback: the pipe was closed by the peer."""
        # Pipe was closed by peer.
        if self._loop.get_debug():
            logger.info("%r was closed by peer", self)
        if self._buffer:
            # Pending data can no longer be delivered.
            self._close(BrokenPipeError())
        else:
            self._close()

    def write(self, data):
        """Write *data* (bytes, bytearray or memoryview) to the pipe.

        The data is written immediately when nothing is buffered; whatever
        could not be written is buffered and sent from _write_ready().
        Writes issued after the connection was lost or closed are dropped.
        """
        assert isinstance(data, (bytes, bytearray, memoryview)), repr(data)
        if isinstance(data, bytearray):
            data = memoryview(data)
        if not data:
            return

        if self._conn_lost or self._closing:
            if self._conn_lost >= constants.LOG_THRESHOLD_FOR_CONNLOST_WRITES:
                logger.warning('pipe closed by peer or '
                               'os.write(pipe, data) raised exception.')
            self._conn_lost += 1
            return

        if not self._buffer:
            # Attempt to send it right away first.
            try:
                n = os.write(self._fileno, data)
            except (BlockingIOError, InterruptedError):
                n = 0
            except Exception as exc:
                self._conn_lost += 1
                self._fatal_error(exc, 'Fatal write error on pipe transport')
                return
            if n == len(data):
                return
            elif n > 0:
                data = data[n:]
            # The remainder is flushed once the descriptor is writable.
            self._loop.add_writer(self._fileno, self._write_ready)

        self._buffer.append(data)
        self._maybe_pause_protocol()

    def _write_ready(self):
        """Writer callback: try to flush the whole buffer in one write."""
        data = b''.join(self._buffer)
        assert data, 'Data should not be empty'

        self._buffer.clear()
        try:
            n = os.write(self._fileno, data)
        except (BlockingIOError, InterruptedError):
            # Nothing was written; keep everything for the next attempt.
            self._buffer.append(data)
        except Exception as exc:
            self._conn_lost += 1
            # Remove the writer here: _close() (reached via _fatal_error())
            # only does it when _buffer is non-empty, and it is empty now.
            self._loop.remove_writer(self._fileno)
            self._fatal_error(exc, 'Fatal write error on pipe transport')
        else:
            if n == len(data):
                self._loop.remove_writer(self._fileno)
                self._maybe_resume_protocol()  # May append to buffer.
                if not self._buffer and self._closing:
                    # Everything was flushed after close()/write_eof().
                    self._loop.remove_reader(self._fileno)
                    self._call_connection_lost(None)
                return
            elif n > 0:
                data = data[n:]

            self._buffer.append(data)  # Try again later.

    def can_write_eof(self):
        """Return True: this transport supports write_eof()."""
        return True

    def write_eof(self):
        """Close the write end once all buffered data has been written."""
        if self._closing:
            return
        assert self._pipe
        self._closing = True
        if not self._buffer:
            self._loop.remove_reader(self._fileno)
            self._loop.call_soon(self._call_connection_lost, None)
        # Otherwise _write_ready() finishes the shutdown after flushing.

    def close(self):
        """Close the transport, flushing buffered data first."""
        if self._pipe is not None and not self._closing:
            # write_eof is all what we needed to close the write pipe
            self.write_eof()

    def abort(self):
        """Close the transport immediately, discarding buffered data."""
        self._close(None)

    def _fatal_error(self, exc, message='Fatal error on pipe transport'):
        """Report *exc* and close the transport.

        BrokenPipeError and ConnectionResetError are only logged (in debug
        mode); any other exception is passed to the loop's exception
        handler.
        """
        # should be called by exception handler only
        if isinstance(exc, (BrokenPipeError, ConnectionResetError)):
            if self._loop.get_debug():
                logger.debug("%r: %s", self, message, exc_info=True)
        else:
            self._loop.call_exception_handler({
                'message': message,
                'exception': exc,
                'transport': self,
                'protocol': self._protocol,
            })
        self._close(exc)

    def _close(self, exc=None):
        """Drop pending data and schedule connection_lost(exc)."""
        self._closing = True
        if self._buffer:
            # A writer is registered whenever data is buffered.
            self._loop.remove_writer(self._fileno)
        self._buffer.clear()
        self._loop.remove_reader(self._fileno)
        self._loop.call_soon(self._call_connection_lost, exc)

    def _call_connection_lost(self, exc):
        """Notify the protocol, then close the pipe and drop references."""
        try:
            self._protocol.connection_lost(exc)
        finally:
            # Release everything even if connection_lost() raised.
            self._pipe.close()
            self._pipe = None
            self._protocol = None
            self._loop = None
560
561
if not hasattr(os, 'set_inheritable'):
    # No os.set_inheritable(): emulate it with the close-on-exec flag.
    import fcntl

    def _set_inheritable(fd, inheritable):
        """Set or clear the close-on-exec flag of *fd*.

        The descriptor is inheritable when the flag is cleared.
        """
        cloexec = getattr(fcntl, 'FD_CLOEXEC', 1)
        flags = fcntl.fcntl(fd, fcntl.F_GETFD)
        if inheritable:
            flags &= ~cloexec
        else:
            flags |= cloexec
        fcntl.fcntl(fd, fcntl.F_SETFD, flags)
else:
    # Python 3.4 and newer
    _set_inheritable = os.set_inheritable
576
577
class _UnixSubprocessTransport(base_subprocess.BaseSubprocessTransport):
    """Subprocess transport that starts the child with subprocess.Popen."""

    def _start(self, args, shell, stdin, stdout, stderr, bufsize, **kwargs):
        """Spawn the child process and store it in self._proc.

        When stdin is subprocess.PIPE the child's stdin is one end of a
        socket pair and self._proc.stdin is replaced by a binary file
        object wrapping the other end.  Both sockets are closed if starting
        the process fails.
        """
        stdin_w = None
        if stdin == subprocess.PIPE:
            # Use a socket pair for stdin, since not all platforms
            # support selecting read events on the write end of a
            # socket (which we use in order to detect closing of the
            # other end).  Notably this is needed on AIX, and works
            # just fine on other platforms.
            stdin, stdin_w = self._loop._socketpair()
        try:
            if stdin_w is not None:
                # Mark the write end of the stdin pipe as non-inheritable,
                # needed by close_fds=False on Python 3.3 and older
                # (Python 3.4 implements the PEP 446, socketpair returns
                # non-inheritable sockets)
                _set_inheritable(stdin_w.fileno(), False)
            self._proc = subprocess.Popen(
                args, shell=shell, stdin=stdin, stdout=stdout, stderr=stderr,
                universal_newlines=False, bufsize=bufsize, **kwargs)
            if stdin_w is not None:
                # The child holds its own copy of the read end.
                stdin.close()
                self._proc.stdin = open(stdin_w.detach(), 'wb',
                                        buffering=bufsize)
                # Ownership moved to self._proc.stdin: nothing to clean up.
                stdin_w = None
        finally:
            if stdin_w is not None:
                # Starting the process failed: release both ends of the
                # socket pair instead of leaking them.
                stdin.close()
                stdin_w.close()
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800601
602
class AbstractChildWatcher:
    """Interface for objects that monitor child processes.

    A watcher keeps track of a set of subprocesses and reports when one of
    them terminates or is interrupted by a signal.

    Callbacks are registered with .add_child_handler().  A new process must
    be started inside a 'with' block on the watcher, which lets the watcher
    suspend its activity until the new process is fully registered (some
    implementations need this to avoid a race condition).

    Example:
        with watcher:
            proc = subprocess.Popen(["sleep", "1"])
            watcher.add_child_handler(proc.pid, callback)

    Notes:
        Implementations of this class must be thread-safe.

        Child watcher objects may catch the SIGCHLD signal and call
        waitpid(-1), so there should be only one active object per process.
    """

    def add_child_handler(self, pid, callback, *args):
        """Register a handler for the termination of process 'pid'.

        callback(pid, returncode, *args) is to be called when the process
        terminates.  Registering another callback for the same process
        replaces the previous one.

        Note: callback() must be thread-safe.
        """
        raise NotImplementedError

    def remove_child_handler(self, pid):
        """Remove the handler registered for process 'pid'.

        Return True if a handler was removed, False if there was nothing
        to remove.
        """
        raise NotImplementedError

    def attach_loop(self, loop):
        """Attach the watcher to an event loop.

        A watcher already attached to a loop is detached from it before
        being attached to the new one.

        Note: loop may be None.
        """
        raise NotImplementedError

    def close(self):
        """Close the watcher.

        This must be called so that any underlying resource is freed.
        """
        raise NotImplementedError

    def __enter__(self):
        """Enter the watcher's context and allow starting new processes.

        This function must return self.
        """
        raise NotImplementedError

    def __exit__(self, a, b, c):
        """Exit the watcher's context."""
        raise NotImplementedError
671
672
class BaseChildWatcher(AbstractChildWatcher):
    """Shared SIGCHLD plumbing for child watcher implementations.

    Subclasses supply _do_waitpid() and _do_waitpid_all().
    """

    def __init__(self):
        # Event loop whose SIGCHLD handler is self._sig_chld, or None.
        self._loop = None

    def close(self):
        """Detach from the current loop, if any."""
        self.attach_loop(None)

    def _do_waitpid(self, expected_pid):
        """Check one child process; to be provided by subclasses."""
        raise NotImplementedError

    def _do_waitpid_all(self):
        """Check all watched child processes; provided by subclasses."""
        raise NotImplementedError

    def attach_loop(self, loop):
        """Move the SIGCHLD handler from the current loop to *loop*.

        *loop* may be None, in which case the watcher is only detached.
        """
        assert loop is None or isinstance(loop, events.AbstractEventLoop)

        previous = self._loop
        if previous is not None:
            previous.remove_signal_handler(signal.SIGCHLD)

        self._loop = loop
        if loop is None:
            return

        loop.add_signal_handler(signal.SIGCHLD, self._sig_chld)
        # A child may have terminated during the switch: poll right away
        # to prevent a race condition.
        self._do_waitpid_all()

    def _sig_chld(self):
        """SIGCHLD callback: poll the children, reporting any failure."""
        try:
            self._do_waitpid_all()
        except Exception as exc:
            # self._loop should always be available here, because this
            # method is only installed as a signal handler by attach_loop().
            context = {
                'message': 'Unknown exception in SIGCHLD handler',
                'exception': exc,
            }
            self._loop.call_exception_handler(context)

    def _compute_returncode(self, status):
        """Convert a waitpid() status into a return code."""
        if os.WIFSIGNALED(status):
            # Killed by a signal: report the negated signal number.
            return -os.WTERMSIG(status)
        if os.WIFEXITED(status):
            # Regular exit (e.g. sys.exit()): report the exit status.
            return os.WEXITSTATUS(status)
        # The child exited, but we don't understand its status.  This
        # shouldn't happen; returning the raw status may help debug it.
        return status
725
726
class SafeChildWatcher(BaseChildWatcher):
    """'Safe' child watcher implementation.

    This implementation avoids disrupting other code spawning processes by
    polling explicitly each process in the SIGCHLD handler instead of calling
    os.waitpid(-1).

    This is a safe solution but it has a significant overhead when handling a
    big number of children (O(n) each time SIGCHLD is raised)
    """

    def __init__(self):
        super().__init__()
        # pid -> (callback, args) for every watched child process
        self._callbacks = {}

    def close(self):
        """Forget all registered handlers and detach from the loop."""
        self._callbacks.clear()
        super().close()

    def __enter__(self):
        return self

    def __exit__(self, a, b, c):
        pass

    def add_child_handler(self, pid, callback, *args):
        """Arrange for callback(pid, returncode, *args) on child exit.

        A handler already registered for *pid* is replaced.
        """
        self._callbacks[pid] = callback, args

        # Prevent a race condition in case the child is already terminated.
        self._do_waitpid(pid)

    def remove_child_handler(self, pid):
        """Remove the handler for *pid*; return True if there was one."""
        try:
            del self._callbacks[pid]
            return True
        except KeyError:
            return False

    def _debug_enabled(self):
        """Return True if a loop is attached and it is in debug mode."""
        loop = self._loop
        return loop is not None and loop.get_debug()

    def _do_waitpid_all(self):
        """Poll every watched child process."""
        # Iterate over a copy: _do_waitpid() removes finished children.
        for pid in list(self._callbacks):
            self._do_waitpid(pid)

    def _do_waitpid(self, expected_pid):
        """Poll one child and run its callback if it has terminated.

        A child that was already reaped elsewhere is reported with
        return code 255.
        """
        assert expected_pid > 0

        try:
            pid, status = os.waitpid(expected_pid, os.WNOHANG)
        except ChildProcessError:
            # The child process is already reaped
            # (may happen if waitpid() is called elsewhere).
            pid = expected_pid
            returncode = 255
            logger.warning(
                "Unknown child process pid %d, will report returncode 255",
                pid)
        else:
            if pid == 0:
                # The child process is still alive.
                return

            returncode = self._compute_returncode(status)
            # No loop may be attached (handler added before attach_loop(),
            # or after close()).  The child has been reaped at this point,
            # so failing here would lose its return code.
            if self._debug_enabled():
                logger.debug('process %s exited with returncode %s',
                             expected_pid, returncode)

        try:
            callback, args = self._callbacks.pop(pid)
        except KeyError:  # pragma: no cover
            # May happen if .remove_child_handler() is called
            # after os.waitpid() returns.
            if self._debug_enabled():
                logger.warning("Child watcher got an unexpected pid: %r",
                               pid, exc_info=True)
        else:
            callback(pid, returncode, *args)
803
804
class FastChildWatcher(BaseChildWatcher):
    """Child watcher that reaps any terminated child in one sweep.

    Every terminated process is collected by calling os.waitpid(-1)
    directly, which may break other code that spawns processes and waits
    for their termination itself.

    There is no noticeable overhead when handling a large number of
    children: the cost is O(1) each time a child terminates.
    """

    def __init__(self):
        super().__init__()
        # Maps a watched pid to its (callback, args) pair.
        self._callbacks = {}
        # Guards _callbacks, _zombies and _forks.
        self._lock = threading.Lock()
        # Maps the pid of a child that was reaped before any handler was
        # registered for it to its return code.
        self._zombies = {}
        # Number of "with watcher:" blocks currently entered.
        self._forks = 0

    def close(self):
        """Drop all handlers and parked results, then close the base."""
        self._callbacks.clear()
        self._zombies.clear()
        super().close()

    def __enter__(self):
        """Record that a caller is about to spawn a child process."""
        with self._lock:
            self._forks += 1
        return self

    def __exit__(self, a, b, c):
        """Leave the spawning section.

        When the last section is left, any parked return code that was
        never claimed is discarded and reported with a warning.
        """
        collateral_victims = None

        with self._lock:
            self._forks -= 1

            if not self._forks and self._zombies:
                collateral_victims = str(self._zombies)
                self._zombies.clear()

        if collateral_victims is not None:
            logger.warning(
                "Caught subprocesses termination from unknown pids: %s",
                collateral_victims)

    def add_child_handler(self, pid, callback, *args):
        """Register callback(pid, returncode, *args) for process *pid*.

        Must be called inside the watcher's context manager.  If the
        child has been reaped already, the callback is invoked
        immediately with the parked return code.
        """
        assert self._forks, "Must use the context manager"

        not_reaped = object()
        with self._lock:
            returncode = self._zombies.pop(pid, not_reaped)
            if returncode is not_reaped:
                # The child is still running: remember the handler.
                self._callbacks[pid] = (callback, args)
                return

        # The child is dead already; fire the callback (lock released).
        callback(pid, returncode, *args)

    def remove_child_handler(self, pid):
        """Drop the handler for *pid*.

        Return True if a handler was removed, False if none was
        registered.
        """
        missing = object()
        return self._callbacks.pop(pid, missing) is not missing

    def _do_waitpid_all(self):
        """Reap every terminated child and dispatch its handler."""
        # Signals may be coalesced, so keep calling waitpid() for as
        # long as it manages to reap a child.
        while True:
            try:
                pid, status = os.waitpid(-1, os.WNOHANG)
            except ChildProcessError:
                # No child processes exist any more.
                return

            if pid == 0:
                # The remaining children are still alive.
                return

            returncode = self._compute_returncode(status)

            with self._lock:
                try:
                    handler, handler_args = self._callbacks.pop(pid)
                except KeyError:
                    # Nobody registered a handler for this child.
                    handler = None
                    if self._forks:
                        # A spawning section is open, so the handler may
                        # simply not be registered yet: park the result.
                        self._zombies[pid] = returncode
                        if self._loop.get_debug():
                            logger.debug('unknown process %s exited '
                                         'with returncode %s',
                                         pid, returncode)
                        continue
                else:
                    if self._loop.get_debug():
                        logger.debug('process %s exited with returncode %s',
                                     pid, returncode)

            # Report or dispatch only after the lock has been released.
            if handler is None:
                logger.warning(
                    "Caught subprocess termination from unknown pid: "
                    "%d -> %d", pid, returncode)
            else:
                handler(pid, returncode, *handler_args)
908
909
class _UnixDefaultEventLoopPolicy(events.BaseDefaultEventLoopPolicy):
    """Event loop policy for UNIX that also owns a child process watcher."""

    _loop_factory = _UnixSelectorEventLoop

    def __init__(self):
        super().__init__()
        # Created lazily by get_child_watcher() or supplied through
        # set_child_watcher().
        self._watcher = None

    def _init_watcher(self):
        """Create the default SafeChildWatcher if none is set yet."""
        with events._lock:
            if self._watcher is None:  # pragma: no branch
                watcher = self._watcher = SafeChildWatcher()
                in_main_thread = isinstance(threading.current_thread(),
                                            threading._MainThread)
                if in_main_thread:
                    watcher.attach_loop(self._local._loop)

    def set_event_loop(self, loop):
        """Set the event loop.

        As a side effect, if a child watcher has been set before, calling
        .set_event_loop() from the main thread also calls
        .attach_loop(loop) on that child watcher.
        """
        super().set_event_loop(loop)

        watcher = self._watcher
        if watcher is None:
            return

        if isinstance(threading.current_thread(), threading._MainThread):
            watcher.attach_loop(loop)

    def get_child_watcher(self):
        """Return the watcher for child processes.

        A SafeChildWatcher object is created automatically when no
        watcher has been set yet.
        """
        if self._watcher is None:
            self._init_watcher()
        return self._watcher

    def set_child_watcher(self, watcher):
        """Install *watcher* as the watcher for child processes.

        *watcher* must be None or an AbstractChildWatcher instance.  A
        previously installed watcher is closed before being replaced.
        """
        assert watcher is None or isinstance(watcher, AbstractChildWatcher)

        previous = self._watcher
        if previous is not None:
            previous.close()

        self._watcher = watcher
959
# Public aliases for the UNIX implementations; both names are listed in
# __all__.
SelectorEventLoop = _UnixSelectorEventLoop
DefaultEventLoopPolicy = _UnixDefaultEventLoopPolicy