blob: dd57fe8e79749a398a3f8b282de2de6cbb6d16c2 [file] [log] [blame]
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07001"""Selector eventloop for Unix with signal handling."""
2
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07003import errno
4import fcntl
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07005import os
6import signal
7import socket
8import stat
9import subprocess
10import sys
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -080011import threading
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070012
13
Guido van Rossum59691282013-10-30 14:52:03 -070014from . import base_subprocess
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070015from . import constants
16from . import events
17from . import protocols
18from . import selector_events
19from . import tasks
20from . import transports
Guido van Rossumfc29e0f2013-10-17 15:39:45 -070021from .log import logger
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070022
23
# Names exported by a star-import of this module.
__all__ = [
    'SelectorEventLoop',
    'STDIN', 'STDOUT', 'STDERR',
    'AbstractChildWatcher', 'SafeChildWatcher', 'FastChildWatcher',
    'DefaultEventLoopPolicy',
]

# Numbers identifying the three standard streams.
STDIN, STDOUT, STDERR = 0, 1, 2


# This module is built around Unix signal handling; refuse to load on
# Windows.
if sys.platform == 'win32':  # pragma: no cover
    raise ImportError('Signals are not really supported on Windows')
36
37
class _UnixSelectorEventLoop(selector_events.BaseSelectorEventLoop):
    """Unix event loop.

    Adds signal handling to SelectorEventLoop, plus factories for the
    Unix pipe and subprocess transports.
    """

    def __init__(self, selector=None):
        super().__init__(selector)
        # Maps a signal number to the handle wrapping its callback.
        self._signal_handlers = {}

    def _socketpair(self):
        """Return a pair of connected sockets."""
        return socket.socketpair()

    def add_signal_handler(self, sig, callback, *args):
        """Add a handler for a signal.  UNIX only.

        callback(*args) is scheduled on the loop each time the signal
        'sig' is received.  A handler previously registered for the same
        signal is replaced.

        Raise TypeError if sig is not an int.
        Raise ValueError if the signal number is out of range.
        Raise RuntimeError if the wakeup fd cannot be installed or if the
        signal cannot be caught.
        """
        self._check_signal(sig)
        try:
            # set_wakeup_fd() raises ValueError if this is not the
            # main thread.  By calling it early we ensure that an
            # event loop running in another thread cannot add a signal
            # handler.
            signal.set_wakeup_fd(self._csock.fileno())
        except ValueError as exc:
            raise RuntimeError(str(exc))

        handle = events.make_handle(callback, args)
        self._signal_handlers[sig] = handle

        try:
            signal.signal(sig, self._handle_signal)
        except OSError as exc:
            # Roll back the registration made above.
            del self._signal_handlers[sig]
            if not self._signal_handlers:
                # No handler left: the wakeup fd is not needed any more.
                try:
                    signal.set_wakeup_fd(-1)
                except ValueError as nexc:
                    logger.info('set_wakeup_fd(-1) failed: %s', nexc)

            if exc.errno == errno.EINVAL:
                raise RuntimeError('sig {} cannot be caught'.format(sig))
            else:
                raise

    def _handle_signal(self, sig, arg):
        """Internal helper that is the actual signal handler."""
        handle = self._signal_handlers.get(sig)
        if handle is None:
            return  # Assume it's some race condition.
        if handle._cancelled:
            self.remove_signal_handler(sig)  # Remove it properly.
        else:
            self._add_callback_signalsafe(handle)

    def remove_signal_handler(self, sig):
        """Remove a handler for a signal.  UNIX only.

        Return True if a signal handler was removed, False if not.

        Raise TypeError or ValueError if sig is not a valid signal number.
        Raise RuntimeError if the default handler cannot be restored.
        """
        self._check_signal(sig)
        try:
            del self._signal_handlers[sig]
        except KeyError:
            return False

        # Restore the disposition Python uses by default for the signal.
        if sig == signal.SIGINT:
            handler = signal.default_int_handler
        else:
            handler = signal.SIG_DFL

        try:
            signal.signal(sig, handler)
        except OSError as exc:
            if exc.errno == errno.EINVAL:
                raise RuntimeError('sig {} cannot be caught'.format(sig))
            else:
                raise

        if not self._signal_handlers:
            # Last handler removed: uninstall the wakeup fd.
            try:
                signal.set_wakeup_fd(-1)
            except ValueError as exc:
                logger.info('set_wakeup_fd(-1) failed: %s', exc)

        return True

    def _check_signal(self, sig):
        """Internal helper to validate a signal.

        Raise TypeError if sig is not an int.
        Raise ValueError if the signal number is outside
        range(1, signal.NSIG).
        """
        if not isinstance(sig, int):
            raise TypeError('sig must be an int, not {!r}'.format(sig))

        if not (1 <= sig < signal.NSIG):
            raise ValueError(
                'sig {} out of range(1, {})'.format(sig, signal.NSIG))

    def _make_read_pipe_transport(self, pipe, protocol, waiter=None,
                                  extra=None):
        """Return a read transport wrapping 'pipe'."""
        return _UnixReadPipeTransport(self, pipe, protocol, waiter, extra)

    def _make_write_pipe_transport(self, pipe, protocol, waiter=None,
                                   extra=None):
        """Return a write transport wrapping 'pipe'."""
        return _UnixWritePipeTransport(self, pipe, protocol, waiter, extra)

    @tasks.coroutine
    def _make_subprocess_transport(self, protocol, args, shell,
                                   stdin, stdout, stderr, bufsize,
                                   extra=None, **kwargs):
        """Start a subprocess and return its transport (coroutine).

        The process is created and its exit handler registered inside
        the child watcher's context, as the watcher API requires.
        """
        with events.get_child_watcher() as watcher:
            # Forward the caller's 'extra' instead of discarding it.
            transp = _UnixSubprocessTransport(self, protocol, args, shell,
                                              stdin, stdout, stderr, bufsize,
                                              extra=extra, **kwargs)
            watcher.add_child_handler(transp.get_pid(),
                                      self._child_watcher_callback, transp)
        yield from transp._post_init()
        return transp

    def _child_watcher_callback(self, pid, returncode, transp):
        """Forward a child's exit status to its transport on this loop."""
        self.call_soon_threadsafe(transp._process_exited, returncode)

    def _subprocess_closed(self, transp):
        """Hook called for a closed subprocess transport; nothing to do."""
        pass
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700166
167
def _set_nonblocking(fd):
    """Turn on O_NONBLOCK for 'fd', keeping its other status flags."""
    current = fcntl.fcntl(fd, fcntl.F_GETFL)
    fcntl.fcntl(fd, fcntl.F_SETFL, current | os.O_NONBLOCK)
