| """Event loop and event loop policy.""" |
| |
# Names exported by a star-import of this module.  The underscore-prefixed
# helpers and BaseDefaultEventLoopPolicy are defined below but not listed.
__all__ = ['AbstractEventLoopPolicy',
           'AbstractEventLoop', 'AbstractServer',
           'Handle', 'TimerHandle',
           'get_event_loop_policy', 'set_event_loop_policy',
           'get_event_loop', 'set_event_loop', 'new_event_loop',
           'get_child_watcher', 'set_child_watcher',
           ]
| |
| import functools |
| import inspect |
| import reprlib |
| import socket |
| import subprocess |
| import sys |
| import threading |
| import traceback |
| |
| |
# True when running on Python 3.4 or newer; gates the use of
# inspect.unwrap() and functools.partialmethod below.
_PY34 = sys.version_info >= (3, 4)


def _get_function_source(func):
    """Return (filename, first line number) of the code behind func.

    Decorator wrappers are stripped first (inspect.unwrap() on 3.4+,
    otherwise a single __wrapped__ hop), and functools.partial /
    partialmethod objects are followed through their .func attribute.
    Return None when no plain Python function is found.
    """
    if _PY34:
        target = inspect.unwrap(func)
    elif hasattr(func, '__wrapped__'):
        target = func.__wrapped__
    else:
        target = func

    if inspect.isfunction(target):
        code_obj = target.__code__
        return (code_obj.co_filename, code_obj.co_firstlineno)

    # Wrapper types whose .func attribute holds the real callable.
    wrapper_types = (functools.partial,)
    if _PY34:
        wrapper_types += (functools.partialmethod,)
    if isinstance(target, wrapper_types):
        return _get_function_source(target.func)
    return None
| |
| |
def _format_args(args):
    """Return a length-limited repr of an argument tuple.

    A one-element tuple loses its trailing comma, so ('hello',) is
    rendered as ('hello').
    """
    # reprlib keeps the output short for long or deeply nested arguments.
    text = reprlib.repr(args)
    is_single = len(args) == 1
    if is_single and text.endswith(',)'):
        return text[:-2] + ')'
    return text
| |
| |
def _format_callback(func, args, suffix=''):
    """Build a readable description of a callback and its arguments.

    functools.partial objects are peeled off one layer at a time: the
    arguments seen so far are moved into the suffix and the partial's own
    func/args take their place.  The result is the callable's
    __qualname__ (or its repr), followed by the formatted arguments, the
    accumulated suffix and, when known, ' at <file>:<line>'.
    """
    # Iterative form of "recurse into func.func with func.args".
    while isinstance(func, functools.partial):
        if args is not None:
            suffix = _format_args(args) + suffix
        func, args = func.func, func.args

    description = getattr(func, '__qualname__', None) or repr(func)

    if args is not None:
        description += _format_args(args)
    if suffix:
        description += suffix

    location = _get_function_source(func)
    if location:
        description += ' at %s:%s' % location
    return description
