blob: f4cf6e7f2ae254d646c8269d76b0c2b3d58b2c13 [file] [log] [blame]
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07001"""Selector eventloop for Unix with signal handling."""
2
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07003import errno
4import fcntl
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07005import os
6import signal
7import socket
8import stat
9import subprocess
10import sys
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -080011import threading
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070012
13
Guido van Rossum59691282013-10-30 14:52:03 -070014from . import base_subprocess
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070015from . import constants
16from . import events
17from . import protocols
18from . import selector_events
19from . import tasks
20from . import transports
Guido van Rossumfc29e0f2013-10-17 15:39:45 -070021from .log import logger
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070022
23
# Public names exported by ``from <module> import *``.
__all__ = [
    'SelectorEventLoop',
    'STDIN',
    'STDOUT',
    'STDERR',
    'AbstractChildWatcher',
    'SafeChildWatcher',
    'FastChildWatcher',
    'DefaultEventLoopPolicy',
]
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070028
# File descriptor numbers of the three standard streams.
STDIN, STDOUT, STDERR = 0, 1, 2


# This module depends on Unix signal handling, so importing it on
# Windows is refused outright.
if sys.platform == 'win32':  # pragma: no cover
    raise ImportError('Signals are not really supported on Windows')
36
37
class _UnixSelectorEventLoop(selector_events.BaseSelectorEventLoop):
    """Unix event loop.

    Adds signal handling, pipe transports and subprocess transports to
    the selector event loop.
    """

    def __init__(self, selector=None):
        super().__init__(selector)
        # Maps a signal number to the handle wrapping its callback.
        self._signal_handlers = {}

    def _socketpair(self):
        """Return a connected pair of sockets from socket.socketpair()."""
        return socket.socketpair()

    def close(self):
        """Remove every registered signal handler, then close the loop."""
        # Iterate over a snapshot: remove_signal_handler() mutates the dict.
        for sig in list(self._signal_handlers):
            self.remove_signal_handler(sig)
        super().close()

    def add_signal_handler(self, sig, callback, *args):
        """Add a handler for a signal.  UNIX only.

        callback(*args) is scheduled on the loop each time the signal is
        delivered.  A handler already registered for sig is replaced.

        Raise TypeError if sig is not an int.
        Raise ValueError if the signal number is out of range.
        Raise RuntimeError if the wakeup fd cannot be installed (for
        example when not called from the main thread) or if the signal
        cannot be caught.
        """
        self._check_signal(sig)
        try:
            # set_wakeup_fd() raises ValueError if this is not the
            # main thread.  By calling it early we ensure that an
            # event loop running in another thread cannot add a signal
            # handler.
            signal.set_wakeup_fd(self._csock.fileno())
        except ValueError as exc:
            raise RuntimeError(str(exc))

        handle = events.make_handle(callback, args)
        self._signal_handlers[sig] = handle

        try:
            signal.signal(sig, self._handle_signal)
        except OSError as exc:
            # Roll back the registration made just above.
            del self._signal_handlers[sig]
            if not self._signal_handlers:
                # No handler left: the wakeup fd is no longer needed.
                try:
                    signal.set_wakeup_fd(-1)
                except ValueError as nexc:
                    logger.info('set_wakeup_fd(-1) failed: %s', nexc)

            if exc.errno == errno.EINVAL:
                raise RuntimeError('sig {} cannot be caught'.format(sig))
            else:
                raise

    def _handle_signal(self, sig, arg):
        """Internal helper that is the actual signal handler."""
        handle = self._signal_handlers.get(sig)
        if handle is None:
            return  # Assume it's some race condition.
        if handle._cancelled:
            self.remove_signal_handler(sig)  # Remove it properly.
        else:
            self._add_callback_signalsafe(handle)

    def remove_signal_handler(self, sig):
        """Remove a handler for a signal.  UNIX only.

        Return True if a signal handler was removed, False if not.

        Raise TypeError if sig is not an int.
        Raise ValueError if the signal number is out of range.
        Raise RuntimeError if the default handler cannot be restored
        because the signal cannot be caught.
        """
        self._check_signal(sig)
        try:
            del self._signal_handlers[sig]
        except KeyError:
            return False

        # SIGINT goes back to Python's KeyboardInterrupt-raising handler;
        # every other signal goes back to the OS default action.
        if sig == signal.SIGINT:
            handler = signal.default_int_handler
        else:
            handler = signal.SIG_DFL

        try:
            signal.signal(sig, handler)
        except OSError as exc:
            if exc.errno == errno.EINVAL:
                raise RuntimeError('sig {} cannot be caught'.format(sig))
            else:
                raise

        if not self._signal_handlers:
            # Last handler gone: stop routing signals to the wakeup fd.
            try:
                signal.set_wakeup_fd(-1)
            except ValueError as exc:
                logger.info('set_wakeup_fd(-1) failed: %s', exc)

        return True

    def _check_signal(self, sig):
        """Internal helper to validate a signal.

        Raise TypeError if sig is not an int.
        Raise ValueError if sig is outside range(1, signal.NSIG).
        """
        if not isinstance(sig, int):
            raise TypeError('sig must be an int, not {!r}'.format(sig))

        if not (1 <= sig < signal.NSIG):
            raise ValueError(
                'sig {} out of range(1, {})'.format(sig, signal.NSIG))

    def _make_read_pipe_transport(self, pipe, protocol, waiter=None,
                                  extra=None):
        """Return a _UnixReadPipeTransport bound to this loop."""
        return _UnixReadPipeTransport(self, pipe, protocol, waiter, extra)

    def _make_write_pipe_transport(self, pipe, protocol, waiter=None,
                                   extra=None):
        """Return a _UnixWritePipeTransport bound to this loop."""
        return _UnixWritePipeTransport(self, pipe, protocol, waiter, extra)

    @tasks.coroutine
    def _make_subprocess_transport(self, protocol, args, shell,
                                   stdin, stdout, stderr, bufsize,
                                   extra=None, **kwargs):
        """Start a subprocess and return its transport.

        The process is created and registered inside the child watcher's
        context so that its termination cannot be missed.
        """
        with events.get_child_watcher() as watcher:
            # Forward the caller's ``extra`` instead of discarding it.
            transp = _UnixSubprocessTransport(self, protocol, args, shell,
                                              stdin, stdout, stderr, bufsize,
                                              extra=extra, **kwargs)
            watcher.add_child_handler(transp.get_pid(),
                                      self._child_watcher_callback, transp)
        yield from transp._post_init()
        return transp

    def _child_watcher_callback(self, pid, returncode, transp):
        """Report the exit of process pid to its transport."""
        # Child watcher callbacks must be thread-safe, hence the
        # thread-safe scheduling call.
        self.call_soon_threadsafe(transp._process_exited, returncode)

    def _subprocess_closed(self, transp):
        """Do nothing."""
        pass
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700171
172
def _set_nonblocking(fd):
    """Turn on O_NONBLOCK for file descriptor fd, keeping its other flags."""
    current = fcntl.fcntl(fd, fcntl.F_GETFL)
    fcntl.fcntl(fd, fcntl.F_SETFL, current | os.O_NONBLOCK)