172
173
class _UnixReadPipeTransport(transports.ReadTransport):
    """Read transport over a pipe or socket put in non-blocking mode."""

    max_size = 256 * 1024  # max bytes we read in one eventloop iteration

    def __init__(self, loop, pipe, protocol, waiter=None, extra=None):
        super().__init__(extra)
        self._extra['pipe'] = pipe
        self._loop = loop
        self._pipe = pipe
        self._fileno = pipe.fileno()
        st_mode = os.fstat(self._fileno).st_mode
        if not stat.S_ISFIFO(st_mode) and not stat.S_ISSOCK(st_mode):
            raise ValueError("Pipe transport is for pipes/sockets only.")
        _set_nonblocking(self._fileno)
        self._protocol = protocol
        self._closing = False
        # Start watching the descriptor, then announce the connection.
        self._loop.add_reader(self._fileno, self._read_ready)
        self._loop.call_soon(self._protocol.connection_made, self)
        if waiter is not None:
            self._loop.call_soon(waiter.set_result, None)

    def _read_ready(self):
        """Read what is available and hand it to the protocol."""
        try:
            chunk = os.read(self._fileno, self.max_size)
        except (BlockingIOError, InterruptedError):
            # Nothing to read right now; wait for the next event.
            return
        except OSError as exc:
            self._fatal_error(exc)
            return

        if chunk:
            self._protocol.data_received(chunk)
            return

        # An empty read means end of file: stop watching the descriptor
        # and notify the protocol.
        self._closing = True
        self._loop.remove_reader(self._fileno)
        self._loop.call_soon(self._protocol.eof_received)
        self._loop.call_soon(self._call_connection_lost, None)

    def pause_reading(self):
        """Stop watching the descriptor for incoming data."""
        self._loop.remove_reader(self._fileno)

    def resume_reading(self):
        """Watch the descriptor for incoming data again."""
        self._loop.add_reader(self._fileno, self._read_ready)

    def close(self):
        """Close the transport unless it is already closing."""
        if self._closing:
            return
        self._close(None)

    def _fatal_error(self, exc):
        # should be called by exception handler only
        logger.exception('Fatal error for %s', self)
        self._close(exc)

    def _close(self, exc):
        """Stop reading and schedule connection_lost(exc)."""
        self._closing = True
        self._loop.remove_reader(self._fileno)
        self._loop.call_soon(self._call_connection_lost, exc)

    def _call_connection_lost(self, exc):
        """Notify the protocol, then close the pipe and drop references."""
        try:
            self._protocol.connection_lost(exc)
        finally:
            self._pipe.close()
            self._pipe = None
            self._protocol = None
            self._loop = None