| |
| |
class Handle:
    """Object returned by callback registration methods.

    Holds a callback, its positional arguments and the loop that owns it.
    When the loop is in debug mode the creation stack is recorded so it
    can be shown in the repr and in exception reports.
    """

    __slots__ = ('_callback', '_args', '_cancelled', '_loop',
                 '_source_traceback', '_repr', '__weakref__')

    def __init__(self, callback, args, loop):
        assert not isinstance(callback, Handle), 'A Handle is not a callback'
        self._loop = loop
        self._callback = callback
        self._args = args
        self._cancelled = False
        self._repr = None
        # Frame 1 is the caller of this constructor; only captured in
        # debug mode.
        self._source_traceback = (
            traceback.extract_stack(sys._getframe(1))
            if self._loop.get_debug() else None)

    def _repr_info(self):
        """Return the list of words that make up the repr."""
        parts = [self.__class__.__name__]
        if self._cancelled:
            parts.append('cancelled')
        if self._callback is not None:
            parts.append(_format_callback(self._callback, self._args))
        if self._source_traceback:
            last = self._source_traceback[-1]
            filename, lineno = last[0], last[1]
            parts.append('created at %s:%s' % (filename, lineno))
        return parts

    def __repr__(self):
        saved = self._repr
        if saved is None:
            return '<%s>' % ' '.join(self._repr_info())
        # A repr frozen by cancel() in debug mode wins over a fresh one.
        return saved

    def cancel(self):
        """Mark the handle as cancelled and drop the callback and args."""
        if self._cancelled:
            return
        self._cancelled = True
        if self._loop.get_debug():
            # Keep a representation in debug mode to keep callback and
            # parameters. For example, to log the warning
            # "Executing <Handle...> took 2.5 second"
            self._repr = repr(self)
        self._callback = None
        self._args = None

    def _run(self):
        """Invoke the callback; report any Exception to the loop."""
        try:
            self._callback(*self._args)
        except Exception as exc:
            description = _format_callback(self._callback, self._args)
            context = dict(
                message='Exception in callback {}'.format(description),
                exception=exc,
                handle=self,
            )
            if self._source_traceback:
                context['source_traceback'] = self._source_traceback
            self._loop.call_exception_handler(context)
        self = None  # Needed to break cycles when an exception occurs.
| |
| |
class TimerHandle(Handle):
    """Object returned by timed callback registration methods.

    Adds the scheduled time (_when) to a Handle and orders instances by
    that time.
    """

    __slots__ = ['_scheduled', '_when']

    def __init__(self, when, callback, args, loop):
        assert when is not None
        super().__init__(callback, args, loop)
        recorded = self._source_traceback
        if recorded:
            # Drop the last recorded frame (this constructor's own).
            del recorded[-1]
        self._when = when
        self._scheduled = False

    def _repr_info(self):
        """Insert 'when=...' right after the name/cancelled words."""
        words = super()._repr_info()
        if self._cancelled:
            index = 2
        else:
            index = 1
        words.insert(index, 'when=%s' % self._when)
        return words

    def __hash__(self):
        return hash(self._when)

    def __lt__(self, other):
        return self._when < other._when

    def __le__(self, other):
        # Strictly earlier, otherwise fall back to full equality.
        return True if self._when < other._when else self.__eq__(other)

    def __gt__(self, other):
        return self._when > other._when

    def __ge__(self, other):
        # Strictly later, otherwise fall back to full equality.
        return True if self._when > other._when else self.__eq__(other)

    def __eq__(self, other):
        if not isinstance(other, TimerHandle):
            return NotImplemented
        return (self._when == other._when and
                self._callback == other._callback and
                self._args == other._args and
                self._cancelled == other._cancelled)

    def cancel(self):
        """Notify the loop (first cancellation only), then cancel."""
        first_time = not self._cancelled
        if first_time:
            self._loop._timer_handle_cancelled(self)
        super().cancel()
| |
| |
class AbstractServer:
    """Abstract server returned by create_server().

    Both methods are abstract and raise NotImplementedError, in line with
    the other abstract classes in this module.
    """

    def close(self):
        """Stop serving.  This leaves existing connections open.

        Raises:
            NotImplementedError: always; subclasses must override.
        """
        # NotImplemented is the sentinel for binary-operator protocols;
        # returning it from an ordinary method silently hands callers a
        # meaningless value.  Raise instead.
        raise NotImplementedError

    def wait_closed(self):
        """Coroutine to wait until service is closed.

        Raises:
            NotImplementedError: always; subclasses must override.
        """
        raise NotImplementedError