177
178
class _UnixReadPipeTransport(transports.ReadTransport):
    """Read transport over the file descriptor of a pipe or socket."""

    # Maximum number of bytes read in one event loop iteration.
    max_size = 256 * 1024

    def __init__(self, loop, pipe, protocol, waiter=None, extra=None):
        super().__init__(extra)
        self._extra['pipe'] = pipe
        self._loop = loop
        self._pipe = pipe
        self._fileno = pipe.fileno()
        st_mode = os.fstat(self._fileno).st_mode
        if not stat.S_ISFIFO(st_mode) and not stat.S_ISSOCK(st_mode):
            raise ValueError("Pipe transport is for pipes/sockets only.")
        _set_nonblocking(self._fileno)
        self._protocol = protocol
        self._closing = False
        self._loop.add_reader(self._fileno, self._read_ready)
        self._loop.call_soon(self._protocol.connection_made, self)
        if waiter is not None:
            self._loop.call_soon(waiter.set_result, None)

    def _read_ready(self):
        """Read available data and pass it on, or handle end of file."""
        try:
            chunk = os.read(self._fileno, self.max_size)
        except (BlockingIOError, InterruptedError):
            # Nothing to read right now; wait for the next readiness event.
            return
        except OSError as exc:
            self._fatal_error(exc)
            return

        if chunk:
            self._protocol.data_received(chunk)
            return

        # An empty read means end of file: stop watching the descriptor
        # and notify the protocol.
        self._closing = True
        self._loop.remove_reader(self._fileno)
        self._loop.call_soon(self._protocol.eof_received)
        self._loop.call_soon(self._call_connection_lost, None)

    def pause_reading(self):
        """Stop watching the descriptor for readability."""
        self._loop.remove_reader(self._fileno)

    def resume_reading(self):
        """Start watching the descriptor for readability again."""
        self._loop.add_reader(self._fileno, self._read_ready)

    def close(self):
        """Close the transport unless it is already closing."""
        if self._closing:
            return
        self._close(None)

    def _fatal_error(self, exc):
        """Log the exception being handled and close with exc."""
        # Must only be called from within an exception handler, since
        # logger.exception() records the exception currently handled.
        logger.exception('Fatal error for %s', self)
        self._close(exc)

    def _close(self, exc):
        """Stop reading and schedule connection_lost(exc)."""
        self._closing = True
        self._loop.remove_reader(self._fileno)
        self._loop.call_soon(self._call_connection_lost, exc)

    def _call_connection_lost(self, exc):
        """Notify the protocol, then release the pipe and references."""
        try:
            self._protocol.connection_lost(exc)
        finally:
            # Runs even when the protocol callback raises.
            self._pipe.close()
            self._pipe = None
            self._protocol = None
            self._loop = None
