blob: 94a46d094cdfe18817006011cb0f7545e00a93cb [file] [log] [blame]
Yury Selivanovdec1a452014-02-18 22:27:48 -05001"""Selector event loop for Unix with signal handling."""
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07002
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07003import errno
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07004import os
5import signal
6import socket
7import stat
8import subprocess
9import sys
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -080010import threading
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070011
12
Yury Selivanovb057c522014-02-18 12:15:06 -050013from . import base_events
Guido van Rossum59691282013-10-30 14:52:03 -070014from . import base_subprocess
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070015from . import constants
16from . import events
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070017from . import selector_events
Victor Stinnere912e652014-07-12 03:11:53 +020018from . import selectors
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070019from . import transports
Victor Stinnerf951d282014-06-29 00:46:45 +020020from .coroutines import coroutine
Guido van Rossumfc29e0f2013-10-17 15:39:45 -070021from .log import logger
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070022
23
# Names exported by ``from <this module> import *``.
__all__ = ['SelectorEventLoop',
           'AbstractChildWatcher', 'SafeChildWatcher',
           'FastChildWatcher', 'DefaultEventLoopPolicy',
           ]

# Refuse to be imported on Windows: fail early with ImportError.
if sys.platform == 'win32':  # pragma: no cover
    raise ImportError('Signals are not really supported on Windows')