| |
| |
class AbstractEventLoop:
    """Abstract event loop.

    Interface definition only: every method below raises
    NotImplementedError, except call_soon(), which delegates to
    call_later() with a delay of 0.
    """

    # Running and stopping the event loop.

    def run_forever(self):
        """Run the event loop until stop() is called."""
        raise NotImplementedError

    def run_until_complete(self, future):
        """Run the event loop until a Future is done.

        Return the Future's result, or raise its exception.
        """
        raise NotImplementedError

    def stop(self):
        """Stop the event loop as soon as reasonable.

        Exactly how soon that is may depend on the implementation, but
        no more I/O callbacks should be scheduled.
        """
        raise NotImplementedError

    def is_running(self):
        """Return whether the event loop is currently running."""
        raise NotImplementedError

    def is_closed(self):
        """Return True if the event loop was closed."""
        raise NotImplementedError

    def close(self):
        """Close the loop.

        The loop should not be running.

        This is idempotent and irreversible.

        No other methods should be called after this one.
        """
        raise NotImplementedError

    # Methods scheduling callbacks.  All these return Handles.

    def _timer_handle_cancelled(self, handle):
        """Notification that a TimerHandle has been cancelled.

        TimerHandle.cancel() calls this on its loop, passing itself,
        before the handle is marked as cancelled.
        """
        raise NotImplementedError

    def call_soon(self, callback, *args):
        """Schedule callback(*args) via call_later() with a delay of 0.

        Return whatever call_later() returns.
        """
        return self.call_later(0, callback, *args)

    def call_later(self, delay, callback, *args):
        """Abstract; presumably schedules callback(*args) after delay.

        NOTE(review): the unit of delay is not stated in this module
        (presumably seconds) -- confirm against the concrete loop.
        """
        raise NotImplementedError

    def call_at(self, when, callback, *args):
        """Abstract; presumably schedules callback(*args) at time when.

        NOTE(review): when is presumably on the clock reported by
        time() -- confirm against the concrete loop.
        """
        raise NotImplementedError

    def time(self):
        """Abstract; presumably returns the loop's current time.

        NOTE(review): clock and units are not defined here -- confirm.
        """
        raise NotImplementedError

    # Method scheduling a coroutine object: create a task.

    def create_task(self, coro):
        """Abstract; schedule the coroutine object coro by creating a task."""
        raise NotImplementedError

    # Methods for interacting with threads.

    def call_soon_threadsafe(self, callback, *args):
        """Abstract; by its name, a call_soon() usable from other threads.

        NOTE(review): the thread-safety contract is not visible here --
        confirm against the concrete loop.
        """
        raise NotImplementedError

    def run_in_executor(self, executor, callback, *args):
        """Abstract; presumably runs callback(*args) in executor.

        NOTE(review): return value and handling of a None executor are
        not defined here -- confirm.
        """
        raise NotImplementedError

    def set_default_executor(self, executor):
        """Abstract; presumably sets the executor used by default.

        NOTE(review): how run_in_executor() uses it is not shown here.
        """
        raise NotImplementedError

    # Network I/O methods returning Futures.

    def getaddrinfo(self, host, port, *, family=0, type=0, proto=0, flags=0):
        """Abstract; returns a Future (see the section comment).

        NOTE(review): parameters presumably mirror socket.getaddrinfo()
        -- confirm.
        """
        raise NotImplementedError

    def getnameinfo(self, sockaddr, flags=0):
        """Abstract; returns a Future (see the section comment).

        NOTE(review): parameters presumably mirror socket.getnameinfo()
        -- confirm.
        """
        raise NotImplementedError

    def create_connection(self, protocol_factory, host=None, port=None, *,
                          ssl=None, family=0, proto=0, flags=0, sock=None,
                          local_addr=None, server_hostname=None):
        """Abstract; presumably opens a connection to host and port.

        NOTE(review): the meaning of each keyword argument and the shape
        of the result are not defined in this module -- confirm against
        the concrete loop.
        """
        raise NotImplementedError

    def create_server(self, protocol_factory, host=None, port=None, *,
                      family=socket.AF_UNSPEC, flags=socket.AI_PASSIVE,
                      sock=None, backlog=100, ssl=None, reuse_address=None):
        """A coroutine which creates a TCP server bound to host and port.

        The return value is a Server object which can be used to stop
        the service.

        If host is an empty string or None all interfaces are assumed
        and a list of multiple sockets will be returned (most likely
        one for IPv4 and another one for IPv6).

        family can be set to either AF_INET or AF_INET6 to force the
        socket to use IPv4 or IPv6. If not set it will be determined
        from host (defaults to AF_UNSPEC).

        flags is a bitmask for getaddrinfo().

        sock can optionally be specified in order to use a preexisting
        socket object.

        backlog is the maximum number of queued connections passed to
        listen() (defaults to 100).

        ssl can be set to an SSLContext to enable SSL over the
        accepted connections.

        reuse_address tells the kernel to reuse a local socket in
        TIME_WAIT state, without waiting for its natural timeout to
        expire. If not specified will automatically be set to True on
        UNIX.
        """
        raise NotImplementedError

    def create_unix_connection(self, protocol_factory, path, *,
                               ssl=None, sock=None,
                               server_hostname=None):
        """Abstract; presumably the UNIX-socket variant of create_connection().

        NOTE(review): argument semantics are not defined here -- confirm.
        """
        raise NotImplementedError

    def create_unix_server(self, protocol_factory, path, *,
                           sock=None, backlog=100, ssl=None):
        """A coroutine which creates a UNIX Domain Socket server.

        The return value is a Server object, which can be used to stop
        the service.

        path is a str, representing a file system path to bind the
        server socket to.

        sock can optionally be specified in order to use a preexisting
        socket object.

        backlog is the maximum number of queued connections passed to
        listen() (defaults to 100).

        ssl can be set to an SSLContext to enable SSL over the
        accepted connections.
        """
        raise NotImplementedError

    def create_datagram_endpoint(self, protocol_factory,
                                 local_addr=None, remote_addr=None, *,
                                 family=0, proto=0, flags=0):
        """Abstract; presumably creates a datagram endpoint.

        NOTE(review): argument semantics and the result are not defined
        here -- confirm against the concrete loop.
        """
        raise NotImplementedError

    # Pipes and subprocesses.

    def connect_read_pipe(self, protocol_factory, pipe):
        """Register read pipe in event loop. Set the pipe to non-blocking mode.

        protocol_factory should instantiate an object with the Protocol
        interface.
        pipe is a file-like object.
        Return pair (transport, protocol), where transport supports the
        ReadTransport interface.
        """
        # A file-like object is accepted instead of a bare file
        # descriptor because the transport needs to own the pipe and
        # close it when the transport finishes.  Passing f.fileno()
        # instead can lead to confusing errors: the pipe transport closes
        # the fd and then f is closed as well, or vice versa.
        raise NotImplementedError

    def connect_write_pipe(self, protocol_factory, pipe):
        """Register write pipe in event loop.

        protocol_factory should instantiate an object with the
        BaseProtocol interface.
        pipe is a file-like object already switched to nonblocking.
        Return pair (transport, protocol), where transport supports the
        WriteTransport interface.
        """
        # A file-like object is accepted instead of a bare file
        # descriptor because the transport needs to own the pipe and
        # close it when the transport finishes.  Passing f.fileno()
        # instead can lead to confusing errors: the pipe transport closes
        # the fd and then f is closed as well, or vice versa.
        raise NotImplementedError

    def subprocess_shell(self, protocol_factory, cmd, *, stdin=subprocess.PIPE,
                         stdout=subprocess.PIPE, stderr=subprocess.PIPE,
                         **kwargs):
        """Abstract; presumably runs cmd through the shell.

        stdin, stdout and stderr default to subprocess.PIPE.
        NOTE(review): the accepted **kwargs and the result are not
        defined here -- confirm.
        """
        raise NotImplementedError

    def subprocess_exec(self, protocol_factory, *args, stdin=subprocess.PIPE,
                        stdout=subprocess.PIPE, stderr=subprocess.PIPE,
                        **kwargs):
        """Abstract; presumably runs the program given by *args.

        stdin, stdout and stderr default to subprocess.PIPE.
        NOTE(review): the accepted **kwargs and the result are not
        defined here -- confirm.
        """
        raise NotImplementedError

    # Ready-based callback registration methods.
    # The add_*() methods return None.
    # The remove_*() methods return True if something was removed,
    # False if there was nothing to delete.

    def add_reader(self, fd, callback, *args):
        """Abstract; register callback(*args) as the reader for fd."""
        raise NotImplementedError

    def remove_reader(self, fd):
        """Abstract; remove the reader registered for fd."""
        raise NotImplementedError

    def add_writer(self, fd, callback, *args):
        """Abstract; register callback(*args) as the writer for fd."""
        raise NotImplementedError

    def remove_writer(self, fd):
        """Abstract; remove the writer registered for fd."""
        raise NotImplementedError

    # Completion based I/O methods returning Futures.

    def sock_recv(self, sock, nbytes):
        """Abstract; returns a Future (see the section comment).

        NOTE(review): presumably receives up to nbytes from sock.
        """
        raise NotImplementedError

    def sock_sendall(self, sock, data):
        """Abstract; returns a Future (see the section comment).

        NOTE(review): presumably sends all of data on sock.
        """
        raise NotImplementedError

    def sock_connect(self, sock, address):
        """Abstract; returns a Future (see the section comment).

        NOTE(review): presumably connects sock to address.
        """
        raise NotImplementedError

    def sock_accept(self, sock):
        """Abstract; returns a Future (see the section comment).

        NOTE(review): presumably accepts a connection on sock.
        """
        raise NotImplementedError

    # Signal handling.

    def add_signal_handler(self, sig, callback, *args):
        """Abstract; presumably installs callback(*args) for signal sig."""
        raise NotImplementedError

    def remove_signal_handler(self, sig):
        """Abstract; presumably removes the handler for signal sig."""
        raise NotImplementedError

    # Error handlers.

    def set_exception_handler(self, handler):
        """Abstract; presumably installs handler as the exception handler.

        NOTE(review): the handler's expected signature is not defined
        here -- confirm.
        """
        raise NotImplementedError

    def default_exception_handler(self, context):
        """Abstract; presumably the handler used when none has been set.

        NOTE(review): context is presumably the same dict passed to
        call_exception_handler() -- confirm.
        """
        raise NotImplementedError

    def call_exception_handler(self, context):
        """Abstract; report an error described by the context dict.

        Handle._run() calls this with the keys 'message', 'exception'
        and 'handle', plus 'source_traceback' when one was recorded.
        """
        raise NotImplementedError

    # Debug flag management.

    def get_debug(self):
        """Abstract; return the loop's debug flag.

        Handle uses its truth value to decide whether to record the
        creation traceback and to freeze the repr on cancel().
        """
        raise NotImplementedError

    def set_debug(self, enabled):
        """Abstract; presumably sets the flag returned by get_debug()."""
        raise NotImplementedError