239
240
class _UnixWritePipeTransport(transports.WriteTransport):
    """Write transport over a pipe or socket put in non-blocking mode."""

    def __init__(self, loop, pipe, protocol, waiter=None, extra=None):
        super().__init__(extra)
        self._extra['pipe'] = pipe
        self._loop = loop
        self._pipe = pipe
        self._fileno = pipe.fileno()
        st_mode = os.fstat(self._fileno).st_mode
        is_socket = stat.S_ISSOCK(st_mode)
        is_pipe = stat.S_ISFIFO(st_mode)
        if not is_socket and not is_pipe:
            raise ValueError("Pipe transport is for pipes/sockets only.")
        _set_nonblocking(self._fileno)
        self._protocol = protocol
        self._buffer = []
        self._conn_lost = 0
        self._closing = False  # Set when close() or write_eof() called.

        # A read event on the descriptor signals that the peer closed it.
        # On AIX, the reader trick only works for sockets.
        # On other platforms it works for pipes and sockets.
        # (Exception: OS X 10.4?  Issue #19294.)
        on_aix = sys.platform.startswith("aix")
        if is_socket or not on_aix:
            self._loop.add_reader(self._fileno, self._read_ready)

        self._loop.call_soon(self._protocol.connection_made, self)
        if waiter is not None:
            self._loop.call_soon(waiter.set_result, None)

    def _read_ready(self):
        # Pipe was closed by peer.
        self._close()

    def write(self, data):
        """Write 'data' now if possible, buffering whatever is left."""
        assert isinstance(data, bytes), repr(data)
        if not data:
            return

        if self._conn_lost or self._closing:
            # The data is dropped; complain once enough writes were lost.
            if self._conn_lost >= constants.LOG_THRESHOLD_FOR_CONNLOST_WRITES:
                logger.warning('pipe closed by peer or '
                               'os.write(pipe, data) raised exception.')
            self._conn_lost += 1
            return

        if not self._buffer:
            # Nothing is queued, so try a direct write first.
            try:
                sent = os.write(self._fileno, data)
            except (BlockingIOError, InterruptedError):
                sent = 0
            except Exception as exc:
                self._conn_lost += 1
                self._fatal_error(exc)
                return
            if sent == len(data):
                return
            if sent > 0:
                data = data[sent:]
            # Let the loop tell us when the rest can be written.
            self._loop.add_writer(self._fileno, self._write_ready)

        self._buffer.append(data)

    def _write_ready(self):
        """Flush as much of the buffered data as the pipe accepts."""
        pending = b''.join(self._buffer)
        assert pending, 'Data should not be empty'

        self._buffer.clear()
        try:
            sent = os.write(self._fileno, pending)
        except (BlockingIOError, InterruptedError):
            # Not writable after all: put everything back.
            self._buffer.append(pending)
            return
        except Exception as exc:
            self._conn_lost += 1
            # The buffer is empty at this point, so _close() will not
            # remove the writer; do it here.
            self._loop.remove_writer(self._fileno)
            self._fatal_error(exc)
            return

        if sent == len(pending):
            # Everything went out.
            self._loop.remove_writer(self._fileno)
            if self._closing:
                self._loop.remove_reader(self._fileno)
                self._call_connection_lost(None)
            return

        if sent > 0:
            pending = pending[sent:]
        self._buffer.append(pending)  # Try again later.

    def can_write_eof(self):
        """Return True: this transport supports write_eof()."""
        return True

    # TODO: Make the relationships between write_eof(), close(),
    # abort(), _fatal_error() and _close() more straightforward.

    def write_eof(self):
        """Close the write side once buffered data has been written."""
        if self._closing:
            return
        assert self._pipe
        self._closing = True
        if self._buffer:
            # _write_ready() finishes the shutdown after the flush.
            return
        self._loop.remove_reader(self._fileno)
        self._loop.call_soon(self._call_connection_lost, None)

    def close(self):
        """Close the transport; equivalent to write_eof() for a pipe."""
        if self._closing:
            return
        # write_eof is all what we needed to close the write pipe
        self.write_eof()

    def abort(self):
        """Close the transport immediately, dropping buffered data."""
        self._close(None)

    def _fatal_error(self, exc):
        # should be called by exception handler only
        logger.exception('Fatal error for %s', self)
        self._close(exc)

    def _close(self, exc=None):
        """Drop pending data, stop watching and schedule connection_lost."""
        self._closing = True
        if self._buffer:
            # A writer is registered only while data is buffered.
            self._loop.remove_writer(self._fileno)
        self._buffer.clear()
        self._loop.remove_reader(self._fileno)
        self._loop.call_soon(self._call_connection_lost, exc)

    def _call_connection_lost(self, exc):
        """Notify the protocol, then close the pipe and drop references."""
        try:
            self._protocol.connection_lost(exc)
        finally:
            self._pipe.close()
            self._pipe = None
            self._protocol = None
            self._loop = None
