blob: efe06d4a1999ee00746efec2de609a8f86b3a16e [file] [log] [blame]
Yury Selivanovdec1a452014-02-18 22:27:48 -05001"""Selector event loop for Unix with signal handling."""
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07002
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07003import errno
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07004import os
5import signal
6import socket
7import stat
8import subprocess
9import sys
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -080010import threading
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070011
12
Yury Selivanovb057c522014-02-18 12:15:06 -050013from . import base_events
Guido van Rossum59691282013-10-30 14:52:03 -070014from . import base_subprocess
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070015from . import constants
Guido van Rossume36fcde2014-11-14 11:45:47 -080016from . import coroutines
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070017from . import events
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070018from . import selector_events
Victor Stinnere912e652014-07-12 03:11:53 +020019from . import selectors
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070020from . import transports
Victor Stinnerf951d282014-06-29 00:46:45 +020021from .coroutines import coroutine
Guido van Rossumfc29e0f2013-10-17 15:39:45 -070022from .log import logger
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070023
24
Victor Stinner915bcb02014-02-01 22:49:59 +010025__all__ = ['SelectorEventLoop',
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -080026 'AbstractChildWatcher', 'SafeChildWatcher',
27 'FastChildWatcher', 'DefaultEventLoopPolicy',
28 ]
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070029
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070030if sys.platform == 'win32': # pragma: no cover
31 raise ImportError('Signals are not really supported on Windows')
32
33
Victor Stinnerfe5649c2014-07-17 22:43:40 +020034def _sighandler_noop(signum, frame):
35 """Dummy signal handler."""
36 pass
37
38
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -080039class _UnixSelectorEventLoop(selector_events.BaseSelectorEventLoop):
Yury Selivanovb057c522014-02-18 12:15:06 -050040 """Unix event loop.
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070041
Yury Selivanovb057c522014-02-18 12:15:06 -050042 Adds signal handling and UNIX Domain Socket support to SelectorEventLoop.
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070043 """
44
45 def __init__(self, selector=None):
46 super().__init__(selector)
47 self._signal_handlers = {}
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070048
49 def _socketpair(self):
50 return socket.socketpair()
51
Guido van Rossum0b69fbc2013-11-06 20:25:50 -080052 def close(self):
Victor Stinnerf328c7d2014-06-23 01:02:37 +020053 super().close()
Guido van Rossum0b69fbc2013-11-06 20:25:50 -080054 for sig in list(self._signal_handlers):
55 self.remove_signal_handler(sig)
Guido van Rossum0b69fbc2013-11-06 20:25:50 -080056
Victor Stinnerfe5649c2014-07-17 22:43:40 +020057 def _process_self_data(self, data):
58 for signum in data:
59 if not signum:
60 # ignore null bytes written by _write_to_self()
61 continue
62 self._handle_signal(signum)
63
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070064 def add_signal_handler(self, sig, callback, *args):
65 """Add a handler for a signal. UNIX only.
66
67 Raise ValueError if the signal number is invalid or uncatchable.
68 Raise RuntimeError if there is a problem setting up the handler.
69 """
Victor Stinner2d99d932014-11-20 15:03:52 +010070 if (coroutines.iscoroutine(callback)
71 or coroutines.iscoroutinefunction(callback)):
72 raise TypeError("coroutines cannot be used with add_signal_handler()")
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070073 self._check_signal(sig)
74 try:
75 # set_wakeup_fd() raises ValueError if this is not the
76 # main thread. By calling it early we ensure that an
77 # event loop running in another thread cannot add a signal
78 # handler.
79 signal.set_wakeup_fd(self._csock.fileno())
Victor Stinnerc4c46492014-07-23 18:21:45 +020080 except (ValueError, OSError) as exc:
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070081 raise RuntimeError(str(exc))
82
Yury Selivanov569efa22014-02-18 18:02:19 -050083 handle = events.Handle(callback, args, self)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070084 self._signal_handlers[sig] = handle
85
86 try:
Victor Stinnerfe5649c2014-07-17 22:43:40 +020087 # Register a dummy signal handler to ask Python to write the signal
88 # number in the wakup file descriptor. _process_self_data() will
89 # read signal numbers from this file descriptor to handle signals.
90 signal.signal(sig, _sighandler_noop)
91
Charles-François Natali74e7cf32013-12-05 22:47:19 +010092 # Set SA_RESTART to limit EINTR occurrences.
93 signal.siginterrupt(sig, False)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070094 except OSError as exc:
95 del self._signal_handlers[sig]
96 if not self._signal_handlers:
97 try:
98 signal.set_wakeup_fd(-1)
Victor Stinnerc4c46492014-07-23 18:21:45 +020099 except (ValueError, OSError) as nexc:
Guido van Rossumfc29e0f2013-10-17 15:39:45 -0700100 logger.info('set_wakeup_fd(-1) failed: %s', nexc)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700101
102 if exc.errno == errno.EINVAL:
103 raise RuntimeError('sig {} cannot be caught'.format(sig))
104 else:
105 raise
106
Victor Stinnerfe5649c2014-07-17 22:43:40 +0200107 def _handle_signal(self, sig):
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700108 """Internal helper that is the actual signal handler."""
109 handle = self._signal_handlers.get(sig)
110 if handle is None:
111 return # Assume it's some race condition.
112 if handle._cancelled:
113 self.remove_signal_handler(sig) # Remove it properly.
114 else:
115 self._add_callback_signalsafe(handle)
116
117 def remove_signal_handler(self, sig):
118 """Remove a handler for a signal. UNIX only.
119
120 Return True if a signal handler was removed, False if not.
121 """
122 self._check_signal(sig)
123 try:
124 del self._signal_handlers[sig]
125 except KeyError:
126 return False
127
128 if sig == signal.SIGINT:
129 handler = signal.default_int_handler
130 else:
131 handler = signal.SIG_DFL
132
133 try:
134 signal.signal(sig, handler)
135 except OSError as exc:
136 if exc.errno == errno.EINVAL:
137 raise RuntimeError('sig {} cannot be caught'.format(sig))
138 else:
139 raise
140
141 if not self._signal_handlers:
142 try:
143 signal.set_wakeup_fd(-1)
Victor Stinnerc4c46492014-07-23 18:21:45 +0200144 except (ValueError, OSError) as exc:
Guido van Rossumfc29e0f2013-10-17 15:39:45 -0700145 logger.info('set_wakeup_fd(-1) failed: %s', exc)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700146
147 return True
148
149 def _check_signal(self, sig):
150 """Internal helper to validate a signal.
151
152 Raise ValueError if the signal number is invalid or uncatchable.
153 Raise RuntimeError if there is a problem setting up the handler.
154 """
155 if not isinstance(sig, int):
156 raise TypeError('sig must be an int, not {!r}'.format(sig))
157
158 if not (1 <= sig < signal.NSIG):
159 raise ValueError(
160 'sig {} out of range(1, {})'.format(sig, signal.NSIG))
161
162 def _make_read_pipe_transport(self, pipe, protocol, waiter=None,
163 extra=None):
164 return _UnixReadPipeTransport(self, pipe, protocol, waiter, extra)
165
166 def _make_write_pipe_transport(self, pipe, protocol, waiter=None,
167 extra=None):
168 return _UnixWritePipeTransport(self, pipe, protocol, waiter, extra)
169
Victor Stinnerf951d282014-06-29 00:46:45 +0200170 @coroutine
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700171 def _make_subprocess_transport(self, protocol, args, shell,
172 stdin, stdout, stderr, bufsize,
173 extra=None, **kwargs):
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800174 with events.get_child_watcher() as watcher:
175 transp = _UnixSubprocessTransport(self, protocol, args, shell,
176 stdin, stdout, stderr, bufsize,
Victor Stinner73f10fd2014-01-29 14:32:20 -0800177 extra=extra, **kwargs)
Guido van Rossum4835f172014-01-10 13:28:59 -0800178 yield from transp._post_init()
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800179 watcher.add_child_handler(transp.get_pid(),
180 self._child_watcher_callback, transp)
Guido van Rossum4835f172014-01-10 13:28:59 -0800181
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700182 return transp
183
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800184 def _child_watcher_callback(self, pid, returncode, transp):
185 self.call_soon_threadsafe(transp._process_exited, returncode)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700186
Victor Stinnerf951d282014-06-29 00:46:45 +0200187 @coroutine
Yury Selivanovb057c522014-02-18 12:15:06 -0500188 def create_unix_connection(self, protocol_factory, path, *,
189 ssl=None, sock=None,
190 server_hostname=None):
191 assert server_hostname is None or isinstance(server_hostname, str)
192 if ssl:
193 if server_hostname is None:
194 raise ValueError(
195 'you have to pass server_hostname when using ssl')
196 else:
197 if server_hostname is not None:
198 raise ValueError('server_hostname is only meaningful with ssl')
199
200 if path is not None:
201 if sock is not None:
202 raise ValueError(
203 'path and sock can not be specified at the same time')
204
Victor Stinner79a29522014-02-19 01:45:59 +0100205 sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM, 0)
Yury Selivanovb057c522014-02-18 12:15:06 -0500206 try:
Yury Selivanovb057c522014-02-18 12:15:06 -0500207 sock.setblocking(False)
208 yield from self.sock_connect(sock, path)
Victor Stinner79a29522014-02-19 01:45:59 +0100209 except:
210 sock.close()
Yury Selivanovb057c522014-02-18 12:15:06 -0500211 raise
212
213 else:
214 if sock is None:
215 raise ValueError('no path and sock were specified')
216 sock.setblocking(False)
217
218 transport, protocol = yield from self._create_connection_transport(
219 sock, protocol_factory, ssl, server_hostname)
220 return transport, protocol
221
Victor Stinnerf951d282014-06-29 00:46:45 +0200222 @coroutine
Yury Selivanovb057c522014-02-18 12:15:06 -0500223 def create_unix_server(self, protocol_factory, path=None, *,
224 sock=None, backlog=100, ssl=None):
225 if isinstance(ssl, bool):
226 raise TypeError('ssl argument must be an SSLContext or None')
227
228 if path is not None:
Victor Stinner1fd03a42014-04-07 11:18:54 +0200229 if sock is not None:
230 raise ValueError(
231 'path and sock can not be specified at the same time')
232
Yury Selivanovb057c522014-02-18 12:15:06 -0500233 sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
234
235 try:
236 sock.bind(path)
237 except OSError as exc:
Victor Stinner79a29522014-02-19 01:45:59 +0100238 sock.close()
Yury Selivanovb057c522014-02-18 12:15:06 -0500239 if exc.errno == errno.EADDRINUSE:
240 # Let's improve the error message by adding
241 # with what exact address it occurs.
242 msg = 'Address {!r} is already in use'.format(path)
243 raise OSError(errno.EADDRINUSE, msg) from None
244 else:
245 raise
Victor Stinner223a6242014-06-04 00:11:52 +0200246 except:
247 sock.close()
248 raise
Yury Selivanovb057c522014-02-18 12:15:06 -0500249 else:
250 if sock is None:
251 raise ValueError(
252 'path was not specified, and no sock specified')
253
254 if sock.family != socket.AF_UNIX:
255 raise ValueError(
256 'A UNIX Domain Socket was expected, got {!r}'.format(sock))
257
258 server = base_events.Server(self, [sock])
259 sock.listen(backlog)
260 sock.setblocking(False)
261 self._start_serving(protocol_factory, sock, ssl, server)
262 return server
263
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700264
Victor Stinnerf2ed8892014-07-29 23:08:00 +0200265if hasattr(os, 'set_blocking'):
266 def _set_nonblocking(fd):
267 os.set_blocking(fd, False)
268else:
Yury Selivanov8c0e0ab2014-09-24 23:21:39 -0400269 import fcntl
270
Victor Stinnerf2ed8892014-07-29 23:08:00 +0200271 def _set_nonblocking(fd):
272 flags = fcntl.fcntl(fd, fcntl.F_GETFL)
273 flags = flags | os.O_NONBLOCK
274 fcntl.fcntl(fd, fcntl.F_SETFL, flags)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700275
276
277class _UnixReadPipeTransport(transports.ReadTransport):
278
Yury Selivanovdec1a452014-02-18 22:27:48 -0500279 max_size = 256 * 1024 # max bytes we read in one event loop iteration
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700280
281 def __init__(self, loop, pipe, protocol, waiter=None, extra=None):
282 super().__init__(extra)
283 self._extra['pipe'] = pipe
284 self._loop = loop
285 self._pipe = pipe
286 self._fileno = pipe.fileno()
Guido van Rossum934f6ea2013-10-21 20:37:14 -0700287 mode = os.fstat(self._fileno).st_mode
Guido van Rossum02757ea2014-01-10 13:30:04 -0800288 if not (stat.S_ISFIFO(mode) or
289 stat.S_ISSOCK(mode) or
290 stat.S_ISCHR(mode)):
Guido van Rossum934f6ea2013-10-21 20:37:14 -0700291 raise ValueError("Pipe transport is for pipes/sockets only.")
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700292 _set_nonblocking(self._fileno)
293 self._protocol = protocol
294 self._closing = False
295 self._loop.add_reader(self._fileno, self._read_ready)
296 self._loop.call_soon(self._protocol.connection_made, self)
297 if waiter is not None:
Victor Stinnerbfff45d2014-07-08 23:57:31 +0200298 # wait until protocol.connection_made() has been called
Victor Stinnera9acbe82014-07-05 15:29:41 +0200299 self._loop.call_soon(waiter._set_result_unless_cancelled, None)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700300
Victor Stinnere912e652014-07-12 03:11:53 +0200301 def __repr__(self):
302 info = [self.__class__.__name__, 'fd=%s' % self._fileno]
303 if self._pipe is not None:
304 polling = selector_events._test_selector_event(
305 self._loop._selector,
306 self._fileno, selectors.EVENT_READ)
307 if polling:
308 info.append('polling')
309 else:
310 info.append('idle')
311 else:
312 info.append('closed')
313 return '<%s>' % ' '.join(info)
314
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700315 def _read_ready(self):
316 try:
317 data = os.read(self._fileno, self.max_size)
318 except (BlockingIOError, InterruptedError):
319 pass
320 except OSError as exc:
Victor Stinner0ee29c22014-02-19 01:40:41 +0100321 self._fatal_error(exc, 'Fatal read error on pipe transport')
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700322 else:
323 if data:
324 self._protocol.data_received(data)
325 else:
Victor Stinnere912e652014-07-12 03:11:53 +0200326 if self._loop.get_debug():
327 logger.info("%r was closed by peer", self)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700328 self._closing = True
329 self._loop.remove_reader(self._fileno)
330 self._loop.call_soon(self._protocol.eof_received)
331 self._loop.call_soon(self._call_connection_lost, None)
332
Guido van Rossum57497ad2013-10-18 07:58:20 -0700333 def pause_reading(self):
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700334 self._loop.remove_reader(self._fileno)
335
Guido van Rossum57497ad2013-10-18 07:58:20 -0700336 def resume_reading(self):
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700337 self._loop.add_reader(self._fileno, self._read_ready)
338
339 def close(self):
340 if not self._closing:
341 self._close(None)
342
Victor Stinner0ee29c22014-02-19 01:40:41 +0100343 def _fatal_error(self, exc, message='Fatal error on pipe transport'):
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700344 # should be called by exception handler only
Victor Stinnerb2614752014-08-25 23:20:52 +0200345 if (isinstance(exc, OSError) and exc.errno == errno.EIO):
346 if self._loop.get_debug():
347 logger.debug("%r: %s", self, message, exc_info=True)
348 else:
Yury Selivanov569efa22014-02-18 18:02:19 -0500349 self._loop.call_exception_handler({
Victor Stinner0ee29c22014-02-19 01:40:41 +0100350 'message': message,
Yury Selivanov569efa22014-02-18 18:02:19 -0500351 'exception': exc,
352 'transport': self,
353 'protocol': self._protocol,
354 })
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700355 self._close(exc)
356
357 def _close(self, exc):
358 self._closing = True
359 self._loop.remove_reader(self._fileno)
360 self._loop.call_soon(self._call_connection_lost, exc)
361
362 def _call_connection_lost(self, exc):
363 try:
364 self._protocol.connection_lost(exc)
365 finally:
366 self._pipe.close()
367 self._pipe = None
368 self._protocol = None
369 self._loop = None
370
371
Yury Selivanov3cb99142014-02-18 18:41:13 -0500372class _UnixWritePipeTransport(transports._FlowControlMixin,
Guido van Rossum47fb97e2014-01-29 13:20:39 -0800373 transports.WriteTransport):
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700374
375 def __init__(self, loop, pipe, protocol, waiter=None, extra=None):
Victor Stinner004adb92014-11-05 15:27:41 +0100376 super().__init__(extra, loop)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700377 self._extra['pipe'] = pipe
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700378 self._pipe = pipe
379 self._fileno = pipe.fileno()
Guido van Rossum934f6ea2013-10-21 20:37:14 -0700380 mode = os.fstat(self._fileno).st_mode
381 is_socket = stat.S_ISSOCK(mode)
Victor Stinner8dffc452014-01-25 15:32:06 +0100382 if not (is_socket or
383 stat.S_ISFIFO(mode) or
384 stat.S_ISCHR(mode)):
385 raise ValueError("Pipe transport is only for "
386 "pipes, sockets and character devices")
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700387 _set_nonblocking(self._fileno)
388 self._protocol = protocol
389 self._buffer = []
390 self._conn_lost = 0
391 self._closing = False # Set when close() or write_eof() called.
Guido van Rossum934f6ea2013-10-21 20:37:14 -0700392
393 # On AIX, the reader trick only works for sockets.
394 # On other platforms it works for pipes and sockets.
395 # (Exception: OS X 10.4? Issue #19294.)
396 if is_socket or not sys.platform.startswith("aix"):
397 self._loop.add_reader(self._fileno, self._read_ready)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700398
399 self._loop.call_soon(self._protocol.connection_made, self)
400 if waiter is not None:
Victor Stinnerbfff45d2014-07-08 23:57:31 +0200401 # wait until protocol.connection_made() has been called
Victor Stinnera9acbe82014-07-05 15:29:41 +0200402 self._loop.call_soon(waiter._set_result_unless_cancelled, None)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700403
Victor Stinnere912e652014-07-12 03:11:53 +0200404 def __repr__(self):
405 info = [self.__class__.__name__, 'fd=%s' % self._fileno]
406 if self._pipe is not None:
407 polling = selector_events._test_selector_event(
408 self._loop._selector,
409 self._fileno, selectors.EVENT_WRITE)
410 if polling:
411 info.append('polling')
412 else:
413 info.append('idle')
414
415 bufsize = self.get_write_buffer_size()
416 info.append('bufsize=%s' % bufsize)
417 else:
418 info.append('closed')
419 return '<%s>' % ' '.join(info)
420
Guido van Rossum47fb97e2014-01-29 13:20:39 -0800421 def get_write_buffer_size(self):
422 return sum(len(data) for data in self._buffer)
423
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700424 def _read_ready(self):
Guido van Rossum934f6ea2013-10-21 20:37:14 -0700425 # Pipe was closed by peer.
Victor Stinnere912e652014-07-12 03:11:53 +0200426 if self._loop.get_debug():
427 logger.info("%r was closed by peer", self)
Victor Stinner61b3c9b2014-01-31 13:04:28 +0100428 if self._buffer:
429 self._close(BrokenPipeError())
430 else:
431 self._close()
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700432
433 def write(self, data):
Guido van Rossum47fb97e2014-01-29 13:20:39 -0800434 assert isinstance(data, (bytes, bytearray, memoryview)), repr(data)
435 if isinstance(data, bytearray):
436 data = memoryview(data)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700437 if not data:
438 return
439
440 if self._conn_lost or self._closing:
441 if self._conn_lost >= constants.LOG_THRESHOLD_FOR_CONNLOST_WRITES:
Guido van Rossumfc29e0f2013-10-17 15:39:45 -0700442 logger.warning('pipe closed by peer or '
443 'os.write(pipe, data) raised exception.')
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700444 self._conn_lost += 1
445 return
446
447 if not self._buffer:
448 # Attempt to send it right away first.
449 try:
450 n = os.write(self._fileno, data)
451 except (BlockingIOError, InterruptedError):
452 n = 0
453 except Exception as exc:
454 self._conn_lost += 1
Victor Stinner0ee29c22014-02-19 01:40:41 +0100455 self._fatal_error(exc, 'Fatal write error on pipe transport')
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700456 return
457 if n == len(data):
458 return
459 elif n > 0:
460 data = data[n:]
461 self._loop.add_writer(self._fileno, self._write_ready)
462
463 self._buffer.append(data)
Guido van Rossum47fb97e2014-01-29 13:20:39 -0800464 self._maybe_pause_protocol()
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700465
466 def _write_ready(self):
467 data = b''.join(self._buffer)
468 assert data, 'Data should not be empty'
469
470 self._buffer.clear()
471 try:
472 n = os.write(self._fileno, data)
473 except (BlockingIOError, InterruptedError):
474 self._buffer.append(data)
475 except Exception as exc:
476 self._conn_lost += 1
477 # Remove writer here, _fatal_error() doesn't it
478 # because _buffer is empty.
479 self._loop.remove_writer(self._fileno)
Victor Stinner0ee29c22014-02-19 01:40:41 +0100480 self._fatal_error(exc, 'Fatal write error on pipe transport')
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700481 else:
482 if n == len(data):
483 self._loop.remove_writer(self._fileno)
Guido van Rossum47fb97e2014-01-29 13:20:39 -0800484 self._maybe_resume_protocol() # May append to buffer.
485 if not self._buffer and self._closing:
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700486 self._loop.remove_reader(self._fileno)
487 self._call_connection_lost(None)
488 return
489 elif n > 0:
490 data = data[n:]
491
492 self._buffer.append(data) # Try again later.
493
494 def can_write_eof(self):
495 return True
496
497 # TODO: Make the relationships between write_eof(), close(),
498 # abort(), _fatal_error() and _close() more straightforward.
499
500 def write_eof(self):
501 if self._closing:
502 return
503 assert self._pipe
504 self._closing = True
505 if not self._buffer:
506 self._loop.remove_reader(self._fileno)
507 self._loop.call_soon(self._call_connection_lost, None)
508
509 def close(self):
510 if not self._closing:
511 # write_eof is all what we needed to close the write pipe
512 self.write_eof()
513
514 def abort(self):
515 self._close(None)
516
Victor Stinner0ee29c22014-02-19 01:40:41 +0100517 def _fatal_error(self, exc, message='Fatal error on pipe transport'):
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700518 # should be called by exception handler only
Victor Stinnerb2614752014-08-25 23:20:52 +0200519 if isinstance(exc, (BrokenPipeError, ConnectionResetError)):
520 if self._loop.get_debug():
521 logger.debug("%r: %s", self, message, exc_info=True)
522 else:
Yury Selivanov569efa22014-02-18 18:02:19 -0500523 self._loop.call_exception_handler({
Victor Stinner0ee29c22014-02-19 01:40:41 +0100524 'message': message,
Yury Selivanov569efa22014-02-18 18:02:19 -0500525 'exception': exc,
526 'transport': self,
527 'protocol': self._protocol,
528 })
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700529 self._close(exc)
530
531 def _close(self, exc=None):
532 self._closing = True
533 if self._buffer:
534 self._loop.remove_writer(self._fileno)
535 self._buffer.clear()
536 self._loop.remove_reader(self._fileno)
537 self._loop.call_soon(self._call_connection_lost, exc)
538
539 def _call_connection_lost(self, exc):
540 try:
541 self._protocol.connection_lost(exc)
542 finally:
543 self._pipe.close()
544 self._pipe = None
545 self._protocol = None
546 self._loop = None
547
548
Guido van Rossum59691282013-10-30 14:52:03 -0700549class _UnixSubprocessTransport(base_subprocess.BaseSubprocessTransport):
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700550
Guido van Rossum59691282013-10-30 14:52:03 -0700551 def _start(self, args, shell, stdin, stdout, stderr, bufsize, **kwargs):
Guido van Rossum934f6ea2013-10-21 20:37:14 -0700552 stdin_w = None
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700553 if stdin == subprocess.PIPE:
Guido van Rossum934f6ea2013-10-21 20:37:14 -0700554 # Use a socket pair for stdin, since not all platforms
555 # support selecting read events on the write end of a
556 # socket (which we use in order to detect closing of the
557 # other end). Notably this is needed on AIX, and works
558 # just fine on other platforms.
559 stdin, stdin_w = self._loop._socketpair()
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700560 self._proc = subprocess.Popen(
561 args, shell=shell, stdin=stdin, stdout=stdout, stderr=stderr,
562 universal_newlines=False, bufsize=bufsize, **kwargs)
Guido van Rossum934f6ea2013-10-21 20:37:14 -0700563 if stdin_w is not None:
564 stdin.close()
Victor Stinner2dba23a2014-07-03 00:59:00 +0200565 self._proc.stdin = open(stdin_w.detach(), 'wb', buffering=bufsize)
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800566
567
568class AbstractChildWatcher:
569 """Abstract base class for monitoring child processes.
570
571 Objects derived from this class monitor a collection of subprocesses and
572 report their termination or interruption by a signal.
573
574 New callbacks are registered with .add_child_handler(). Starting a new
575 process must be done within a 'with' block to allow the watcher to suspend
576 its activity until the new process if fully registered (this is needed to
577 prevent a race condition in some implementations).
578
579 Example:
580 with watcher:
581 proc = subprocess.Popen("sleep 1")
582 watcher.add_child_handler(proc.pid, callback)
583
584 Notes:
585 Implementations of this class must be thread-safe.
586
587 Since child watcher objects may catch the SIGCHLD signal and call
588 waitpid(-1), there should be only one active object per process.
589 """
590
591 def add_child_handler(self, pid, callback, *args):
592 """Register a new child handler.
593
594 Arrange for callback(pid, returncode, *args) to be called when
595 process 'pid' terminates. Specifying another callback for the same
596 process replaces the previous handler.
597
Victor Stinneracdb7822014-07-14 18:33:40 +0200598 Note: callback() must be thread-safe.
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800599 """
600 raise NotImplementedError()
601
602 def remove_child_handler(self, pid):
603 """Removes the handler for process 'pid'.
604
605 The function returns True if the handler was successfully removed,
606 False if there was nothing to remove."""
607
608 raise NotImplementedError()
609
Guido van Rossum2bcae702013-11-13 15:50:08 -0800610 def attach_loop(self, loop):
611 """Attach the watcher to an event loop.
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800612
Guido van Rossum2bcae702013-11-13 15:50:08 -0800613 If the watcher was previously attached to an event loop, then it is
614 first detached before attaching to the new loop.
615
616 Note: loop may be None.
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800617 """
618 raise NotImplementedError()
619
620 def close(self):
621 """Close the watcher.
622
623 This must be called to make sure that any underlying resource is freed.
624 """
625 raise NotImplementedError()
626
627 def __enter__(self):
628 """Enter the watcher's context and allow starting new processes
629
630 This function must return self"""
631 raise NotImplementedError()
632
633 def __exit__(self, a, b, c):
634 """Exit the watcher's context"""
635 raise NotImplementedError()
636
637
638class BaseChildWatcher(AbstractChildWatcher):
639
Guido van Rossum2bcae702013-11-13 15:50:08 -0800640 def __init__(self):
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800641 self._loop = None
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800642
643 def close(self):
Guido van Rossum2bcae702013-11-13 15:50:08 -0800644 self.attach_loop(None)
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800645
646 def _do_waitpid(self, expected_pid):
647 raise NotImplementedError()
648
649 def _do_waitpid_all(self):
650 raise NotImplementedError()
651
Guido van Rossum2bcae702013-11-13 15:50:08 -0800652 def attach_loop(self, loop):
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800653 assert loop is None or isinstance(loop, events.AbstractEventLoop)
654
655 if self._loop is not None:
656 self._loop.remove_signal_handler(signal.SIGCHLD)
657
658 self._loop = loop
659 if loop is not None:
660 loop.add_signal_handler(signal.SIGCHLD, self._sig_chld)
661
662 # Prevent a race condition in case a child terminated
663 # during the switch.
664 self._do_waitpid_all()
665
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800666 def _sig_chld(self):
667 try:
668 self._do_waitpid_all()
Yury Selivanov569efa22014-02-18 18:02:19 -0500669 except Exception as exc:
670 # self._loop should always be available here
671 # as '_sig_chld' is added as a signal handler
672 # in 'attach_loop'
673 self._loop.call_exception_handler({
674 'message': 'Unknown exception in SIGCHLD handler',
675 'exception': exc,
676 })
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800677
678 def _compute_returncode(self, status):
679 if os.WIFSIGNALED(status):
680 # The child process died because of a signal.
681 return -os.WTERMSIG(status)
682 elif os.WIFEXITED(status):
683 # The child process exited (e.g sys.exit()).
684 return os.WEXITSTATUS(status)
685 else:
686 # The child exited, but we don't understand its status.
687 # This shouldn't happen, but if it does, let's just
688 # return that status; perhaps that helps debug it.
689 return status
690
691
692class SafeChildWatcher(BaseChildWatcher):
693 """'Safe' child watcher implementation.
694
695 This implementation avoids disrupting other code spawning processes by
696 polling explicitly each process in the SIGCHLD handler instead of calling
697 os.waitpid(-1).
698
699 This is a safe solution but it has a significant overhead when handling a
700 big number of children (O(n) each time SIGCHLD is raised)
701 """
702
Guido van Rossum2bcae702013-11-13 15:50:08 -0800703 def __init__(self):
704 super().__init__()
705 self._callbacks = {}
706
707 def close(self):
708 self._callbacks.clear()
709 super().close()
710
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800711 def __enter__(self):
712 return self
713
714 def __exit__(self, a, b, c):
715 pass
716
717 def add_child_handler(self, pid, callback, *args):
718 self._callbacks[pid] = callback, args
719
720 # Prevent a race condition in case the child is already terminated.
721 self._do_waitpid(pid)
722
Guido van Rossum2bcae702013-11-13 15:50:08 -0800723 def remove_child_handler(self, pid):
724 try:
725 del self._callbacks[pid]
726 return True
727 except KeyError:
728 return False
729
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800730 def _do_waitpid_all(self):
731
732 for pid in list(self._callbacks):
733 self._do_waitpid(pid)
734
735 def _do_waitpid(self, expected_pid):
736 assert expected_pid > 0
737
738 try:
739 pid, status = os.waitpid(expected_pid, os.WNOHANG)
740 except ChildProcessError:
741 # The child process is already reaped
742 # (may happen if waitpid() is called elsewhere).
743 pid = expected_pid
744 returncode = 255
745 logger.warning(
746 "Unknown child process pid %d, will report returncode 255",
747 pid)
748 else:
749 if pid == 0:
750 # The child process is still alive.
751 return
752
753 returncode = self._compute_returncode(status)
Victor Stinneracdb7822014-07-14 18:33:40 +0200754 if self._loop.get_debug():
755 logger.debug('process %s exited with returncode %s',
756 expected_pid, returncode)
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800757
758 try:
759 callback, args = self._callbacks.pop(pid)
760 except KeyError: # pragma: no cover
761 # May happen if .remove_child_handler() is called
762 # after os.waitpid() returns.
Victor Stinnerb2614752014-08-25 23:20:52 +0200763 if self._loop.get_debug():
764 logger.warning("Child watcher got an unexpected pid: %r",
765 pid, exc_info=True)
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800766 else:
767 callback(pid, returncode, *args)
768
769
770class FastChildWatcher(BaseChildWatcher):
771 """'Fast' child watcher implementation.
772
773 This implementation reaps every terminated processes by calling
774 os.waitpid(-1) directly, possibly breaking other code spawning processes
775 and waiting for their termination.
776
777 There is no noticeable overhead when handling a big number of children
778 (O(1) each time a child terminates).
779 """
Guido van Rossum2bcae702013-11-13 15:50:08 -0800780 def __init__(self):
781 super().__init__()
782 self._callbacks = {}
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800783 self._lock = threading.Lock()
784 self._zombies = {}
785 self._forks = 0
786
787 def close(self):
Guido van Rossum2bcae702013-11-13 15:50:08 -0800788 self._callbacks.clear()
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800789 self._zombies.clear()
Guido van Rossum2bcae702013-11-13 15:50:08 -0800790 super().close()
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800791
792 def __enter__(self):
793 with self._lock:
794 self._forks += 1
795
796 return self
797
798 def __exit__(self, a, b, c):
799 with self._lock:
800 self._forks -= 1
801
802 if self._forks or not self._zombies:
803 return
804
805 collateral_victims = str(self._zombies)
806 self._zombies.clear()
807
808 logger.warning(
809 "Caught subprocesses termination from unknown pids: %s",
810 collateral_victims)
811
812 def add_child_handler(self, pid, callback, *args):
813 assert self._forks, "Must use the context manager"
Guido van Rossumab27a9f2014-01-25 16:32:17 -0800814 with self._lock:
815 try:
816 returncode = self._zombies.pop(pid)
817 except KeyError:
818 # The child is running.
819 self._callbacks[pid] = callback, args
820 return
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800821
Guido van Rossumab27a9f2014-01-25 16:32:17 -0800822 # The child is dead already. We can fire the callback.
823 callback(pid, returncode, *args)
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800824
Guido van Rossum2bcae702013-11-13 15:50:08 -0800825 def remove_child_handler(self, pid):
826 try:
827 del self._callbacks[pid]
828 return True
829 except KeyError:
830 return False
831
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800832 def _do_waitpid_all(self):
833 # Because of signal coalescing, we must keep calling waitpid() as
834 # long as we're able to reap a child.
835 while True:
836 try:
837 pid, status = os.waitpid(-1, os.WNOHANG)
838 except ChildProcessError:
839 # No more child processes exist.
840 return
841 else:
842 if pid == 0:
843 # A child process is still alive.
844 return
845
846 returncode = self._compute_returncode(status)
847
Guido van Rossumab27a9f2014-01-25 16:32:17 -0800848 with self._lock:
849 try:
850 callback, args = self._callbacks.pop(pid)
851 except KeyError:
852 # unknown child
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800853 if self._forks:
854 # It may not be registered yet.
855 self._zombies[pid] = returncode
Victor Stinneracdb7822014-07-14 18:33:40 +0200856 if self._loop.get_debug():
857 logger.debug('unknown process %s exited '
858 'with returncode %s',
859 pid, returncode)
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800860 continue
Guido van Rossumab27a9f2014-01-25 16:32:17 -0800861 callback = None
Victor Stinneracdb7822014-07-14 18:33:40 +0200862 else:
863 if self._loop.get_debug():
864 logger.debug('process %s exited with returncode %s',
865 pid, returncode)
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800866
Guido van Rossumab27a9f2014-01-25 16:32:17 -0800867 if callback is None:
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800868 logger.warning(
869 "Caught subprocess termination from unknown pid: "
870 "%d -> %d", pid, returncode)
871 else:
872 callback(pid, returncode, *args)
873
874
875class _UnixDefaultEventLoopPolicy(events.BaseDefaultEventLoopPolicy):
876 """XXX"""
877 _loop_factory = _UnixSelectorEventLoop
878
879 def __init__(self):
880 super().__init__()
881 self._watcher = None
882
883 def _init_watcher(self):
884 with events._lock:
885 if self._watcher is None: # pragma: no branch
Guido van Rossum2bcae702013-11-13 15:50:08 -0800886 self._watcher = SafeChildWatcher()
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800887 if isinstance(threading.current_thread(),
888 threading._MainThread):
Guido van Rossum2bcae702013-11-13 15:50:08 -0800889 self._watcher.attach_loop(self._local._loop)
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800890
891 def set_event_loop(self, loop):
892 """Set the event loop.
893
894 As a side effect, if a child watcher was set before, then calling
Guido van Rossum2bcae702013-11-13 15:50:08 -0800895 .set_event_loop() from the main thread will call .attach_loop(loop) on
896 the child watcher.
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800897 """
898
899 super().set_event_loop(loop)
900
901 if self._watcher is not None and \
902 isinstance(threading.current_thread(), threading._MainThread):
Guido van Rossum2bcae702013-11-13 15:50:08 -0800903 self._watcher.attach_loop(loop)
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800904
905 def get_child_watcher(self):
Victor Stinnerf9e49dd2014-06-05 12:06:44 +0200906 """Get the watcher for child processes.
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800907
908 If not yet set, a SafeChildWatcher object is automatically created.
909 """
910 if self._watcher is None:
911 self._init_watcher()
912
913 return self._watcher
914
915 def set_child_watcher(self, watcher):
Victor Stinnerf9e49dd2014-06-05 12:06:44 +0200916 """Set the watcher for child processes."""
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800917
918 assert watcher is None or isinstance(watcher, AbstractChildWatcher)
919
920 if self._watcher is not None:
921 self._watcher.close()
922
923 self._watcher = watcher
924
925SelectorEventLoop = _UnixSelectorEventLoop
926DefaultEventLoopPolicy = _UnixDefaultEventLoopPolicy