31
32
Victor Stinnerfe5649c2014-07-17 22:43:40 +020033def _sighandler_noop(signum, frame):
34 """Dummy signal handler."""
35 pass
36
37
class _UnixSelectorEventLoop(selector_events.BaseSelectorEventLoop):
    """Unix event loop.

    Adds signal handling and UNIX Domain Socket support to SelectorEventLoop.
    """

    def __init__(self, selector=None):
        super().__init__(selector)
        # Maps a signal number to the events.Handle wrapping its callback.
        self._signal_handlers = {}

    def _socketpair(self):
        """Return a connected pair of sockets (socket.socketpair())."""
        return socket.socketpair()

    def close(self):
        """Close the loop, then remove every registered signal handler."""
        super().close()
        # Iterate over a copy: remove_signal_handler() mutates the dict.
        for sig in list(self._signal_handlers):
            self.remove_signal_handler(sig)

    def _process_self_data(self, data):
        """Treat each non-zero byte of *data* as a signal number to handle."""
        for signum in data:
            if not signum:
                # ignore null bytes written by _write_to_self()
                continue
            self._handle_signal(signum)

    def add_signal_handler(self, sig, callback, *args):
        """Add a handler for a signal.  UNIX only.

        Raise TypeError if sig is not an int.
        Raise ValueError if the signal number is out of range.
        Raise RuntimeError if the wakeup file descriptor cannot be set
        (for example outside the main thread) or if the signal cannot be
        caught.  Other OSError raised while installing the handler is
        propagated unchanged.
        """
        self._check_signal(sig)
        try:
            # set_wakeup_fd() raises ValueError if this is not the
            # main thread.  By calling it early we ensure that an
            # event loop running in another thread cannot add a signal
            # handler.
            signal.set_wakeup_fd(self._csock.fileno())
        except (ValueError, OSError) as exc:
            raise RuntimeError(str(exc))

        handle = events.Handle(callback, args, self)
        self._signal_handlers[sig] = handle

        try:
            # Register a dummy signal handler to ask Python to write the
            # signal number in the wakeup file descriptor.
            # _process_self_data() will read signal numbers from this file
            # descriptor to handle signals.
            signal.signal(sig, _sighandler_noop)

            # Set SA_RESTART to limit EINTR occurrences.
            signal.siginterrupt(sig, False)
        except OSError as exc:
            # Roll back the registration made above.
            del self._signal_handlers[sig]
            if not self._signal_handlers:
                try:
                    signal.set_wakeup_fd(-1)
                except (ValueError, OSError) as nexc:
                    logger.info('set_wakeup_fd(-1) failed: %s', nexc)

            if exc.errno == errno.EINVAL:
                raise RuntimeError('sig {} cannot be caught'.format(sig))
            else:
                raise

    def _handle_signal(self, sig):
        """Internal helper that is the actual signal handler."""
        handle = self._signal_handlers.get(sig)
        if handle is None:
            return  # Assume it's some race condition.
        if handle._cancelled:
            self.remove_signal_handler(sig)  # Remove it properly.
        else:
            self._add_callback_signalsafe(handle)

    def remove_signal_handler(self, sig):
        """Remove a handler for a signal.  UNIX only.

        Return True if a signal handler was removed, False if not.

        The signal is reset to signal.default_int_handler for SIGINT and to
        signal.SIG_DFL for any other signal.  When no handler remains, the
        wakeup file descriptor is reset as well.
        """
        self._check_signal(sig)
        try:
            del self._signal_handlers[sig]
        except KeyError:
            return False

        if sig == signal.SIGINT:
            handler = signal.default_int_handler
        else:
            handler = signal.SIG_DFL

        try:
            signal.signal(sig, handler)
        except OSError as exc:
            if exc.errno == errno.EINVAL:
                raise RuntimeError('sig {} cannot be caught'.format(sig))
            else:
                raise

        if not self._signal_handlers:
            try:
                signal.set_wakeup_fd(-1)
            except (ValueError, OSError) as exc:
                logger.info('set_wakeup_fd(-1) failed: %s', exc)

        return True

    def _check_signal(self, sig):
        """Internal helper to validate a signal.

        Raise TypeError if sig is not an int.
        Raise ValueError if sig is outside range(1, signal.NSIG).
        """
        if not isinstance(sig, int):
            raise TypeError('sig must be an int, not {!r}'.format(sig))

        if not (1 <= sig < signal.NSIG):
            raise ValueError(
                'sig {} out of range(1, {})'.format(sig, signal.NSIG))

    def _make_read_pipe_transport(self, pipe, protocol, waiter=None,
                                  extra=None):
        return _UnixReadPipeTransport(self, pipe, protocol, waiter, extra)

    def _make_write_pipe_transport(self, pipe, protocol, waiter=None,
                                   extra=None):
        return _UnixWritePipeTransport(self, pipe, protocol, waiter, extra)

    @coroutine
    def _make_subprocess_transport(self, protocol, args, shell,
                                   stdin, stdout, stderr, bufsize,
                                   extra=None, **kwargs):
        """Create a subprocess transport and register it with the watcher.

        The transport is created and its child handler registered inside the
        child watcher's 'with' block.
        """
        with events.get_child_watcher() as watcher:
            transp = _UnixSubprocessTransport(self, protocol, args, shell,
                                              stdin, stdout, stderr, bufsize,
                                              extra=extra, **kwargs)
            yield from transp._post_init()
            watcher.add_child_handler(transp.get_pid(),
                                      self._child_watcher_callback, transp)

        return transp

    def _child_watcher_callback(self, pid, returncode, transp):
        """Forward the child's return code to the transport via the loop."""
        self.call_soon_threadsafe(transp._process_exited, returncode)

    @coroutine
    def create_unix_connection(self, protocol_factory, path, *,
                               ssl=None, sock=None,
                               server_hostname=None):
        """Connect to a UNIX Domain Socket; return (transport, protocol).

        Exactly one of path and sock must be given (pass path=None to use
        sock).  server_hostname is required when ssl is set and rejected
        otherwise.  Raise ValueError when these rules are violated.
        """
        assert server_hostname is None or isinstance(server_hostname, str)
        if ssl:
            if server_hostname is None:
                raise ValueError(
                    'you have to pass server_hostname when using ssl')
        else:
            if server_hostname is not None:
                raise ValueError('server_hostname is only meaningful with ssl')

        if path is not None:
            if sock is not None:
                raise ValueError(
                    'path and sock can not be specified at the same time')

            sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM, 0)
            try:
                sock.setblocking(False)
                yield from self.sock_connect(sock, path)
            except:
                # The socket was created here, so close it on any failure.
                sock.close()
                raise

        else:
            if sock is None:
                raise ValueError('no path and sock were specified')
            sock.setblocking(False)

        transport, protocol = yield from self._create_connection_transport(
            sock, protocol_factory, ssl, server_hostname)
        return transport, protocol

    @coroutine
    def create_unix_server(self, protocol_factory, path=None, *,
                           sock=None, backlog=100, ssl=None):
        """Start serving on a UNIX Domain Socket; return the Server object.

        Exactly one of path and sock must be given.  Raise TypeError if ssl
        is a bool, ValueError if the path/sock combination is invalid or
        sock is not an AF_UNIX socket, and OSError if binding fails.

        A socket created here from path is closed again if setting up the
        server fails; a socket supplied by the caller is never closed.
        """
        if isinstance(ssl, bool):
            raise TypeError('ssl argument must be an SSLContext or None')

        # True only when the socket is created below from 'path'.
        owns_sock = False

        if path is not None:
            if sock is not None:
                raise ValueError(
                    'path and sock can not be specified at the same time')

            sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
            owns_sock = True

            try:
                sock.bind(path)
            except OSError as exc:
                sock.close()
                if exc.errno == errno.EADDRINUSE:
                    # Let's improve the error message by adding
                    # with what exact address it occurs.
                    msg = 'Address {!r} is already in use'.format(path)
                    raise OSError(errno.EADDRINUSE, msg) from None
                else:
                    raise
            except:
                sock.close()
                raise
        else:
            if sock is None:
                raise ValueError(
                    'path was not specified, and no sock specified')

            if sock.family != socket.AF_UNIX:
                raise ValueError(
                    'A UNIX Domain Socket was expected, got {!r}'.format(sock))

        try:
            server = base_events.Server(self, [sock])
            sock.listen(backlog)
            sock.setblocking(False)
            self._start_serving(protocol_factory, sock, ssl, server)
        except BaseException:
            # Do not leak the socket created above when setup fails.
            if owns_sock:
                sock.close()
            raise
        return server
