blob: c88b818de62a6f4fda0e9c5af8bc11beff2fcbd9 [file] [log] [blame]
Jingwen Chen475b3cc2021-01-05 21:45:16 -05001"""Selector event loop for Unix with signal handling."""
2
3import errno
4import io
5import itertools
6import os
7import selectors
8import signal
9import socket
10import stat
11import subprocess
12import sys
13import threading
14import warnings
15
16from . import base_events
17from . import base_subprocess
18from . import constants
19from . import coroutines
20from . import events
21from . import exceptions
22from . import futures
23from . import selector_events
24from . import tasks
25from . import transports
26from .log import logger
27
28
# Names this module declares as its public API (used by ``import *``).
__all__ = (
    'SelectorEventLoop',
    'AbstractChildWatcher', 'SafeChildWatcher',
    'FastChildWatcher', 'PidfdChildWatcher',
    'MultiLoopChildWatcher', 'ThreadedChildWatcher',
    'DefaultEventLoopPolicy',
)


# Refuse to import on Windows: the rest of the module is built around
# Unix signal handling.
if sys.platform == 'win32':  # pragma: no cover
    raise ImportError('Signals are not really supported on Windows')
40
41
42def _sighandler_noop(signum, frame):
43 """Dummy signal handler."""
44 pass
45
46
def waitstatus_to_exitcode(status):
    """Convert a wait status into an exit code, tolerating odd statuses.

    Delegates to os.waitstatus_to_exitcode().  If that call rejects the
    status with ValueError, the raw status is returned unchanged.
    """
    try:
        exitcode = os.waitstatus_to_exitcode(status)
    except ValueError:
        # The child exited, but its status could not be decoded.  That is
        # not expected to happen; hand the raw value back so it can at
        # least be inspected while debugging.
        return status
    return exitcode