244
245
class _UnixWritePipeTransport(transports.WriteTransport):
    """Write transport over the file descriptor of a pipe or socket."""

    def __init__(self, loop, pipe, protocol, waiter=None, extra=None):
        super().__init__(extra)
        self._extra['pipe'] = pipe
        self._loop = loop
        self._pipe = pipe
        self._fileno = pipe.fileno()
        st_mode = os.fstat(self._fileno).st_mode
        on_socket = stat.S_ISSOCK(st_mode)
        on_fifo = stat.S_ISFIFO(st_mode)
        if not on_socket and not on_fifo:
            raise ValueError("Pipe transport is for pipes/sockets only.")
        _set_nonblocking(self._fileno)
        self._protocol = protocol
        self._buffer = []
        self._conn_lost = 0
        self._closing = False  # Set when close() or write_eof() called.

        # On AIX, the reader trick only works for sockets.
        # On other platforms it works for pipes and sockets.
        # (Exception: OS X 10.4?  Issue #19294.)
        watch_peer = on_socket or not sys.platform.startswith("aix")
        if watch_peer:
            self._loop.add_reader(self._fileno, self._read_ready)

        self._loop.call_soon(self._protocol.connection_made, self)
        if waiter is not None:
            self._loop.call_soon(waiter.set_result, None)

    def _read_ready(self):
        """Handle readability of the write end: the peer closed the pipe."""
        self._close()

    def write(self, data):
        """Write data now if possible, otherwise queue what is left."""
        assert isinstance(data, bytes), repr(data)
        if not data:
            return

        if self._conn_lost or self._closing:
            # Writes after the connection is gone are dropped; warn only
            # once the configured number of such writes has been reached.
            if self._conn_lost >= constants.LOG_THRESHOLD_FOR_CONNLOST_WRITES:
                logger.warning('pipe closed by peer or '
                               'os.write(pipe, data) raised exception.')
            self._conn_lost += 1
            return

        if not self._buffer:
            # Nothing is queued, so try to send straight away.
            try:
                sent = os.write(self._fileno, data)
            except (BlockingIOError, InterruptedError):
                sent = 0
            except Exception as exc:
                self._conn_lost += 1
                self._fatal_error(exc)
                return
            if sent == len(data):
                return
            if sent > 0:
                data = data[sent:]
            # The remainder is flushed once the descriptor is writable.
            self._loop.add_writer(self._fileno, self._write_ready)

        self._buffer.append(data)

    def _write_ready(self):
        """Flush as much of the queued data as the descriptor accepts."""
        pending = b''.join(self._buffer)
        assert pending, 'Data should not be empty'

        self._buffer.clear()
        try:
            sent = os.write(self._fileno, pending)
        except (BlockingIOError, InterruptedError):
            self._buffer.append(pending)
            return
        except Exception as exc:
            self._conn_lost += 1
            # Remove the writer here: _fatal_error() won't do it
            # because _buffer is empty at this point.
            self._loop.remove_writer(self._fileno)
            self._fatal_error(exc)
            return

        if sent == len(pending):
            self._loop.remove_writer(self._fileno)
            if self._closing:
                # Everything was flushed after close()/write_eof().
                self._loop.remove_reader(self._fileno)
                self._call_connection_lost(None)
            return

        if sent > 0:
            pending = pending[sent:]
        self._buffer.append(pending)  # Try again later.

    def can_write_eof(self):
        """Return True: this transport supports write_eof()."""
        return True

    # TODO: Make the relationships between write_eof(), close(),
    # abort(), _fatal_error() and _close() more straightforward.

    def write_eof(self):
        """Mark the transport closing; finish now if nothing is queued."""
        if self._closing:
            return
        assert self._pipe
        self._closing = True
        if self._buffer:
            # _write_ready() completes the shutdown after flushing.
            return
        self._loop.remove_reader(self._fileno)
        self._loop.call_soon(self._call_connection_lost, None)

    def close(self):
        """Close the transport after any queued data has been written."""
        if self._closing:
            return
        # write_eof() is all that is needed to close the write pipe.
        self.write_eof()

    def abort(self):
        """Close immediately, discarding any queued data."""
        self._close(None)

    def _fatal_error(self, exc):
        """Log the exception being handled and close with exc."""
        # Must only be called from within an exception handler, since
        # logger.exception() records the exception currently handled.
        logger.exception('Fatal error for %s', self)
        self._close(exc)

    def _close(self, exc=None):
        """Drop queued data, stop watching and schedule connection_lost."""
        self._closing = True
        if self._buffer:
            self._loop.remove_writer(self._fileno)
        self._buffer.clear()
        self._loop.remove_reader(self._fileno)
        self._loop.call_soon(self._call_connection_lost, exc)

    def _call_connection_lost(self, exc):
        """Notify the protocol, then release the pipe and references."""
        try:
            self._protocol.connection_lost(exc)
        finally:
            # Runs even when the protocol callback raises.
            self._pipe.close()
            self._pipe = None
            self._protocol = None
            self._loop = None