259
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700260
Victor Stinnerf2ed8892014-07-29 23:08:00 +0200261if hasattr(os, 'set_blocking'):
262 def _set_nonblocking(fd):
263 os.set_blocking(fd, False)
264else:
265 def _set_nonblocking(fd):
266 flags = fcntl.fcntl(fd, fcntl.F_GETFL)
267 flags = flags | os.O_NONBLOCK
268 fcntl.fcntl(fd, fcntl.F_SETFL, flags)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700269
270
271class _UnixReadPipeTransport(transports.ReadTransport):
272
Yury Selivanovdec1a452014-02-18 22:27:48 -0500273 max_size = 256 * 1024 # max bytes we read in one event loop iteration
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700274
275 def __init__(self, loop, pipe, protocol, waiter=None, extra=None):
276 super().__init__(extra)
277 self._extra['pipe'] = pipe
278 self._loop = loop
279 self._pipe = pipe
280 self._fileno = pipe.fileno()
Guido van Rossum934f6ea2013-10-21 20:37:14 -0700281 mode = os.fstat(self._fileno).st_mode
Guido van Rossum02757ea2014-01-10 13:30:04 -0800282 if not (stat.S_ISFIFO(mode) or
283 stat.S_ISSOCK(mode) or
284 stat.S_ISCHR(mode)):
Guido van Rossum934f6ea2013-10-21 20:37:14 -0700285 raise ValueError("Pipe transport is for pipes/sockets only.")
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700286 _set_nonblocking(self._fileno)
287 self._protocol = protocol
288 self._closing = False
289 self._loop.add_reader(self._fileno, self._read_ready)
290 self._loop.call_soon(self._protocol.connection_made, self)
291 if waiter is not None:
Victor Stinnerbfff45d2014-07-08 23:57:31 +0200292 # wait until protocol.connection_made() has been called
Victor Stinnera9acbe82014-07-05 15:29:41 +0200293 self._loop.call_soon(waiter._set_result_unless_cancelled, None)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700294
Victor Stinnere912e652014-07-12 03:11:53 +0200295 def __repr__(self):
296 info = [self.__class__.__name__, 'fd=%s' % self._fileno]
297 if self._pipe is not None:
298 polling = selector_events._test_selector_event(
299 self._loop._selector,
300 self._fileno, selectors.EVENT_READ)
301 if polling:
302 info.append('polling')
303 else:
304 info.append('idle')
305 else:
306 info.append('closed')
307 return '<%s>' % ' '.join(info)
308
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700309 def _read_ready(self):
310 try:
311 data = os.read(self._fileno, self.max_size)
312 except (BlockingIOError, InterruptedError):
313 pass
314 except OSError as exc:
Victor Stinner0ee29c22014-02-19 01:40:41 +0100315 self._fatal_error(exc, 'Fatal read error on pipe transport')
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700316 else:
317 if data:
318 self._protocol.data_received(data)
319 else:
Victor Stinnere912e652014-07-12 03:11:53 +0200320 if self._loop.get_debug():
321 logger.info("%r was closed by peer", self)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700322 self._closing = True
323 self._loop.remove_reader(self._fileno)
324 self._loop.call_soon(self._protocol.eof_received)
325 self._loop.call_soon(self._call_connection_lost, None)
326
Guido van Rossum57497ad2013-10-18 07:58:20 -0700327 def pause_reading(self):
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700328 self._loop.remove_reader(self._fileno)
329
Guido van Rossum57497ad2013-10-18 07:58:20 -0700330 def resume_reading(self):
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700331 self._loop.add_reader(self._fileno, self._read_ready)
332
333 def close(self):
334 if not self._closing:
335 self._close(None)
336
Victor Stinner0ee29c22014-02-19 01:40:41 +0100337 def _fatal_error(self, exc, message='Fatal error on pipe transport'):
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700338 # should be called by exception handler only
Guido van Rossum02757ea2014-01-10 13:30:04 -0800339 if not (isinstance(exc, OSError) and exc.errno == errno.EIO):
Yury Selivanov569efa22014-02-18 18:02:19 -0500340 self._loop.call_exception_handler({
Victor Stinner0ee29c22014-02-19 01:40:41 +0100341 'message': message,
Yury Selivanov569efa22014-02-18 18:02:19 -0500342 'exception': exc,
343 'transport': self,
344 'protocol': self._protocol,
345 })
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700346 self._close(exc)
347
348 def _close(self, exc):
349 self._closing = True
350 self._loop.remove_reader(self._fileno)
351 self._loop.call_soon(self._call_connection_lost, exc)
352
353 def _call_connection_lost(self, exc):
354 try:
355 self._protocol.connection_lost(exc)
356 finally:
357 self._pipe.close()
358 self._pipe = None
359 self._protocol = None
360 self._loop = None
361
362
Yury Selivanov3cb99142014-02-18 18:41:13 -0500363class _UnixWritePipeTransport(transports._FlowControlMixin,
Guido van Rossum47fb97e2014-01-29 13:20:39 -0800364 transports.WriteTransport):
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700365
366 def __init__(self, loop, pipe, protocol, waiter=None, extra=None):
367 super().__init__(extra)
368 self._extra['pipe'] = pipe
369 self._loop = loop
370 self._pipe = pipe
371 self._fileno = pipe.fileno()
Guido van Rossum934f6ea2013-10-21 20:37:14 -0700372 mode = os.fstat(self._fileno).st_mode
373 is_socket = stat.S_ISSOCK(mode)
Victor Stinner8dffc452014-01-25 15:32:06 +0100374 if not (is_socket or
375 stat.S_ISFIFO(mode) or
376 stat.S_ISCHR(mode)):
377 raise ValueError("Pipe transport is only for "
378 "pipes, sockets and character devices")
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700379 _set_nonblocking(self._fileno)
380 self._protocol = protocol
381 self._buffer = []
382 self._conn_lost = 0
383 self._closing = False # Set when close() or write_eof() called.
Guido van Rossum934f6ea2013-10-21 20:37:14 -0700384
385 # On AIX, the reader trick only works for sockets.
386 # On other platforms it works for pipes and sockets.
387 # (Exception: OS X 10.4? Issue #19294.)
388 if is_socket or not sys.platform.startswith("aix"):
389 self._loop.add_reader(self._fileno, self._read_ready)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700390
391 self._loop.call_soon(self._protocol.connection_made, self)
392 if waiter is not None:
Victor Stinnerbfff45d2014-07-08 23:57:31 +0200393 # wait until protocol.connection_made() has been called
Victor Stinnera9acbe82014-07-05 15:29:41 +0200394 self._loop.call_soon(waiter._set_result_unless_cancelled, None)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700395
Victor Stinnere912e652014-07-12 03:11:53 +0200396 def __repr__(self):
397 info = [self.__class__.__name__, 'fd=%s' % self._fileno]
398 if self._pipe is not None:
399 polling = selector_events._test_selector_event(
400 self._loop._selector,
401 self._fileno, selectors.EVENT_WRITE)
402 if polling:
403 info.append('polling')
404 else:
405 info.append('idle')
406
407 bufsize = self.get_write_buffer_size()
408 info.append('bufsize=%s' % bufsize)
409 else:
410 info.append('closed')
411 return '<%s>' % ' '.join(info)
412
Guido van Rossum47fb97e2014-01-29 13:20:39 -0800413 def get_write_buffer_size(self):
414 return sum(len(data) for data in self._buffer)
415
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700416 def _read_ready(self):
Guido van Rossum934f6ea2013-10-21 20:37:14 -0700417 # Pipe was closed by peer.
Victor Stinnere912e652014-07-12 03:11:53 +0200418 if self._loop.get_debug():
419 logger.info("%r was closed by peer", self)
Victor Stinner61b3c9b2014-01-31 13:04:28 +0100420 if self._buffer:
421 self._close(BrokenPipeError())
422 else:
423 self._close()
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700424
425 def write(self, data):
Guido van Rossum47fb97e2014-01-29 13:20:39 -0800426 assert isinstance(data, (bytes, bytearray, memoryview)), repr(data)
427 if isinstance(data, bytearray):
428 data = memoryview(data)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700429 if not data:
430 return
431
432 if self._conn_lost or self._closing:
433 if self._conn_lost >= constants.LOG_THRESHOLD_FOR_CONNLOST_WRITES:
Guido van Rossumfc29e0f2013-10-17 15:39:45 -0700434 logger.warning('pipe closed by peer or '
435 'os.write(pipe, data) raised exception.')
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700436 self._conn_lost += 1
437 return
438
439 if not self._buffer:
440 # Attempt to send it right away first.
441 try:
442 n = os.write(self._fileno, data)
443 except (BlockingIOError, InterruptedError):
444 n = 0
445 except Exception as exc:
446 self._conn_lost += 1
Victor Stinner0ee29c22014-02-19 01:40:41 +0100447 self._fatal_error(exc, 'Fatal write error on pipe transport')
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700448 return
449 if n == len(data):
450 return
451 elif n > 0:
452 data = data[n:]
453 self._loop.add_writer(self._fileno, self._write_ready)
454
455 self._buffer.append(data)
Guido van Rossum47fb97e2014-01-29 13:20:39 -0800456 self._maybe_pause_protocol()
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700457
458 def _write_ready(self):
459 data = b''.join(self._buffer)
460 assert data, 'Data should not be empty'
461
462 self._buffer.clear()
463 try:
464 n = os.write(self._fileno, data)
465 except (BlockingIOError, InterruptedError):
466 self._buffer.append(data)
467 except Exception as exc:
468 self._conn_lost += 1
469 # Remove writer here, _fatal_error() doesn't it
470 # because _buffer is empty.
471 self._loop.remove_writer(self._fileno)
Victor Stinner0ee29c22014-02-19 01:40:41 +0100472 self._fatal_error(exc, 'Fatal write error on pipe transport')
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700473 else:
474 if n == len(data):
475 self._loop.remove_writer(self._fileno)
Guido van Rossum47fb97e2014-01-29 13:20:39 -0800476 self._maybe_resume_protocol() # May append to buffer.
477 if not self._buffer and self._closing:
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700478 self._loop.remove_reader(self._fileno)
479 self._call_connection_lost(None)
480 return
481 elif n > 0:
482 data = data[n:]
483
484 self._buffer.append(data) # Try again later.
485
486 def can_write_eof(self):
487 return True
488
489 # TODO: Make the relationships between write_eof(), close(),
490 # abort(), _fatal_error() and _close() more straightforward.
491
492 def write_eof(self):
493 if self._closing:
494 return
495 assert self._pipe
496 self._closing = True
497 if not self._buffer:
498 self._loop.remove_reader(self._fileno)
499 self._loop.call_soon(self._call_connection_lost, None)
500
501 def close(self):
502 if not self._closing:
503 # write_eof is all what we needed to close the write pipe
504 self.write_eof()
505
506 def abort(self):
507 self._close(None)
508
Victor Stinner0ee29c22014-02-19 01:40:41 +0100509 def _fatal_error(self, exc, message='Fatal error on pipe transport'):
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700510 # should be called by exception handler only
Victor Stinner63b4d4b2014-01-29 13:12:03 -0800511 if not isinstance(exc, (BrokenPipeError, ConnectionResetError)):
Yury Selivanov569efa22014-02-18 18:02:19 -0500512 self._loop.call_exception_handler({
Victor Stinner0ee29c22014-02-19 01:40:41 +0100513 'message': message,
Yury Selivanov569efa22014-02-18 18:02:19 -0500514 'exception': exc,
515 'transport': self,
516 'protocol': self._protocol,
517 })
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700518 self._close(exc)
519
520 def _close(self, exc=None):
521 self._closing = True
522 if self._buffer:
523 self._loop.remove_writer(self._fileno)
524 self._buffer.clear()
525 self._loop.remove_reader(self._fileno)
526 self._loop.call_soon(self._call_connection_lost, exc)
527
528 def _call_connection_lost(self, exc):
529 try:
530 self._protocol.connection_lost(exc)
531 finally:
532 self._pipe.close()
533 self._pipe = None
534 self._protocol = None
535 self._loop = None
536
537
Guido van Rossum59691282013-10-30 14:52:03 -0700538class _UnixSubprocessTransport(base_subprocess.BaseSubprocessTransport):
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700539
Guido van Rossum59691282013-10-30 14:52:03 -0700540 def _start(self, args, shell, stdin, stdout, stderr, bufsize, **kwargs):
Guido van Rossum934f6ea2013-10-21 20:37:14 -0700541 stdin_w = None
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700542 if stdin == subprocess.PIPE:
Guido van Rossum934f6ea2013-10-21 20:37:14 -0700543 # Use a socket pair for stdin, since not all platforms
544 # support selecting read events on the write end of a
545 # socket (which we use in order to detect closing of the
546 # other end). Notably this is needed on AIX, and works
547 # just fine on other platforms.
548 stdin, stdin_w = self._loop._socketpair()
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700549 self._proc = subprocess.Popen(
550 args, shell=shell, stdin=stdin, stdout=stdout, stderr=stderr,
551 universal_newlines=False, bufsize=bufsize, **kwargs)
Guido van Rossum934f6ea2013-10-21 20:37:14 -0700552 if stdin_w is not None:
553 stdin.close()
Victor Stinner2dba23a2014-07-03 00:59:00 +0200554 self._proc.stdin = open(stdin_w.detach(), 'wb', buffering=bufsize)
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800555
556
class AbstractChildWatcher:
    """Abstract base class for monitoring child processes.

    Objects derived from this class monitor a collection of subprocesses and
    report their termination or interruption by a signal.

    New callbacks are registered with .add_child_handler(). Starting a new
    process must be done within a 'with' block to allow the watcher to suspend
    its activity until the new process is fully registered (this is needed to
    prevent a race condition in some implementations).

    Example:
        with watcher:
            proc = subprocess.Popen("sleep 1")
            watcher.add_child_handler(proc.pid, callback)

    Notes:
        Implementations of this class must be thread-safe.

        Since child watcher objects may catch the SIGCHLD signal and call
        waitpid(-1), there should be only one active object per process.
    """

    def add_child_handler(self, pid, callback, *args):
        """Register a new child handler.

        callback(pid, returncode, *args) is to be called when process 'pid'
        terminates.  Registering a callback for a pid that already has one
        replaces the previous handler.

        Note: callback() must be thread-safe.
        """
        raise NotImplementedError

    def remove_child_handler(self, pid):
        """Remove the handler for process 'pid'.

        Return True if the handler was successfully removed, False if there
        was nothing to remove.
        """
        raise NotImplementedError

    def attach_loop(self, loop):
        """Attach the watcher to an event loop.

        A watcher that is already attached to an event loop is detached
        from it first.

        Note: loop may be None.
        """
        raise NotImplementedError

    def close(self):
        """Close the watcher.

        Must be called to make sure any underlying resource is freed.
        """
        raise NotImplementedError

    def __enter__(self):
        """Enter the watcher's context and allow starting new processes.

        This function must return self.
        """
        raise NotImplementedError

    def __exit__(self, a, b, c):
        """Exit the watcher's context."""
        raise NotImplementedError