55
56
class _UnixSelectorEventLoop(selector_events.BaseSelectorEventLoop):
    """Unix event loop.

    Adds signal handling and UNIX Domain Socket support to SelectorEventLoop.
    """

    def __init__(self, selector=None):
        """Initialise the loop with an optional selector."""
        super().__init__(selector)
        # Maps a signal number to the events.Handle wrapping its callback.
        self._signal_handlers = {}

    def close(self):
        """Close the loop and uninstall every registered signal handler.

        During interpreter finalization the handlers are not uninstalled;
        a ResourceWarning is emitted instead and the registry is cleared.
        """
        super().close()
        if not sys.is_finalizing():
            # Iterate over a copy: remove_signal_handler() mutates the dict.
            for sig in list(self._signal_handlers):
                self.remove_signal_handler(sig)
        else:
            if self._signal_handlers:
                warnings.warn(f"Closing the loop {self!r} "
                              f"on interpreter shutdown "
                              f"stage, skipping signal handlers removal",
                              ResourceWarning,
                              source=self)
                self._signal_handlers.clear()

    def _process_self_data(self, data):
        """Treat every non-zero byte of *data* as a signal number."""
        for signum in data:
            if not signum:
                # ignore null bytes written by _write_to_self()
                continue
            self._handle_signal(signum)

    def add_signal_handler(self, sig, callback, *args):
        """Add a handler for a signal.  UNIX only.

        Raise TypeError if callback is a coroutine (function) or sig is
        not an int.
        Raise ValueError if the signal number is invalid or uncatchable.
        Raise RuntimeError if there is a problem setting up the handler.
        """
        if (coroutines.iscoroutine(callback) or
                coroutines.iscoroutinefunction(callback)):
            raise TypeError("coroutines cannot be used "
                            "with add_signal_handler()")
        self._check_signal(sig)
        self._check_closed()
        try:
            # set_wakeup_fd() raises ValueError if this is not the
            # main thread.  By calling it early we ensure that an
            # event loop running in another thread cannot add a signal
            # handler.
            signal.set_wakeup_fd(self._csock.fileno())
        except (ValueError, OSError) as exc:
            raise RuntimeError(str(exc))

        handle = events.Handle(callback, args, self, None)
        self._signal_handlers[sig] = handle

        try:
            # Register a dummy signal handler to ask Python to write the signal
            # number in the wakeup file descriptor. _process_self_data() will
            # read signal numbers from this file descriptor to handle signals.
            signal.signal(sig, _sighandler_noop)

            # Set SA_RESTART to limit EINTR occurrences.
            signal.siginterrupt(sig, False)
        except OSError as exc:
            # Roll back the registration made above.
            del self._signal_handlers[sig]
            if not self._signal_handlers:
                try:
                    signal.set_wakeup_fd(-1)
                except (ValueError, OSError) as nexc:
                    logger.info('set_wakeup_fd(-1) failed: %s', nexc)

            if exc.errno == errno.EINVAL:
                raise RuntimeError(f'sig {sig} cannot be caught')
            else:
                raise

    def _handle_signal(self, sig):
        """Internal helper that is the actual signal handler."""
        handle = self._signal_handlers.get(sig)
        if handle is None:
            return  # Assume it's some race condition.
        if handle._cancelled:
            self.remove_signal_handler(sig)  # Remove it properly.
        else:
            self._add_callback_signalsafe(handle)

    def remove_signal_handler(self, sig):
        """Remove a handler for a signal.  UNIX only.

        Return True if a signal handler was removed, False if not.
        """
        self._check_signal(sig)
        try:
            del self._signal_handlers[sig]
        except KeyError:
            return False

        # SIGINT goes back to Python's default handler, everything else to
        # the OS default disposition.
        if sig == signal.SIGINT:
            handler = signal.default_int_handler
        else:
            handler = signal.SIG_DFL

        try:
            signal.signal(sig, handler)
        except OSError as exc:
            if exc.errno == errno.EINVAL:
                raise RuntimeError(f'sig {sig} cannot be caught')
            else:
                raise

        if not self._signal_handlers:
            # Last handler gone: stop routing signals to the wakeup fd.
            try:
                signal.set_wakeup_fd(-1)
            except (ValueError, OSError) as exc:
                logger.info('set_wakeup_fd(-1) failed: %s', exc)

        return True

    def _check_signal(self, sig):
        """Internal helper to validate a signal.

        Raise TypeError if sig is not an int.
        Raise ValueError if sig is not one of signal.valid_signals().
        """
        if not isinstance(sig, int):
            raise TypeError(f'sig must be an int, not {sig!r}')

        if sig not in signal.valid_signals():
            raise ValueError(f'invalid signal number {sig}')

    def _make_read_pipe_transport(self, pipe, protocol, waiter=None,
                                  extra=None):
        """Return a _UnixReadPipeTransport bound to this loop."""
        return _UnixReadPipeTransport(self, pipe, protocol, waiter, extra)

    def _make_write_pipe_transport(self, pipe, protocol, waiter=None,
                                   extra=None):
        """Return a _UnixWritePipeTransport bound to this loop."""
        return _UnixWritePipeTransport(self, pipe, protocol, waiter, extra)

    async def _make_subprocess_transport(self, protocol, args, shell,
                                         stdin, stdout, stderr, bufsize,
                                         extra=None, **kwargs):
        """Create a subprocess transport and register it with the watcher.

        Raise RuntimeError before spawning anything if the current child
        watcher is not active.  If waiting for the transport fails, the
        transport is closed and awaited before the error is re-raised.
        """
        with events.get_child_watcher() as watcher:
            if not watcher.is_active():
                # Check early.
                # Raising exception before process creation
                # prevents subprocess execution if the watcher
                # is not ready to handle it.
                raise RuntimeError("asyncio.get_child_watcher() is not activated, "
                                   "subprocess support is not installed.")
            waiter = self.create_future()
            transp = _UnixSubprocessTransport(self, protocol, args, shell,
                                              stdin, stdout, stderr, bufsize,
                                              waiter=waiter, extra=extra,
                                              **kwargs)

            watcher.add_child_handler(transp.get_pid(),
                                      self._child_watcher_callback, transp)
            try:
                await waiter
            except (SystemExit, KeyboardInterrupt):
                raise
            except BaseException:
                transp.close()
                await transp._wait()
                raise

        return transp

    def _child_watcher_callback(self, pid, returncode, transp):
        """Forward a child's exit to its transport via call_soon_threadsafe()."""
        self.call_soon_threadsafe(transp._process_exited, returncode)

    async def create_unix_connection(
            self, protocol_factory, path=None, *,
            ssl=None, sock=None,
            server_hostname=None,
            ssl_handshake_timeout=None):
        """Connect to a UNIX domain stream socket.

        Exactly one of *path* and *sock* must be given.  Return a
        (transport, protocol) pair.  Raise ValueError for inconsistent
        arguments or for a *sock* that is not an AF_UNIX stream socket.
        """
        assert server_hostname is None or isinstance(server_hostname, str)
        if ssl:
            if server_hostname is None:
                raise ValueError(
                    'you have to pass server_hostname when using ssl')
        else:
            if server_hostname is not None:
                raise ValueError('server_hostname is only meaningful with ssl')
            if ssl_handshake_timeout is not None:
                raise ValueError(
                    'ssl_handshake_timeout is only meaningful with ssl')

        if path is not None:
            if sock is not None:
                raise ValueError(
                    'path and sock can not be specified at the same time')

            path = os.fspath(path)
            sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM, 0)
            try:
                sock.setblocking(False)
                await self.sock_connect(sock, path)
            except:
                # Don't leak the socket we created, whatever went wrong
                # (including cancellation); the error is re-raised.
                sock.close()
                raise

        else:
            if sock is None:
                raise ValueError('no path and sock were specified')
            if (sock.family != socket.AF_UNIX or
                    sock.type != socket.SOCK_STREAM):
                raise ValueError(
                    f'A UNIX Domain Stream Socket was expected, got {sock!r}')
            sock.setblocking(False)

        transport, protocol = await self._create_connection_transport(
            sock, protocol_factory, ssl, server_hostname,
            ssl_handshake_timeout=ssl_handshake_timeout)
        return transport, protocol

    async def create_unix_server(
            self, protocol_factory, path=None, *,
            sock=None, backlog=100, ssl=None,
            ssl_handshake_timeout=None,
            start_serving=True):
        """Create a server listening on a UNIX domain stream socket.

        Exactly one of *path* and *sock* must be given.  When *path* names
        an existing (non-abstract) socket file, that stale file is removed
        before binding.  Return the base_events.Server instance.
        """
        if isinstance(ssl, bool):
            raise TypeError('ssl argument must be an SSLContext or None')

        if ssl_handshake_timeout is not None and not ssl:
            raise ValueError(
                'ssl_handshake_timeout is only meaningful with ssl')

        if path is not None:
            if sock is not None:
                raise ValueError(
                    'path and sock can not be specified at the same time')

            path = os.fspath(path)
            sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)

            # Check for abstract socket. `str` and `bytes` paths are supported.
            if path[0] not in (0, '\x00'):
                try:
                    if stat.S_ISSOCK(os.stat(path).st_mode):
                        os.remove(path)
                except FileNotFoundError:
                    pass
                except OSError as err:
                    # Directory may have permissions only to create socket.
                    logger.error('Unable to check or remove stale UNIX socket '
                                 '%r: %r', path, err)

            try:
                sock.bind(path)
            except OSError as exc:
                sock.close()
                if exc.errno == errno.EADDRINUSE:
                    # Let's improve the error message by adding
                    # with what exact address it occurs.
                    msg = f'Address {path!r} is already in use'
                    raise OSError(errno.EADDRINUSE, msg) from None
                else:
                    raise
            except:
                sock.close()
                raise
        else:
            if sock is None:
                raise ValueError(
                    'path was not specified, and no sock specified')

            if (sock.family != socket.AF_UNIX or
                    sock.type != socket.SOCK_STREAM):
                raise ValueError(
                    f'A UNIX Domain Stream Socket was expected, got {sock!r}')

        sock.setblocking(False)
        server = base_events.Server(self, [sock], protocol_factory,
                                    ssl, backlog, ssl_handshake_timeout)
        if start_serving:
            server._start_serving()
            # Skip one loop iteration so that all 'loop.add_reader'
            # go through.
            await tasks.sleep(0)

        return server

    async def _sock_sendfile_native(self, sock, file, offset, count):
        """Send *file* over *sock* with os.sendfile().

        Return the number of bytes sent (0 for an empty file).  Raise
        exceptions.SendfileNotAvailableError when os.sendfile() is missing
        or *file* has no usable file descriptor.
        """
        try:
            os.sendfile
        except AttributeError:
            raise exceptions.SendfileNotAvailableError(
                "os.sendfile() is not available")
        try:
            fileno = file.fileno()
        except (AttributeError, io.UnsupportedOperation):
            raise exceptions.SendfileNotAvailableError("not a regular file")
        try:
            fsize = os.fstat(fileno).st_size
        except OSError:
            raise exceptions.SendfileNotAvailableError("not a regular file")
        blocksize = count if count else fsize
        if not blocksize:
            return 0  # empty file

        fut = self.create_future()
        self._sock_sendfile_native_impl(fut, None, sock, fileno,
                                        offset, count, blocksize, 0)
        return await fut

    def _sock_sendfile_native_impl(self, fut, registered_fd, sock, fileno,
                                   offset, count, blocksize, total_sent):
        """Perform one os.sendfile() step and reschedule itself if needed.

        *registered_fd* is None on the first call and the socket's fd on
        calls made through add_writer().  The outcome is reported through
        *fut*; the file position is restored via
        _sock_sendfile_update_filepos() before the future is completed.
        """
        fd = sock.fileno()
        if registered_fd is not None:
            # Remove the callback early.  It should be rare that the
            # selector says the fd is ready but the call still returns
            # EAGAIN, and I am willing to take a hit in that case in
            # order to simplify the common case.
            self.remove_writer(registered_fd)
        if fut.cancelled():
            self._sock_sendfile_update_filepos(fileno, offset, total_sent)
            return
        if count:
            blocksize = count - total_sent
            if blocksize <= 0:
                self._sock_sendfile_update_filepos(fileno, offset, total_sent)
                fut.set_result(total_sent)
                return

        try:
            sent = os.sendfile(fd, fileno, offset, blocksize)
        except (BlockingIOError, InterruptedError):
            if registered_fd is None:
                self._sock_add_cancellation_callback(fut, sock)
            self.add_writer(fd, self._sock_sendfile_native_impl, fut,
                            fd, sock, fileno,
                            offset, count, blocksize, total_sent)
        except OSError as exc:
            if (registered_fd is not None and
                    exc.errno == errno.ENOTCONN and
                    type(exc) is not ConnectionError):
                # If we have an ENOTCONN and this isn't a first call to
                # sendfile(), i.e. the connection was closed in the middle
                # of the operation, normalize the error to ConnectionError
                # to make it consistent across all Posix systems.
                # OSError's two-argument form is (errno, strerror).
                new_exc = ConnectionError(
                    errno.ENOTCONN, "socket is not connected")
                new_exc.__cause__ = exc
                exc = new_exc
            if total_sent == 0:
                # We can get here for different reasons, the main
                # one being 'file' is not a regular mmap(2)-like
                # file, in which case we'll fall back on using
                # plain send().
                err = exceptions.SendfileNotAvailableError(
                    "os.sendfile call failed")
                self._sock_sendfile_update_filepos(fileno, offset, total_sent)
                fut.set_exception(err)
            else:
                self._sock_sendfile_update_filepos(fileno, offset, total_sent)
                fut.set_exception(exc)
        except (SystemExit, KeyboardInterrupt):
            raise
        except BaseException as exc:
            self._sock_sendfile_update_filepos(fileno, offset, total_sent)
            fut.set_exception(exc)
        else:
            if sent == 0:
                # EOF
                self._sock_sendfile_update_filepos(fileno, offset, total_sent)
                fut.set_result(total_sent)
            else:
                offset += sent
                total_sent += sent
                if registered_fd is None:
                    self._sock_add_cancellation_callback(fut, sock)
                self.add_writer(fd, self._sock_sendfile_native_impl, fut,
                                fd, sock, fileno,
                                offset, count, blocksize, total_sent)

    def _sock_sendfile_update_filepos(self, fileno, offset, total_sent):
        """Seek *fileno* to *offset* if any bytes were sent."""
        if total_sent > 0:
            os.lseek(fileno, offset, os.SEEK_SET)

    def _sock_add_cancellation_callback(self, fut, sock):
        """Unregister the socket's writer when *fut* gets cancelled."""
        def cb(fut):
            if fut.cancelled():
                fd = sock.fileno()
                # fileno() is -1 once the socket has been closed.
                if fd != -1:
                    self.remove_writer(fd)
        fut.add_done_callback(cb)