| |
| |
class AbstractEventLoopPolicy:
    """Abstract policy for accessing the event loop.

    Interface definition only: every method raises NotImplementedError
    and must be overridden by a concrete policy.
    """

    def get_event_loop(self):
        """Get the event loop for the current context.

        Returns an event loop object implementing the AbstractEventLoop
        interface, or raises an exception in case no event loop has been
        set for the current context and the current policy does not
        specify to create one.

        It should never return None.
        """
        raise NotImplementedError

    def set_event_loop(self, loop):
        """Set the event loop for the current context to loop."""
        raise NotImplementedError

    def new_event_loop(self):
        """Create and return a new event loop object according to this
        policy's rules.

        If there's need to set this loop as the event loop for the
        current context, set_event_loop must be called explicitly.
        """
        raise NotImplementedError

    # Child processes handling (Unix only).

    def get_child_watcher(self):
        """Get the watcher for child processes."""
        raise NotImplementedError

    def set_child_watcher(self, watcher):
        """Set the watcher for child processes."""
        raise NotImplementedError
| |
| |
class BaseDefaultEventLoopPolicy(AbstractEventLoopPolicy):
    """Default policy implementation for accessing the event loop.

    In this policy, each thread has its own event loop.  However, we
    only automatically create an event loop by default for the main
    thread; other threads by default have no event loop.

    Other policies may have different rules (e.g. a single global
    event loop, or automatically creating an event loop per thread, or
    using some other notion of context to which an event loop is
    associated).
    """

    # Callable returning a new event loop.  It is None here, so a subclass
    # has to provide one before new_event_loop() can be used.
    _loop_factory = None

    class _Local(threading.local):
        # Per-thread state: the current loop, and whether set_event_loop()
        # has ever succeeded in this thread.
        _loop = None
        _set_called = False

    def __init__(self):
        self._local = self._Local()

    def get_event_loop(self):
        """Get the event loop for the current thread.

        In the main thread, a loop is created and installed on first use
        unless set_event_loop() has already been called there.

        Returns:
            The loop set for the current thread; never None.

        Raises:
            RuntimeError: if the current thread has no event loop.
        """
        if (self._local._loop is None and
                not self._local._set_called and
                isinstance(threading.current_thread(), threading._MainThread)):
            self.set_event_loop(self.new_event_loop())
        if self._local._loop is None:
            raise RuntimeError('There is no current event loop in thread %r.'
                               % threading.current_thread().name)
        return self._local._loop

    def set_event_loop(self, loop):
        """Set the event loop for the current thread.

        loop must be None or an AbstractEventLoop instance; anything else
        fails the assertion and leaves the policy state untouched.
        """
        # Validate before mutating: recording the call first would let a
        # rejected argument permanently disable automatic loop creation
        # in the main thread.
        assert loop is None or isinstance(loop, AbstractEventLoop)
        self._local._set_called = True
        self._local._loop = loop

    def new_event_loop(self):
        """Create a new event loop.

        You must call set_event_loop() to make this the current event
        loop.
        """
        return self._loop_factory()