625
626
class BaseChildWatcher(AbstractChildWatcher):
    """Shared SIGCHLD plumbing for the concrete child watchers.

    Subclasses provide _do_waitpid() and _do_waitpid_all().
    """

    def __init__(self):
        # Event loop the watcher is attached to, or None.
        self._loop = None

    def close(self):
        """Detach the watcher from its event loop."""
        self.attach_loop(None)

    def _do_waitpid(self, expected_pid):
        raise NotImplementedError()

    def _do_waitpid_all(self):
        raise NotImplementedError()

    def attach_loop(self, loop):
        """Move the SIGCHLD handler from the current loop to *loop*.

        *loop* may be None, in which case the watcher is only detached.
        """
        assert loop is None or isinstance(loop, events.AbstractEventLoop)

        previous = self._loop
        if previous is not None:
            previous.remove_signal_handler(signal.SIGCHLD)

        self._loop = loop
        if loop is None:
            return

        loop.add_signal_handler(signal.SIGCHLD, self._sig_chld)

        # Prevent a race condition in case a child terminated
        # during the switch.
        self._do_waitpid_all()

    def _sig_chld(self):
        """SIGCHLD callback: reap children and report any failure."""
        try:
            self._do_waitpid_all()
        except Exception as exc:
            # self._loop should always be available here
            # as '_sig_chld' is added as a signal handler
            # in 'attach_loop'
            context = {
                'message': 'Unknown exception in SIGCHLD handler',
                'exception': exc,
            }
            self._loop.call_exception_handler(context)

    def _compute_returncode(self, status):
        """Translate a waitpid() status into a return code.

        Return the negated signal number if the child was killed by a
        signal, the exit status if it exited, and the raw status otherwise.
        """
        if os.WIFSIGNALED(status):
            # The child process died because of a signal.
            return -os.WTERMSIG(status)
        if os.WIFEXITED(status):
            # The child process exited (e.g sys.exit()).
            return os.WEXITSTATUS(status)
        # The child exited, but we don't understand its status.
        # This shouldn't happen, but if it does, let's just
        # return that status; perhaps that helps debug it.
        return status