444
445
class _UnixReadPipeTransport(transports.ReadTransport):
    """Read-only transport over a FIFO, socket or character device."""

    max_size = 256 * 1024  # max bytes we read in one event loop iteration

    def __init__(self, loop, pipe, protocol, waiter=None, extra=None):
        """Wrap *pipe*, make it non-blocking and schedule the start of reading.

        Raise ValueError when the descriptor is not a FIFO, a socket or a
        character device.
        """
        super().__init__(extra)
        self._extra['pipe'] = pipe
        self._loop = loop
        self._pipe = pipe
        self._fileno = pipe.fileno()
        self._protocol = protocol
        self._closing = False
        self._paused = False

        mode = os.fstat(self._fileno).st_mode
        accepted_kinds = (stat.S_ISFIFO, stat.S_ISSOCK, stat.S_ISCHR)
        if not any(is_kind(mode) for is_kind in accepted_kinds):
            # Reset the references so that __del__ sees a closed transport.
            self._pipe = None
            self._fileno = None
            self._protocol = None
            raise ValueError("Pipe transport is for pipes/sockets only.")

        os.set_blocking(self._fileno, False)

        schedule = self._loop.call_soon
        schedule(self._protocol.connection_made, self)
        # Reading is registered after connection_made() has been queued,
        # so it only starts once that callback has run.
        schedule(self._loop._add_reader, self._fileno, self._read_ready)
        if waiter is not None:
            # Likewise, the waiter is woken up after connection_made().
            schedule(futures._set_result_unless_cancelled, waiter, None)

    def __repr__(self):
        parts = [self.__class__.__name__]
        is_open = self._pipe is not None
        if not is_open:
            parts.append('closed')
        elif self._closing:
            parts.append('closing')
        parts.append(f'fd={self._fileno}')
        selector = getattr(self._loop, '_selector', None)
        if not is_open:
            parts.append('closed')
        elif selector is None:
            parts.append('open')
        else:
            polling = selector_events._test_selector_event(
                selector, self._fileno, selectors.EVENT_READ)
            parts.append('polling' if polling else 'idle')
        return '<{}>'.format(' '.join(parts))

    def _read_ready(self):
        """Read available data and pass it on; handle EOF and read errors."""
        try:
            chunk = os.read(self._fileno, self.max_size)
        except (BlockingIOError, InterruptedError):
            return
        except OSError as exc:
            self._fatal_error(exc, 'Fatal read error on pipe transport')
            return

        if chunk:
            self._protocol.data_received(chunk)
            return

        # Empty read: the peer closed its end.
        if self._loop.get_debug():
            logger.info("%r was closed by peer", self)
        self._closing = True
        self._loop._remove_reader(self._fileno)
        self._loop.call_soon(self._protocol.eof_received)
        self._loop.call_soon(self._call_connection_lost, None)

    def pause_reading(self):
        """Stop watching the descriptor; no-op if closing or already paused."""
        if not self._closing and not self._paused:
            self._paused = True
            self._loop._remove_reader(self._fileno)
            if self._loop.get_debug():
                logger.debug("%r pauses reading", self)

    def resume_reading(self):
        """Watch the descriptor again; no-op if closing or not paused."""
        if not self._closing and self._paused:
            self._paused = False
            self._loop._add_reader(self._fileno, self._read_ready)
            if self._loop.get_debug():
                logger.debug("%r resumes reading", self)

    def set_protocol(self, protocol):
        self._protocol = protocol

    def get_protocol(self):
        return self._protocol

    def is_closing(self):
        return self._closing

    def close(self):
        """Start closing the transport unless that already happened."""
        if self._closing:
            return
        self._close(None)

    def __del__(self, _warn=warnings.warn):
        if self._pipe is None:
            return
        _warn(f"unclosed transport {self!r}", ResourceWarning, source=self)
        self._pipe.close()

    def _fatal_error(self, exc, message='Fatal error on pipe transport'):
        """Report *exc* and close the transport.

        An OSError with errno EIO is only logged (in debug mode); anything
        else goes to the loop's exception handler.
        """
        # should be called by exception handler only
        is_eio = isinstance(exc, OSError) and exc.errno == errno.EIO
        if not is_eio:
            context = {
                'message': message,
                'exception': exc,
                'transport': self,
                'protocol': self._protocol,
            }
            self._loop.call_exception_handler(context)
        elif self._loop.get_debug():
            logger.debug("%r: %s", self, message, exc_info=True)
        self._close(exc)

    def _close(self, exc):
        """Mark as closing, stop reading and schedule connection_lost()."""
        self._closing = True
        self._loop._remove_reader(self._fileno)
        self._loop.call_soon(self._call_connection_lost, exc)

    def _call_connection_lost(self, exc):
        """Notify the protocol, then close the pipe and drop references."""
        try:
            self._protocol.connection_lost(exc)
        finally:
            self._pipe.close()
            self._pipe = None
            self._protocol = None
            self._loop = None