375
376
class _UnixSubprocessTransport(base_subprocess.BaseSubprocessTransport):
    """Subprocess transport for Unix."""

    def _start(self, args, shell, stdin, stdout, stderr, bufsize, **kwargs):
        """Spawn the child process and store it in self._proc.

        When stdin is subprocess.PIPE, a socket pair replaces the pipe:
        the child gets one end and self._proc.stdin becomes a binary
        file object, opened for writing, over the other end.
        """
        stdin_w = None
        if stdin == subprocess.PIPE:
            # Use a socket pair for stdin, since not all platforms
            # support selecting read events on the write end of a
            # pipe (which we use in order to detect closing of the
            # other end).  Notably this is needed on AIX, and works
            # just fine on other platforms.
            stdin, stdin_w = self._loop._socketpair()
        try:
            self._proc = subprocess.Popen(
                args, shell=shell, stdin=stdin, stdout=stdout, stderr=stderr,
                universal_newlines=False, bufsize=bufsize, **kwargs)
            if stdin_w is not None:
                # The child holds its own copy of its end of the pair.
                stdin.close()
                # The parent writes to the child's stdin, so the file
                # object must be opened for writing.
                self._proc.stdin = open(stdin_w.detach(), 'wb',
                                        buffering=bufsize)
                stdin_w = None
        finally:
            if stdin_w is not None:
                # Starting the process failed: do not leak the sockets.
                stdin.close()
                stdin_w.close()
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800394
395
class AbstractChildWatcher:
    """Interface of the objects that monitor child processes.

    A child watcher keeps track of a set of subprocesses and reports
    when one of them terminates or is interrupted by a signal.

    Handlers are registered with .add_child_handler().  A new process
    must be started inside a 'with' block on the watcher, so that the
    watcher can suspend its activity until the new process is fully
    registered (some implementations need this to avoid a race
    condition).

    Example:
        with watcher:
            proc = subprocess.Popen("sleep 1")
            watcher.add_child_handler(proc.pid, callback)

    Notes:
        Implementations of this class must be thread-safe.

        Since child watcher objects may catch the SIGCHLD signal and call
        waitpid(-1), there should be only one active object per process.
    """

    def add_child_handler(self, pid, callback, *args):
        """Register a handler for the child process 'pid'.

        callback(pid, returncode, *args) is to be called once the
        process terminates.  Registering a handler for a pid that
        already has one replaces the previous handler.

        Note: callback() must be thread-safe
        """
        raise NotImplementedError

    def remove_child_handler(self, pid):
        """Unregister the handler of the child process 'pid'.

        Return True if a handler was removed, False if there was
        nothing to remove.
        """
        raise NotImplementedError

    def set_loop(self, loop):
        """Attach the watcher to another event loop.

        Note: loop may be None
        """
        raise NotImplementedError

    def close(self):
        """Close the watcher.

        Must be called to make sure that any underlying resource is
        freed.
        """
        raise NotImplementedError

    def __enter__(self):
        """Enter the watcher's context, in which processes may be started.

        Implementations must return self.
        """
        raise NotImplementedError

    def __exit__(self, a, b, c):
        """Leave the watcher's context."""
        raise NotImplementedError