679
680
class SafeChildWatcher(BaseChildWatcher):
    """'Safe' child watcher implementation.

    This implementation avoids disrupting other code spawning processes by
    polling explicitly each process in the SIGCHLD handler instead of calling
    os.waitpid(-1).

    This is a safe solution but it has a significant overhead when handling a
    big number of children (O(n) each time SIGCHLD is raised)
    """

    def __init__(self):
        super().__init__()
        # Maps a pid to the (callback, args) pair registered for it.
        self._callbacks = {}

    def close(self):
        """Forget all registered handlers and detach from the loop."""
        self._callbacks.clear()
        super().close()

    def __enter__(self):
        return self

    def __exit__(self, a, b, c):
        pass

    def add_child_handler(self, pid, callback, *args):
        """Register callback(pid, returncode, *args) for process 'pid'."""
        self._callbacks[pid] = callback, args

        # Prevent a race condition in case the child is already terminated.
        self._do_waitpid(pid)

    def remove_child_handler(self, pid):
        """Remove the handler for 'pid'; return True if there was one."""
        try:
            del self._callbacks[pid]
            return True
        except KeyError:
            return False

    def _do_waitpid_all(self):
        # Iterate over a copy: _do_waitpid() removes entries.
        for pid in list(self._callbacks):
            self._do_waitpid(pid)

    def _do_waitpid(self, expected_pid):
        """Poll 'expected_pid' and fire its callback if it has terminated."""
        assert expected_pid > 0

        try:
            pid, status = os.waitpid(expected_pid, os.WNOHANG)
        except ChildProcessError:
            # The child process is already reaped
            # (may happen if waitpid() is called elsewhere).
            pid = expected_pid
            returncode = 255
            logger.warning(
                "Unknown child process pid %d, will report returncode 255",
                pid)
        else:
            if pid == 0:
                # The child process is still alive.
                return

            returncode = self._compute_returncode(status)
            # add_child_handler() can get here before attach_loop() has
            # set a loop.  The child is already reaped at this point, so
            # failing on a None loop would lose its callback for good.
            if self._loop is not None and self._loop.get_debug():
                logger.debug('process %s exited with returncode %s',
                             expected_pid, returncode)

        try:
            callback, args = self._callbacks.pop(pid)
        except KeyError:  # pragma: no cover
            # May happen if .remove_child_handler() is called
            # after os.waitpid() returns.
            pass
        else:
            callback(pid, returncode, *args)