380
381
class _UnixSubprocessTransport(base_subprocess.BaseSubprocessTransport):
    """Subprocess transport that starts the child with subprocess.Popen."""

    def _start(self, args, shell, stdin, stdout, stderr, bufsize, **kwargs):
        """Spawn the child process and store it in self._proc.

        When stdin is subprocess.PIPE, the child's standard input is one
        end of a socket pair and self._proc.stdin is replaced by a binary
        file object, opened for writing, over the other end.
        """
        stdin_w = None
        if stdin == subprocess.PIPE:
            # Use a socket pair for stdin, since not all platforms
            # support selecting read events on the write end of a
            # socket (which we use in order to detect closing of the
            # other end).  Notably this is needed on AIX, and works
            # just fine on other platforms.
            stdin, stdin_w = self._loop._socketpair()
        try:
            self._proc = subprocess.Popen(
                args, shell=shell, stdin=stdin, stdout=stdout, stderr=stderr,
                universal_newlines=False, bufsize=bufsize, **kwargs)
            if stdin_w is not None:
                # The child holds its own copy of its end of the pair.
                stdin.close()
                # This is the end we write to, so open it for writing.
                self._proc.stdin = open(stdin_w.detach(), 'wb',
                                        buffering=bufsize)
                stdin_w = None
        finally:
            # Still set only when something above raised: close both
            # sockets so they do not leak.
            if stdin_w is not None:
                stdin.close()
                stdin_w.close()
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800399
400
class AbstractChildWatcher:
    """Abstract base class for monitoring child processes.

    Objects derived from this class monitor a collection of subprocesses
    and report their termination or interruption by a signal.

    New callbacks are registered with .add_child_handler().  Starting a
    new process must be done within a 'with' block to allow the watcher
    to suspend its activity until the new process is fully registered
    (this is needed to prevent a race condition in some implementations).

    Example:
        with watcher:
            proc = subprocess.Popen("sleep 1")
            watcher.add_child_handler(proc.pid, callback)

    Notes:
        Implementations of this class must be thread-safe.

        Since child watcher objects may catch the SIGCHLD signal and call
        waitpid(-1), there should be only one active object per process.
    """

    def add_child_handler(self, pid, callback, *args):
        """Register a new child handler.

        Arrange for callback(pid, returncode, *args) to be called when
        process 'pid' terminates.  Registering another callback for the
        same process replaces the previous handler.

        Note: callback() must be thread-safe.
        """
        raise NotImplementedError

    def remove_child_handler(self, pid):
        """Remove the handler for process 'pid'.

        Return True if the handler was successfully removed, False if
        there was nothing to remove.
        """
        raise NotImplementedError

    def set_loop(self, loop):
        """Reattach the watcher to another event loop.

        Note: loop may be None.
        """
        raise NotImplementedError

    def close(self):
        """Close the watcher.

        This must be called to make sure that any underlying resource is
        freed.
        """
        raise NotImplementedError

    def __enter__(self):
        """Enter the watcher's context and allow starting new processes.

        This function must return self.
        """
        raise NotImplementedError

    def __exit__(self, a, b, c):
        """Exit the watcher's context."""
        raise NotImplementedError