461
462
class BaseChildWatcher(AbstractChildWatcher):
    """Common code of the SIGCHLD based child watchers.

    Subclasses provide _do_waitpid() and _do_waitpid_all().
    """

    def __init__(self, loop):
        self._loop = None
        # Maps a pid to its (callback, args) pair.
        self._callbacks = {}

        self.set_loop(loop)

    def close(self):
        """Detach from the event loop and forget every handler."""
        self.set_loop(None)
        self._callbacks.clear()

    def _do_waitpid(self, expected_pid):
        """Poll the process 'expected_pid' (implemented by subclasses)."""
        raise NotImplementedError()

    def _do_waitpid_all(self):
        """Poll the watched processes (implemented by subclasses)."""
        raise NotImplementedError()

    def set_loop(self, loop):
        """Move the SIGCHLD handler from the current loop to 'loop'.

        'loop' may be None, in which case the watcher is only detached.
        """
        assert loop is None or isinstance(loop, events.AbstractEventLoop)

        previous = self._loop
        if previous is not None:
            previous.remove_signal_handler(signal.SIGCHLD)

        self._loop = loop
        if loop is None:
            return

        loop.add_signal_handler(signal.SIGCHLD, self._sig_chld)

        # Prevent a race condition in case a child terminated
        # during the switch.
        self._do_waitpid_all()

    def remove_child_handler(self, pid):
        """Remove the handler of 'pid'; return True if there was one."""
        try:
            del self._callbacks[pid]
        except KeyError:
            return False
        else:
            return True

    def _sig_chld(self):
        """SIGCHLD callback: poll the children and log any failure."""
        try:
            self._do_waitpid_all()
        except Exception:
            logger.exception('Unknown exception in SIGCHLD handler')

    def _compute_returncode(self, status):
        """Convert a waitpid() status into a return code.

        The result is the negated signal number if the child was killed
        by a signal, and the exit status if it exited.
        """
        if os.WIFSIGNALED(status):
            # The child process died because of a signal.
            return -os.WTERMSIG(status)
        if os.WIFEXITED(status):
            # The child process exited (e.g sys.exit()).
            return os.WEXITSTATUS(status)
        # The child exited, but we don't understand its status.
        # This shouldn't happen, but if it does, let's just
        # return that status; perhaps that helps debug it.
        return status
520
521
class SafeChildWatcher(BaseChildWatcher):
    """'Safe' child watcher implementation.

    The SIGCHLD handler polls each registered process explicitly rather
    than calling os.waitpid(-1), so other code that spawns processes is
    not disrupted.

    This is a safe solution but it has a significant overhead when
    handling a big number of children (O(n) each time SIGCHLD is raised)
    """

    def __enter__(self):
        return self

    def __exit__(self, a, b, c):
        pass

    def add_child_handler(self, pid, callback, *args):
        """Register callback(pid, returncode, *args) for process 'pid'."""
        self._callbacks[pid] = callback, args

        # Prevent a race condition in case the child is already terminated.
        self._do_waitpid(pid)

    def _do_waitpid_all(self):
        """Poll every registered process."""
        # Iterate over a snapshot: _do_waitpid() may remove entries.
        for watched_pid in list(self._callbacks):
            self._do_waitpid(watched_pid)

    def _do_waitpid(self, expected_pid):
        """Poll one process and fire its callback if it has terminated."""
        assert expected_pid > 0

        try:
            pid, status = os.waitpid(expected_pid, os.WNOHANG)
        except ChildProcessError:
            # The child process is already reaped
            # (may happen if waitpid() is called elsewhere).
            pid, returncode = expected_pid, 255
            logger.warning(
                "Unknown child process pid %d, will report returncode 255",
                pid)
        else:
            if pid == 0:
                # The child process is still alive.
                return

            returncode = self._compute_returncode(status)

        try:
            entry = self._callbacks.pop(pid)
        except KeyError:  # pragma: no cover
            # May happen if .remove_child_handler() is called
            # after os.waitpid() returns.
            return

        callback, args = entry
        callback(pid, returncode, *args)