580
581
class _UnixWritePipeTransport(transports._FlowControlMixin,
                              transports.WriteTransport):
    """Write-only transport over a FIFO, socket or character device.

    Data that cannot be written immediately is kept in an internal
    bytearray and flushed from _write_ready() when the descriptor
    becomes writable.
    """

    def __init__(self, loop, pipe, protocol, waiter=None, extra=None):
        """Wrap *pipe*, make it non-blocking and schedule connection_made().

        Raise ValueError when the descriptor is not a character device,
        a FIFO or a socket.
        """
        super().__init__(extra, loop)
        self._extra['pipe'] = pipe
        self._pipe = pipe
        self._fileno = pipe.fileno()
        self._protocol = protocol
        self._buffer = bytearray()
        self._conn_lost = 0
        self._closing = False  # Set when close() or write_eof() called.

        mode = os.fstat(self._fileno).st_mode
        is_char = stat.S_ISCHR(mode)
        is_fifo = stat.S_ISFIFO(mode)
        is_socket = stat.S_ISSOCK(mode)
        if not (is_char or is_fifo or is_socket):
            # Reset the references so that __del__ sees a closed transport.
            self._pipe = None
            self._fileno = None
            self._protocol = None
            raise ValueError("Pipe transport is only for "
                             "pipes, sockets and character devices")

        os.set_blocking(self._fileno, False)
        self._loop.call_soon(self._protocol.connection_made, self)

        # On AIX, the reader trick (to be notified when the read end of the
        # socket is closed) only works for sockets. On other platforms it
        # works for pipes and sockets. (Exception: OS X 10.4?  Issue #19294.)
        if is_socket or (is_fifo and not sys.platform.startswith("aix")):
            # only start reading when connection_made() has been called
            self._loop.call_soon(self._loop._add_reader,
                                 self._fileno, self._read_ready)

        if waiter is not None:
            # only wake up the waiter when connection_made() has been called
            self._loop.call_soon(futures._set_result_unless_cancelled,
                                 waiter, None)

    def __repr__(self):
        info = [self.__class__.__name__]
        if self._pipe is None:
            info.append('closed')
        elif self._closing:
            info.append('closing')
        info.append(f'fd={self._fileno}')
        selector = getattr(self._loop, '_selector', None)
        if self._pipe is not None and selector is not None:
            polling = selector_events._test_selector_event(
                selector, self._fileno, selectors.EVENT_WRITE)
            if polling:
                info.append('polling')
            else:
                info.append('idle')

            bufsize = self.get_write_buffer_size()
            info.append(f'bufsize={bufsize}')
        elif self._pipe is not None:
            info.append('open')
        else:
            info.append('closed')
        return '<{}>'.format(' '.join(info))

    def get_write_buffer_size(self):
        """Return the number of bytes currently waiting in the buffer."""
        return len(self._buffer)

    def _read_ready(self):
        """Reader callback: treat readability as the peer closing the pipe."""
        # Pipe was closed by peer.
        if self._loop.get_debug():
            logger.info("%r was closed by peer", self)
        if self._buffer:
            # Unsent data is lost, so report a broken pipe.
            self._close(BrokenPipeError())
        else:
            self._close()

    def write(self, data):
        """Write *data*, buffering whatever cannot be sent right away.

        Empty data is ignored.  Once the connection is lost or the
        transport is closing, data is dropped and only counted.
        """
        assert isinstance(data, (bytes, bytearray, memoryview)), repr(data)
        if isinstance(data, bytearray):
            data = memoryview(data)
        if not data:
            return

        if self._conn_lost or self._closing:
            if self._conn_lost >= constants.LOG_THRESHOLD_FOR_CONNLOST_WRITES:
                logger.warning('pipe closed by peer or '
                               'os.write(pipe, data) raised exception.')
            self._conn_lost += 1
            return

        if not self._buffer:
            # Attempt to send it right away first.
            try:
                n = os.write(self._fileno, data)
            except (BlockingIOError, InterruptedError):
                n = 0
            except (SystemExit, KeyboardInterrupt):
                raise
            except BaseException as exc:
                self._conn_lost += 1
                self._fatal_error(exc, 'Fatal write error on pipe transport')
                return
            if n == len(data):
                return
            elif n > 0:
                # Keep only the unsent tail, without copying it.
                data = memoryview(data)[n:]
            self._loop._add_writer(self._fileno, self._write_ready)

        self._buffer += data
        self._maybe_pause_protocol()

    def _write_ready(self):
        """Writer callback: flush as much of the buffer as possible."""
        assert self._buffer, 'Data should not be empty'

        try:
            n = os.write(self._fileno, self._buffer)
        except (BlockingIOError, InterruptedError):
            pass
        except (SystemExit, KeyboardInterrupt):
            raise
        except BaseException as exc:
            self._buffer.clear()
            self._conn_lost += 1
            # Remove the writer here; _fatal_error() won't do it
            # because _buffer is empty.
            self._loop._remove_writer(self._fileno)
            self._fatal_error(exc, 'Fatal write error on pipe transport')
        else:
            if n == len(self._buffer):
                self._buffer.clear()
                self._loop._remove_writer(self._fileno)
                self._maybe_resume_protocol()  # May append to buffer.
                if self._closing:
                    self._loop._remove_reader(self._fileno)
                    self._call_connection_lost(None)
                return
            elif n > 0:
                del self._buffer[:n]

    def can_write_eof(self):
        """Return True: this transport implements write_eof()."""
        return True

    def write_eof(self):
        """Mark the transport as closing; finish now if nothing is buffered.

        With data still buffered, the connection is torn down by
        _write_ready() once the buffer has been flushed.
        """
        if self._closing:
            return
        assert self._pipe
        self._closing = True
        if not self._buffer:
            self._loop._remove_reader(self._fileno)
            self._loop.call_soon(self._call_connection_lost, None)

    def set_protocol(self, protocol):
        self._protocol = protocol

    def get_protocol(self):
        return self._protocol

    def is_closing(self):
        return self._closing

    def close(self):
        """Close the transport; buffered data is still flushed first."""
        if self._pipe is not None and not self._closing:
            # write_eof is all what we needed to close the write pipe
            self.write_eof()

    def __del__(self, _warn=warnings.warn):
        if self._pipe is not None:
            _warn(f"unclosed transport {self!r}", ResourceWarning, source=self)
            self._pipe.close()

    def abort(self):
        """Close immediately, discarding any buffered data."""
        self._close(None)

    def _fatal_error(self, exc, message='Fatal error on pipe transport'):
        """Report *exc* and close the transport.

        OSError instances are only logged (in debug mode); anything else
        goes to the loop's exception handler.
        """
        # should be called by exception handler only
        if isinstance(exc, OSError):
            if self._loop.get_debug():
                logger.debug("%r: %s", self, message, exc_info=True)
        else:
            self._loop.call_exception_handler({
                'message': message,
                'exception': exc,
                'transport': self,
                'protocol': self._protocol,
            })
        self._close(exc)

    def _close(self, exc=None):
        """Drop buffered data, unregister callbacks, schedule connection_lost()."""
        self._closing = True
        if self._buffer:
            # A writer is only registered while the buffer is non-empty.
            self._loop._remove_writer(self._fileno)
        self._buffer.clear()
        self._loop._remove_reader(self._fileno)
        self._loop.call_soon(self._call_connection_lost, exc)

    def _call_connection_lost(self, exc):
        """Notify the protocol, then close the pipe and drop references."""
        try:
            self._protocol.connection_lost(exc)
        finally:
            self._pipe.close()
            self._pipe = None
            self._protocol = None
            self._loop = None