466
467
class BaseChildWatcher(AbstractChildWatcher):
    """Common machinery for child watchers driven by SIGCHLD.

    Subclasses supply _do_waitpid() and _do_waitpid_all().
    """

    def __init__(self, loop):
        self._loop = None
        # Maps a pid to its (callback, args) pair.
        self._callbacks = {}

        self.set_loop(loop)

    def close(self):
        """Detach from the event loop and forget every callback."""
        self.set_loop(None)
        self._callbacks.clear()

    def _do_waitpid(self, expected_pid):
        raise NotImplementedError()

    def _do_waitpid_all(self):
        raise NotImplementedError()

    def set_loop(self, loop):
        """Move the SIGCHLD handler from the current loop to loop."""
        assert loop is None or isinstance(loop, events.AbstractEventLoop)

        previous = self._loop
        if previous is not None:
            previous.remove_signal_handler(signal.SIGCHLD)

        self._loop = loop
        if loop is None:
            return

        loop.add_signal_handler(signal.SIGCHLD, self._sig_chld)

        # A child may have terminated while no handler was installed,
        # so poll once right away.
        self._do_waitpid_all()

    def remove_child_handler(self, pid):
        """Forget the callback for pid; return whether one existed."""
        try:
            del self._callbacks[pid]
        except KeyError:
            return False
        else:
            return True

    def _sig_chld(self):
        """SIGCHLD callback: reap children, logging any failure."""
        try:
            self._do_waitpid_all()
        except Exception:
            logger.exception('Unknown exception in SIGCHLD handler')

    def _compute_returncode(self, status):
        """Convert a waitpid() status into a return code."""
        if os.WIFSIGNALED(status):
            # Killed by a signal: report the negated signal number.
            return -os.WTERMSIG(status)
        if os.WIFEXITED(status):
            # Normal exit (e.g. sys.exit()): report the exit status.
            return os.WEXITSTATUS(status)
        # The child exited, but we don't understand its status.
        # This shouldn't happen, but if it does, hand back the raw
        # status; perhaps that helps debug it.
        return status
525
526
class SafeChildWatcher(BaseChildWatcher):
    """'Safe' child watcher implementation.

    This implementation avoids disrupting other code spawning processes
    by explicitly polling each registered process in the SIGCHLD handler
    instead of calling os.waitpid(-1).

    This is a safe solution but it has a significant overhead when
    handling a big number of children (O(n) each time SIGCHLD is raised).
    """

    def __enter__(self):
        return self

    def __exit__(self, a, b, c):
        pass

    def add_child_handler(self, pid, callback, *args):
        self._callbacks[pid] = callback, args

        # The child may already have terminated; check right away so
        # its exit is not missed.
        self._do_waitpid(pid)

    def _do_waitpid_all(self):
        # Iterate over a snapshot: _do_waitpid() may remove entries.
        for known_pid in list(self._callbacks):
            self._do_waitpid(known_pid)

    def _do_waitpid(self, expected_pid):
        """Poll one process and fire its callback if it has terminated."""
        assert expected_pid > 0

        try:
            pid, status = os.waitpid(expected_pid, os.WNOHANG)
        except ChildProcessError:
            # The child process is already reaped
            # (may happen if waitpid() is called elsewhere).
            pid = expected_pid
            returncode = 255
            logger.warning(
                "Unknown child process pid %d, will report returncode 255",
                pid)
        else:
            if pid == 0:
                # The child process is still alive.
                return

            returncode = self._compute_returncode(status)

        try:
            callback, args = self._callbacks.pop(pid)
        except KeyError:  # pragma: no cover
            # May happen if .remove_child_handler() is called
            # after os.waitpid() returns.
            return

        callback(pid, returncode, *args)