| |
| |
# Event loop policy.  The policy itself is always global, even if the
# policy's rules say that there is an event loop per thread (or other
# notion of context).  The default policy is installed by the first
# call to get_event_loop_policy().  set_event_loop_policy(None) resets
# this to None, so the default is created again on the next lookup.
_event_loop_policy = None

# Lock for protecting the on-the-fly creation of the event loop policy.
# Only _init_event_loop_policy() takes it; set_event_loop_policy()
# assigns the global without holding it.
_lock = threading.Lock()
| |
| |
def _init_event_loop_policy():
    """Install the default policy if none is set, under the module lock."""
    global _event_loop_policy
    with _lock:
        # Re-checked under the lock: another thread may have installed a
        # policy since the caller's unlocked test.
        if _event_loop_policy is not None:  # pragma: no branch
            return
        from . import DefaultEventLoopPolicy
        _event_loop_policy = DefaultEventLoopPolicy()
| |
| |
def get_event_loop_policy():
    """Return the current event loop policy, creating the default lazily."""
    if _event_loop_policy is not None:
        return _event_loop_policy
    _init_event_loop_policy()
    return _event_loop_policy
| |
| |
def set_event_loop_policy(policy):
    """Replace the current event loop policy.

    Passing None clears the stored policy, so the default one is created
    again on the next get_event_loop_policy() call.
    """
    global _event_loop_policy
    acceptable = (policy is None or
                  isinstance(policy, AbstractEventLoopPolicy))
    assert acceptable
    _event_loop_policy = policy
| |
| |
def get_event_loop():
    """Shortcut for get_event_loop_policy().get_event_loop()."""
    policy = get_event_loop_policy()
    return policy.get_event_loop()
| |
| |
def set_event_loop(loop):
    """Shortcut for get_event_loop_policy().set_event_loop(loop)."""
    policy = get_event_loop_policy()
    policy.set_event_loop(loop)
| |
| |
def new_event_loop():
    """Shortcut for get_event_loop_policy().new_event_loop()."""
    policy = get_event_loop_policy()
    return policy.new_event_loop()
| |
| |
def get_child_watcher():
    """Shortcut for get_event_loop_policy().get_child_watcher()."""
    policy = get_event_loop_policy()
    return policy.get_child_watcher()
| |
| |
def set_child_watcher(watcher):
    """Shortcut for get_event_loop_policy().set_child_watcher(watcher).

    The policy method's return value is passed through.
    """
    policy = get_event_loop_policy()
    return policy.set_child_watcher(watcher)