755
756
class FastChildWatcher(BaseChildWatcher):
    """'Fast' child watcher implementation.

    This implementation reaps every terminated process by calling
    os.waitpid(-1) directly, possibly breaking other code spawning processes
    and waiting for their termination.

    There is no noticeable overhead when handling a big number of children
    (O(1) each time a child terminates).

    Internal state:

    * ``_callbacks`` maps a pid to the ``(callback, args)`` pair registered
      through add_child_handler().
    * ``_zombies`` maps a pid to the return code of a child that was reaped
      while no handler was registered for it yet.
    * ``_forks`` counts how many ``with watcher:`` blocks are currently
      active.
    * ``_lock`` is held around the updates of the three attributes above
      made by __enter__(), __exit__(), add_child_handler() and
      _do_waitpid_all().
    """
    def __init__(self):
        super().__init__()
        self._callbacks = {}
        self._lock = threading.Lock()
        self._zombies = {}
        self._forks = 0

    def close(self):
        """Forget all handlers and pending zombies, then close the base."""
        self._callbacks.clear()
        self._zombies.clear()
        super().close()

    def __enter__(self):
        """Enter a fork section: bump the counter and return the watcher."""
        with self._lock:
            self._forks += 1

            return self

    def __exit__(self, a, b, c):
        """Leave a fork section.

        When the outermost section is left and some reaped children were
        never claimed by add_child_handler(), they are dropped and reported
        with a warning.
        """
        with self._lock:
            self._forks -= 1

            if self._forks or not self._zombies:
                return

            # Snapshot the mapping for the log message before wiping it.
            collateral_victims = str(self._zombies)
            self._zombies.clear()

        logger.warning(
            "Caught subprocesses termination from unknown pids: %s",
            collateral_victims)

    def add_child_handler(self, pid, callback, *args):
        """Register callback(pid, returncode, *args) for child *pid*.

        Must be called inside a ``with watcher:`` block.  If the child was
        already reaped (it is recorded in ``_zombies``), the callback is
        invoked immediately instead of being stored.
        """
        assert self._forks, "Must use the context manager"
        with self._lock:
            try:
                returncode = self._zombies.pop(pid)
            except KeyError:
                # The child is running.
                self._callbacks[pid] = callback, args
                return

        # The child is dead already. We can fire the callback.
        # This is done after the lock has been released.
        callback(pid, returncode, *args)

    def remove_child_handler(self, pid):
        """Unregister the handler for *pid*.

        Return True if a handler was removed, False if none was registered.
        """
        try:
            del self._callbacks[pid]
            return True
        except KeyError:
            return False

    def _do_waitpid_all(self):
        """Reap every terminated child and dispatch its callback."""
        # Because of signal coalescing, we must keep calling waitpid() as
        # long as we're able to reap a child.
        while True:
            try:
                pid, status = os.waitpid(-1, os.WNOHANG)
            except ChildProcessError:
                # No more child processes exist.
                return
            else:
                if pid == 0:
                    # A child process is still alive.
                    return

                returncode = self._compute_returncode(status)

            with self._lock:
                try:
                    callback, args = self._callbacks.pop(pid)
                except KeyError:
                    # unknown child
                    if self._forks:
                        # It may not be registered yet.
                        self._zombies[pid] = returncode
                        if self._loop.get_debug():
                            logger.debug('unknown process %s exited '
                                         'with returncode %s',
                                         pid, returncode)
                        continue
                    # No fork section is active: nobody can claim this pid
                    # later, so it is reported below.
                    callback = None
                else:
                    if self._loop.get_debug():
                        logger.debug('process %s exited with returncode %s',
                                     pid, returncode)

            # The callback (or the warning) runs without holding the lock.
            if callback is None:
                logger.warning(
                    "Caught subprocess termination from unknown pid: "
                    "%d -> %d", pid, returncode)
            else:
                callback(pid, returncode, *args)