583
584
class FastChildWatcher(BaseChildWatcher):
    """'Fast' child watcher implementation.

    This implementation reaps every terminated process by calling
    os.waitpid(-1) directly, possibly breaking other code spawning
    processes and waiting for their termination.

    There is no noticeable overhead when handling a big number of
    children (O(1) each time a child terminates).
    """

    def __init__(self, loop):
        # This state must exist before the base initializer runs: it
        # calls set_loop(), which may call _do_waitpid_all(), and that
        # method uses _lock, _forks and _zombies.
        self._lock = threading.Lock()
        # Maps the pid of a reaped but not yet registered child to its
        # return code.
        self._zombies = {}
        # Number of 'with watcher:' blocks currently entered.
        self._forks = 0

        super().__init__(loop)

    def close(self):
        """Close the watcher and forget any recorded zombie."""
        super().close()
        self._zombies.clear()

    def __enter__(self):
        with self._lock:
            self._forks += 1

            return self

    def __exit__(self, a, b, c):
        with self._lock:
            self._forks -= 1

            if self._forks or not self._zombies:
                return

            # No process is being started any more, so whatever is left
            # in _zombies belongs to children nobody registered.
            collateral_victims = str(self._zombies)
            self._zombies.clear()

        logger.warning(
            "Caught subprocesses termination from unknown pids: %s",
            collateral_victims)

    def add_child_handler(self, pid, callback, *args):
        """Register callback(pid, returncode, *args) for process pid.

        Must be called inside the watcher's 'with' block.  If the child
        was already reaped, the callback is fired immediately.
        """
        assert self._forks, "Must use the context manager"

        self._callbacks[pid] = callback, args

        try:
            # Ensure that the child is not already terminated.
            # (raise KeyError if still alive)
            returncode = self._zombies.pop(pid)

            # Child is dead, therefore we can fire the callback immediately.
            # First we remove it from the dict.
            # (raise KeyError if .remove_child_handler() was called in-between)
            del self._callbacks[pid]
        except KeyError:
            pass
        else:
            callback(pid, returncode, *args)

    def _do_waitpid_all(self):
        """Reap every terminated child and dispatch its callback."""
        # Because of signal coalescing, we must keep calling waitpid() as
        # long as we're able to reap a child.
        while True:
            try:
                pid, status = os.waitpid(-1, os.WNOHANG)
            except ChildProcessError:
                # No more child processes exist.
                return
            else:
                if pid == 0:
                    # A child process is still alive.
                    return

                returncode = self._compute_returncode(status)

            try:
                callback, args = self._callbacks.pop(pid)
            except KeyError:
                # unknown child
                with self._lock:
                    if self._forks:
                        # It may not be registered yet.
                        self._zombies[pid] = returncode
                        continue

                logger.warning(
                    "Caught subprocess termination from unknown pid: "
                    "%d -> %d", pid, returncode)
            else:
                callback(pid, returncode, *args)
676
677
class _UnixDefaultEventLoopPolicy(events.BaseDefaultEventLoopPolicy):
    """Default event loop policy for Unix; also owns the child watcher."""
    _loop_factory = _UnixSelectorEventLoop

    def __init__(self):
        super().__init__()
        # Created on first use by get_child_watcher().
        self._watcher = None

    def _init_watcher(self):
        """Create the default SafeChildWatcher if none is set yet."""
        with events._lock:
            if self._watcher is None:  # pragma: no branch
                in_main_thread = isinstance(threading.current_thread(),
                                            threading._MainThread)
                # The watcher is bound to this policy's current loop
                # only when created from the main thread.
                loop = self._local._loop if in_main_thread else None
                self._watcher = SafeChildWatcher(loop)

    def set_event_loop(self, loop):
        """Set the event loop.

        As a side effect, if a child watcher was set before, then calling
        .set_event_loop() from the main thread will call .set_loop(loop)
        on the child watcher.
        """

        super().set_event_loop(loop)

        if self._watcher is None:
            return
        if isinstance(threading.current_thread(), threading._MainThread):
            self._watcher.set_loop(loop)

    def get_child_watcher(self):
        """Get the child watcher.

        If not yet set, a SafeChildWatcher object is automatically created.
        """
        if self._watcher is None:
            self._init_watcher()

        return self._watcher

    def set_child_watcher(self, watcher):
        """Set the child watcher, closing the previous one if any."""

        assert watcher is None or isinstance(watcher, AbstractChildWatcher)

        previous = self._watcher
        if previous is not None:
            previous.close()

        self._watcher = watcher
728
# Public aliases for the Unix implementations defined above.
DefaultEventLoopPolicy = _UnixDefaultEventLoopPolicy
SelectorEventLoop = _UnixSelectorEventLoop