785
786
class _UnixSubprocessTransport(base_subprocess.BaseSubprocessTransport):
    """Subprocess transport that spawns the child with subprocess.Popen."""

    def _start(self, args, shell, stdin, stdout, stderr, bufsize, **kwargs):
        """Launch the child process and store it in self._proc.

        When stdin is subprocess.PIPE, a socket pair replaces the pipe:
        the child receives one end and the other end becomes
        self._proc.stdin, opened as a binary file object.
        """
        parent_sock = None
        if stdin == subprocess.PIPE:
            # Use a socket pair for stdin, since not all platforms
            # support selecting read events on the write end of a
            # socket (which we use in order to detect closing of the
            # other end).  Notably this is needed on AIX, and works
            # just fine on other platforms.
            stdin, parent_sock = socket.socketpair()
        try:
            self._proc = subprocess.Popen(
                args, shell=shell, stdin=stdin, stdout=stdout, stderr=stderr,
                universal_newlines=False, bufsize=bufsize, **kwargs)
            if parent_sock is not None:
                # The child holds its own copy of this end now.
                stdin.close()
                self._proc.stdin = open(
                    parent_sock.detach(), 'wb', buffering=bufsize)
                # Hand-over succeeded; nothing left for the cleanup below.
                parent_sock = None
        finally:
            if parent_sock is not None:
                stdin.close()
                parent_sock.close()
810
811
class AbstractChildWatcher:
    """Abstract base class for monitoring child processes.

    Objects derived from this class monitor a collection of subprocesses and
    report their termination or interruption by a signal.

    New callbacks are registered with .add_child_handler(). Starting a new
    process must be done within a 'with' block to allow the watcher to suspend
    its activity until the new process is fully registered (this is needed to
    prevent a race condition in some implementations).

    Example:
        with watcher:
            proc = subprocess.Popen("sleep 1")
            watcher.add_child_handler(proc.pid, callback)

    Notes:
        Implementations of this class must be thread-safe.

        Since child watcher objects may catch the SIGCHLD signal and call
        waitpid(-1), there should be only one active object per process.
    """

    def add_child_handler(self, pid, callback, *args):
        """Register a new child handler.

        Arrange for callback(pid, returncode, *args) to be called when
        process 'pid' terminates. Specifying another callback for the same
        process replaces the previous handler.

        Note: callback() must be thread-safe.
        """
        raise NotImplementedError()

    def remove_child_handler(self, pid):
        """Remove the handler for process 'pid'.

        Return True if the handler was successfully removed,
        False if there was nothing to remove.
        """

        raise NotImplementedError()

    def attach_loop(self, loop):
        """Attach the watcher to an event loop.

        If the watcher was previously attached to an event loop, then it is
        first detached before attaching to the new loop.

        Note: loop may be None.
        """
        raise NotImplementedError()

    def close(self):
        """Close the watcher.

        This must be called to make sure that any underlying resource is freed.
        """
        raise NotImplementedError()

    def is_active(self):
        """Return ``True`` if the watcher is active and is used by the event loop.

        Return True if the watcher is installed and ready to handle process exit
        notifications.

        """
        raise NotImplementedError()

    def __enter__(self):
        """Enter the watcher's context and allow starting new processes.

        This function must return self.
        """
        raise NotImplementedError()

    def __exit__(self, a, b, c):
        """Exit the watcher's context."""
        raise NotImplementedError()