578
579
class FastChildWatcher(BaseChildWatcher):
    """'Fast' child watcher implementation.

    This implementation reaps every terminated process by calling
    os.waitpid(-1) directly, possibly breaking other code spawning processes
    and waiting for their termination.

    There is no noticeable overhead when handling a big number of children
    (O(1) each time a child terminates).
    """
    def __init__(self, loop):
        # This state must exist before the base initializer runs: it
        # calls set_loop(), which may call _do_waitpid_all(), and that
        # method uses the lock, the zombie table and the fork counter.
        self._lock = threading.Lock()
        # Maps the pid of a reaped but not yet registered child to its
        # return code.
        self._zombies = {}
        # Number of 'with' blocks currently entered on this watcher.
        self._forks = 0

        super().__init__(loop)

    def close(self):
        """Close the watcher and forget the recorded zombies."""
        super().close()
        self._zombies.clear()

    def __enter__(self):
        with self._lock:
            self._forks += 1

            return self

    def __exit__(self, a, b, c):
        with self._lock:
            self._forks -= 1

            if self._forks or not self._zombies:
                return

            # No process is being started any more, so the remaining
            # zombies will never be claimed by a handler.
            collateral_victims = str(self._zombies)
            self._zombies.clear()

        logger.warning(
            "Caught subprocesses termination from unknown pids: %s",
            collateral_victims)

    def add_child_handler(self, pid, callback, *args):
        """Register callback(pid, returncode, *args) for process 'pid'.

        Must be called inside the watcher's 'with' block.  If the child
        was already reaped, the callback is fired immediately.
        """
        assert self._forks, "Must use the context manager"

        self._callbacks[pid] = callback, args

        try:
            # Ensure that the child is not already terminated.
            # (raise KeyError if still alive)
            returncode = self._zombies.pop(pid)

            # Child is dead, therefore we can fire the callback immediately.
            # First we remove it from the dict.
            # (raise KeyError if .remove_child_handler() was called in-between)
            del self._callbacks[pid]
        except KeyError:
            pass
        else:
            callback(pid, returncode, *args)

    def _do_waitpid_all(self):
        """Reap every terminated child and fire the matching callbacks."""
        # Because of signal coalescing, we must keep calling waitpid() as
        # long as we're able to reap a child.
        while True:
            try:
                pid, status = os.waitpid(-1, os.WNOHANG)
            except ChildProcessError:
                # No more child processes exist.
                return
            else:
                if pid == 0:
                    # A child process is still alive.
                    return

                returncode = self._compute_returncode(status)

            try:
                callback, args = self._callbacks.pop(pid)
            except KeyError:
                # unknown child
                with self._lock:
                    if self._forks:
                        # It may not be registered yet.
                        self._zombies[pid] = returncode
                        continue

                logger.warning(
                    "Caught subprocess termination from unknown pid: "
                    "%d -> %d", pid, returncode)
            else:
                callback(pid, returncode, *args)
671
672
class _UnixDefaultEventLoopPolicy(events.BaseDefaultEventLoopPolicy):
    """Default event loop policy for Unix.

    Creates _UnixSelectorEventLoop loops and holds the child watcher
    returned by get_child_watcher().
    """
    _loop_factory = _UnixSelectorEventLoop

    def __init__(self):
        super().__init__()
        # Created on demand by get_child_watcher().
        self._watcher = None

    def _init_watcher(self):
        """Create the default SafeChildWatcher if none is set yet."""
        with events._lock:
            if self._watcher is None:  # pragma: no branch
                # Only a watcher created from the main thread is
                # attached to that thread's loop.
                in_main_thread = isinstance(threading.current_thread(),
                                            threading._MainThread)
                loop = self._local._loop if in_main_thread else None
                self._watcher = SafeChildWatcher(loop)

    def set_event_loop(self, loop):
        """Set the event loop.

        As a side effect, if a child watcher was set before, then calling
        .set_event_loop() from the main thread will call .set_loop(loop) on
        the child watcher.
        """

        super().set_event_loop(loop)

        if self._watcher is None:
            return
        if isinstance(threading.current_thread(), threading._MainThread):
            self._watcher.set_loop(loop)

    def get_child_watcher(self):
        """Get the child watcher.

        If not yet set, a SafeChildWatcher object is automatically created.
        """
        if self._watcher is None:
            self._init_watcher()

        return self._watcher

    def set_child_watcher(self, watcher):
        """Set the child watcher, closing the one it replaces."""

        assert watcher is None or isinstance(watcher, AbstractChildWatcher)

        current = self._watcher
        if current is not None:
            current.close()

        self._watcher = watcher
723
# Public names, listed in __all__, for the Unix implementations defined
# in this module.
SelectorEventLoop = _UnixSelectorEventLoop
DefaultEventLoopPolicy = _UnixDefaultEventLoopPolicy