860
861
class _UnixDefaultEventLoopPolicy(events.BaseDefaultEventLoopPolicy):
    """UNIX event loop policy with a watcher for child processes.

    The watcher is created lazily by get_child_watcher() and can be
    replaced with set_child_watcher().
    """
    _loop_factory = _UnixSelectorEventLoop

    def __init__(self):
        super().__init__()
        # Child watcher; None until first requested or explicitly set.
        self._watcher = None

    def _init_watcher(self):
        """Create the default SafeChildWatcher if none is set yet."""
        with events._lock:
            # Checked again under the lock: get_child_watcher() tests
            # self._watcher without holding it.
            if self._watcher is None:  # pragma: no branch
                self._watcher = SafeChildWatcher()
                # Only the main thread attaches the watcher to its loop.
                if isinstance(threading.current_thread(),
                              threading._MainThread):
                    self._watcher.attach_loop(self._local._loop)

    def set_event_loop(self, loop):
        """Set the event loop.

        As a side effect, if a child watcher was set before, then calling
        .set_event_loop() from the main thread will call .attach_loop(loop) on
        the child watcher.
        """

        super().set_event_loop(loop)

        if (self._watcher is not None and
                isinstance(threading.current_thread(),
                           threading._MainThread)):
            self._watcher.attach_loop(loop)

    def get_child_watcher(self):
        """Get the watcher for child processes.

        If not yet set, a SafeChildWatcher object is automatically created.
        """
        if self._watcher is None:
            self._init_watcher()

        return self._watcher

    def set_child_watcher(self, watcher):
        """Set the watcher for child processes.

        *watcher* must be None or an AbstractChildWatcher instance.  A
        previously installed watcher is closed before being replaced.
        """

        assert watcher is None or isinstance(watcher, AbstractChildWatcher)

        if self._watcher is not None:
            self._watcher.close()

        self._watcher = watcher
911
# Public aliases for the Unix implementations defined in this module.
SelectorEventLoop = _UnixSelectorEventLoop
DefaultEventLoopPolicy = _UnixDefaultEventLoopPolicy