889
890
class PidfdChildWatcher(AbstractChildWatcher):
    """Child watcher implementation using Linux's pid file descriptors.

    This child watcher polls process file descriptors (pidfds) to await child
    process termination. In some respects, PidfdChildWatcher is a "Goldilocks"
    child watcher implementation. It doesn't require signals or threads, doesn't
    interfere with any processes launched outside the event loop, and scales
    linearly with the number of subprocesses launched by the event loop. The
    main disadvantage is that pidfds are specific to Linux, and only work on
    recent (5.3+) kernels.
    """

    def __init__(self):
        # Maps pid -> (pidfd, callback, args).
        self._callbacks = {}
        self._loop = None

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, exc_traceback):
        return None

    def is_active(self):
        """Return True when a loop is attached and currently running."""
        loop = self._loop
        return loop is not None and loop.is_running()

    def close(self):
        """Detach from the current loop, releasing every pidfd."""
        self.attach_loop(None)

    def attach_loop(self, loop):
        """Switch to *loop* (may be None), dropping all registered handlers."""
        detaching = self._loop is not None and loop is None
        if detaching and self._callbacks:
            warnings.warn(
                'A loop is being detached '
                'from a child watcher with pending handlers',
                RuntimeWarning)
        for entry in self._callbacks.values():
            pidfd = entry[0]
            self._loop._remove_reader(pidfd)
            os.close(pidfd)
        self._callbacks.clear()
        self._loop = loop

    def add_child_handler(self, pid, callback, *args):
        """Register callback(pid, returncode, *args) for process *pid*.

        A pidfd is opened and watched the first time a pid is seen; a
        repeated registration reuses it and only swaps the callback.
        """
        known = self._callbacks.get(pid)
        if known is None:
            pidfd = os.pidfd_open(pid)
            self._loop._add_reader(pidfd, self._do_wait, pid)
        else:
            pidfd = known[0]
        self._callbacks[pid] = pidfd, callback, args

    def _do_wait(self, pid):
        """Reader callback: reap *pid* and invoke its registered handler."""
        pidfd, callback, args = self._callbacks.pop(pid)
        self._loop._remove_reader(pidfd)
        try:
            status = os.waitpid(pid, 0)[1]
        except ChildProcessError:
            # The child process is already reaped
            # (may happen if waitpid() is called elsewhere).
            returncode = 255
            logger.warning(
                "child process pid %d exit status already read: "
                " will report returncode 255",
                pid)
        else:
            returncode = waitstatus_to_exitcode(status)

        os.close(pidfd)
        callback(pid, returncode, *args)

    def remove_child_handler(self, pid):
        """Forget the handler for *pid*; return whether one was registered."""
        if pid not in self._callbacks:
            return False
        pidfd = self._callbacks.pop(pid)[0]
        self._loop._remove_reader(pidfd)
        os.close(pidfd)
        return True
967
968
class BaseChildWatcher(AbstractChildWatcher):
    """Shared SIGCHLD plumbing for child watchers.

    Subclasses supply _do_waitpid() and _do_waitpid_all().
    """

    def __init__(self):
        # Registered child handlers; the value layout is up to the subclass.
        self._callbacks = {}
        self._loop = None

    def close(self):
        """Detach the watcher from its loop."""
        self.attach_loop(None)

    def is_active(self):
        """Return True when a loop is attached and currently running."""
        loop = self._loop
        return loop is not None and loop.is_running()

    def _do_waitpid(self, expected_pid):
        raise NotImplementedError()

    def _do_waitpid_all(self):
        raise NotImplementedError()

    def attach_loop(self, loop):
        """Move the SIGCHLD handler from the current loop to *loop*.

        *loop* may be None, which only detaches.  A RuntimeWarning is
        issued when detaching while handlers are still registered.
        """
        assert loop is None or isinstance(loop, events.AbstractEventLoop)

        previous = self._loop
        if previous is not None:
            if loop is None and self._callbacks:
                warnings.warn(
                    'A loop is being detached '
                    'from a child watcher with pending handlers',
                    RuntimeWarning)
            previous.remove_signal_handler(signal.SIGCHLD)

        self._loop = loop
        if loop is None:
            return

        loop.add_signal_handler(signal.SIGCHLD, self._sig_chld)

        # Prevent a race condition in case a child terminated
        # during the switch.
        self._do_waitpid_all()

    def _sig_chld(self):
        """SIGCHLD callback: poll the children, reporting unexpected errors."""
        try:
            self._do_waitpid_all()
        except (SystemExit, KeyboardInterrupt):
            raise
        except BaseException as exc:
            # self._loop should always be available here
            # as '_sig_chld' is added as a signal handler
            # in 'attach_loop'
            context = {
                'message': 'Unknown exception in SIGCHLD handler',
                'exception': exc,
            }
            self._loop.call_exception_handler(context)
1020
1021
class SafeChildWatcher(BaseChildWatcher):
    """Child watcher that only waits on the pids it was asked to watch.

    On every SIGCHLD each registered pid is polled individually with
    os.waitpid(pid, os.WNOHANG) instead of calling os.waitpid(-1), so
    processes spawned by unrelated code are not reaped by this watcher.

    The price is one poll per registered child for each SIGCHLD, i.e.
    O(n) work per signal.
    """

    def close(self):
        """Drop all registered handlers, then detach from the loop."""
        self._callbacks.clear()
        super().close()

    def __enter__(self):
        return self

    def __exit__(self, a, b, c):
        # Nothing to release when leaving the context.
        return None

    def add_child_handler(self, pid, callback, *args):
        """Register *callback* to be called as callback(pid, returncode, *args)."""
        self._callbacks[pid] = (callback, args)

        # The child may already have exited before registration; poll
        # once immediately so that case is not missed.
        self._do_waitpid(pid)

    def remove_child_handler(self, pid):
        """Unregister the handler for *pid*; return whether one existed."""
        try:
            del self._callbacks[pid]
        except KeyError:
            return False
        return True

    def _do_waitpid_all(self):
        """Poll every registered pid once."""
        # Iterate over a snapshot: _do_waitpid() removes entries.
        for watched_pid in tuple(self._callbacks):
            self._do_waitpid(watched_pid)

    def _do_waitpid(self, expected_pid):
        """Poll *expected_pid* without blocking and fire its handler if done."""
        assert expected_pid > 0

        try:
            pid, status = os.waitpid(expected_pid, os.WNOHANG)
        except ChildProcessError:
            # Already reaped, e.g. because waitpid() was called elsewhere.
            pid, returncode = expected_pid, 255
            logger.warning(
                "Unknown child process pid %d, will report returncode 255",
                pid)
        else:
            if pid == 0:
                # Still running; nothing to report yet.
                return

            returncode = waitstatus_to_exitcode(status)
            if self._loop.get_debug():
                logger.debug('process %s exited with returncode %s',
                             expected_pid, returncode)

        try:
            handler, handler_args = self._callbacks.pop(pid)
        except KeyError:  # pragma: no cover
            # Possible when remove_child_handler() ran after
            # os.waitpid() returned.
            if self._loop.get_debug():
                logger.warning("Child watcher got an unexpected pid: %r",
                               pid, exc_info=True)
            return

        handler(pid, returncode, *handler_args)
1094
1095
class FastChildWatcher(BaseChildWatcher):
    """Child watcher that reaps with a single os.waitpid(-1) loop.

    Every terminated child of the process is reaped, including children
    this watcher was never told about, which may break other code that
    spawns processes and waits for them itself.

    There is no noticeable overhead when handling a large number of
    children (O(1) each time a child terminates).
    """

    def __init__(self):
        super().__init__()
        self._lock = threading.Lock()
        # pid -> returncode for children reaped before being registered.
        self._zombies = {}
        # Nesting depth of active ``with watcher:`` blocks.
        self._forks = 0

    def close(self):
        """Drop all handlers and remembered exits, then detach."""
        self._callbacks.clear()
        self._zombies.clear()
        super().close()

    def __enter__(self):
        with self._lock:
            self._forks += 1
        return self

    def __exit__(self, a, b, c):
        unclaimed = None
        with self._lock:
            self._forks -= 1
            # Only the outermost exit reports, and only if something
            # was reaped that nobody registered a handler for.
            if not self._forks and self._zombies:
                unclaimed = str(self._zombies)
                self._zombies.clear()

        if unclaimed is not None:
            logger.warning(
                "Caught subprocesses termination from unknown pids: %s",
                unclaimed)

    def add_child_handler(self, pid, callback, *args):
        """Register *callback* for *pid*, firing it now if already reaped."""
        assert self._forks, "Must use the context manager"

        with self._lock:
            try:
                returncode = self._zombies.pop(pid)
            except KeyError:
                # Not reaped yet: remember the handler for later.
                self._callbacks[pid] = (callback, args)
                return

        # The exit was already collected; report it outside the lock.
        callback(pid, returncode, *args)

    def remove_child_handler(self, pid):
        """Unregister the handler for *pid*; return whether one existed."""
        try:
            del self._callbacks[pid]
        except KeyError:
            return False
        return True

    def _do_waitpid_all(self):
        """Reap every child that has terminated and dispatch handlers."""
        # Signals coalesce, so keep calling waitpid() for as long as it
        # keeps handing back a reaped child.
        while True:
            try:
                pid, status = os.waitpid(-1, os.WNOHANG)
            except ChildProcessError:
                # There are no child processes at all.
                return

            if pid == 0:
                # Remaining children are still running.
                return

            returncode = waitstatus_to_exitcode(status)

            with self._lock:
                try:
                    callback, args = self._callbacks.pop(pid)
                except KeyError:
                    # No handler is registered for this pid.
                    if self._forks:
                        # Inside a ``with`` block the handler may simply
                        # not be registered yet; keep the result around.
                        self._zombies[pid] = returncode
                        if self._loop.get_debug():
                            logger.debug('unknown process %s exited '
                                         'with returncode %s',
                                         pid, returncode)
                        continue
                    callback = None
                else:
                    if self._loop.get_debug():
                        logger.debug('process %s exited with returncode %s',
                                     pid, returncode)

            if callback is None:
                logger.warning(
                    "Caught subprocess termination from unknown pid: "
                    "%d -> %d", pid, returncode)
            else:
                callback(pid, returncode, *args)
1199
1200
class MultiLoopChildWatcher(AbstractChildWatcher):
    """A watcher that doesn't require a running loop in the main thread.

    The watcher installs its own SIGCHLD handler with signal.signal()
    the first time attach_loop() is called, which may conflict with
    other code that installs its own handler for this signal.

    Each registered pid is polled individually on every SIGCHLD, so the
    cost is *O(n)* per received signal.
    """

    # Implementation note:
    # The class stays compatible with the AbstractChildWatcher ABC.
    # attach_loop() therefore does not store the loop it is given, and
    # add_child_handler() takes no explicit loop argument but looks up
    # the current loop with get_running_loop().

    def __init__(self):
        self._callbacks = {}
        self._saved_sighandler = None

    def is_active(self):
        """Return True once the SIGCHLD handler has been installed."""
        return self._saved_sighandler is not None

    def close(self):
        """Drop all handlers and restore the previous SIGCHLD handler."""
        self._callbacks.clear()
        saved = self._saved_sighandler
        if saved is None:
            return

        current = signal.getsignal(signal.SIGCHLD)
        if current != self._sig_chld:
            # Somebody replaced our handler; leave theirs in place.
            logger.warning("SIGCHLD handler was changed by outside code")
        else:
            signal.signal(signal.SIGCHLD, saved)
        self._saved_sighandler = None

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        # Nothing to release when leaving the context.
        return None

    def add_child_handler(self, pid, callback, *args):
        """Register *callback* for *pid*, bound to the running loop."""
        running_loop = events.get_running_loop()
        self._callbacks[pid] = (running_loop, callback, args)

        # The child may already have exited before registration; poll
        # once immediately so that case is not missed.
        self._do_waitpid(pid)

    def remove_child_handler(self, pid):
        """Unregister the handler for *pid*; return whether one existed."""
        try:
            del self._callbacks[pid]
        except KeyError:
            return False
        return True

    def attach_loop(self, loop):
        """Install the SIGCHLD handler on first call; *loop* is not stored."""
        # Initialisation happens here rather than in __init__ because
        # attach_loop() is called from the unix policy only for the main
        # thread, and the main thread is required to subscribe to SIGCHLD.
        if self._saved_sighandler is None:
            self._saved_sighandler = signal.signal(signal.SIGCHLD,
                                                   self._sig_chld)
            if self._saved_sighandler is None:
                logger.warning("Previous SIGCHLD handler was set by non-Python code, "
                               "restore to default handler on watcher close.")
                self._saved_sighandler = signal.SIG_DFL

            # Set SA_RESTART to limit EINTR occurrences.
            signal.siginterrupt(signal.SIGCHLD, False)

    def _do_waitpid_all(self):
        """Poll every registered pid once."""
        # Iterate over a snapshot: _do_waitpid() removes entries.
        for watched_pid in tuple(self._callbacks):
            self._do_waitpid(watched_pid)

    def _do_waitpid(self, expected_pid):
        """Poll *expected_pid* without blocking and schedule its handler."""
        assert expected_pid > 0

        try:
            pid, status = os.waitpid(expected_pid, os.WNOHANG)
        except ChildProcessError:
            # Already reaped, e.g. because waitpid() was called elsewhere.
            pid, returncode = expected_pid, 255
            logger.warning(
                "Unknown child process pid %d, will report returncode 255",
                pid)
            debug_log = False
        else:
            if pid == 0:
                # Still running; nothing to report yet.
                return

            returncode = waitstatus_to_exitcode(status)
            debug_log = True

        try:
            loop, callback, args = self._callbacks.pop(pid)
        except KeyError:  # pragma: no cover
            # Possible when remove_child_handler() ran after
            # os.waitpid() returned.
            logger.warning("Child watcher got an unexpected pid: %r",
                           pid, exc_info=True)
            return

        if loop.is_closed():
            logger.warning("Loop %r that handles pid %r is closed", loop, pid)
            return

        if debug_log and loop.get_debug():
            logger.debug('process %s exited with returncode %s',
                         expected_pid, returncode)
        loop.call_soon_threadsafe(callback, pid, returncode, *args)

    def _sig_chld(self, signum, frame):
        """Signal handler for SIGCHLD: poll children, log any failure."""
        try:
            self._do_waitpid_all()
        except (SystemExit, KeyboardInterrupt):
            raise
        except BaseException:
            logger.warning('Unknown exception in SIGCHLD handler', exc_info=True)
1324
1325
class ThreadedChildWatcher(AbstractChildWatcher):
    """Child watcher that dedicates one thread to each watched process.

    Each thread blocks in os.waitpid() for its own pid, so no POSIX
    signal subscription is needed, at the cost of creating a thread per
    process.

    The watcher has O(1) complexity; its performance doesn't depend on
    the number of spawned processes.
    """

    def __init__(self):
        # Source of the numeric suffix used in waiter thread names.
        self._pid_counter = itertools.count(0)
        # pid -> waiter thread.
        self._threads = {}

    def is_active(self):
        return True

    def close(self):
        self._join_threads()

    def _join_threads(self):
        """Internal: join every live waiter thread that is not a daemon."""
        joinable = []
        for worker in list(self._threads.values()):
            if worker.is_alive() and not worker.daemon:
                joinable.append(worker)
        for worker in joinable:
            worker.join()

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        # Nothing to release when leaving the context.
        return None

    def __del__(self, _warn=warnings.warn):
        still_waiting = []
        for worker in list(self._threads.values()):
            if worker.is_alive():
                still_waiting.append(worker)
        if still_waiting:
            _warn(f"{self.__class__} has registered but not finished child processes",
                  ResourceWarning,
                  source=self)

    def add_child_handler(self, pid, callback, *args):
        """Start a daemon thread that waits for *pid* and reports its exit."""
        loop = events.get_running_loop()
        thread_name = f"waitpid-{next(self._pid_counter)}"
        waiter = threading.Thread(target=self._do_waitpid,
                                  name=thread_name,
                                  args=(loop, pid, callback, args),
                                  daemon=True)
        self._threads[pid] = waiter
        waiter.start()

    def remove_child_handler(self, pid):
        # asyncio never calls remove_child_handler() !!!
        # The method is a no-op; it exists only because the abstract
        # base class requires it.
        return True

    def attach_loop(self, loop):
        # Nothing is stored: the loop is looked up in add_child_handler().
        return None

    def _do_waitpid(self, loop, expected_pid, callback, args):
        """Thread body: block until *expected_pid* exits, then notify *loop*."""
        assert expected_pid > 0

        try:
            pid, status = os.waitpid(expected_pid, 0)
        except ChildProcessError:
            # Already reaped, e.g. because waitpid() was called elsewhere.
            pid, returncode = expected_pid, 255
            logger.warning(
                "Unknown child process pid %d, will report returncode 255",
                pid)
        else:
            returncode = waitstatus_to_exitcode(status)
            if loop.get_debug():
                logger.debug('process %s exited with returncode %s',
                             expected_pid, returncode)

        if not loop.is_closed():
            loop.call_soon_threadsafe(callback, pid, returncode, *args)
        else:
            logger.warning("Loop %r that handles pid %r is closed", loop, pid)

        # This waiter is finished; drop its bookkeeping entry.
        self._threads.pop(expected_pid)
1413
1414
class _UnixDefaultEventLoopPolicy(events.BaseDefaultEventLoopPolicy):
    """Event loop policy for UNIX that also owns a child-process watcher."""

    _loop_factory = _UnixSelectorEventLoop

    def __init__(self):
        super().__init__()
        # Created lazily by get_child_watcher() or set via set_child_watcher().
        self._watcher = None

    def _init_watcher(self):
        """Create the default ThreadedChildWatcher if none is set yet."""
        with events._lock:
            if self._watcher is None:  # pragma: no branch
                watcher = ThreadedChildWatcher()
                self._watcher = watcher
                in_main_thread = (
                    threading.current_thread() is threading.main_thread())
                if in_main_thread:
                    watcher.attach_loop(self._local._loop)

    def set_event_loop(self, loop):
        """Set the event loop.

        Side effect: when a child watcher has already been set and this
        is called from the main thread, .attach_loop(loop) is also called
        on that watcher.
        """
        super().set_event_loop(loop)

        watcher = self._watcher
        if watcher is None:
            return
        if threading.current_thread() is threading.main_thread():
            watcher.attach_loop(loop)

    def get_child_watcher(self):
        """Return the watcher for child processes.

        A ThreadedChildWatcher is created automatically when no watcher
        has been set yet.
        """
        if self._watcher is None:
            self._init_watcher()
        return self._watcher

    def set_child_watcher(self, watcher):
        """Replace the watcher for child processes, closing the old one."""
        assert watcher is None or isinstance(watcher, AbstractChildWatcher)

        current = self._watcher
        if current is not None:
            current.close()

        self._watcher = watcher
1463
1464
# Public names (both are listed in __all__) bound to the UNIX-specific
# implementations defined in this module.
SelectorEventLoop = _UnixSelectorEventLoop
DefaultEventLoopPolicy = _UnixDefaultEventLoopPolicy