|  | """Event loop and event loop policy.""" | 
|  |  | 
|  | __all__ = ['AbstractEventLoopPolicy', | 
|  | 'AbstractEventLoop', 'AbstractServer', | 
|  | 'Handle', 'TimerHandle', | 
|  | 'get_event_loop_policy', 'set_event_loop_policy', | 
|  | 'get_event_loop', 'set_event_loop', 'new_event_loop', | 
|  | 'get_child_watcher', 'set_child_watcher', | 
|  | ] | 
|  |  | 
|  | import functools | 
|  | import inspect | 
|  | import reprlib | 
|  | import socket | 
|  | import subprocess | 
|  | import sys | 
|  | import threading | 
|  | import traceback | 
|  |  | 
|  |  | 
|  | _PY34 = sys.version_info >= (3, 4) | 
|  |  | 
|  |  | 
|  | def _get_function_source(func): | 
|  | if _PY34: | 
|  | func = inspect.unwrap(func) | 
|  | elif hasattr(func, '__wrapped__'): | 
|  | func = func.__wrapped__ | 
|  | if inspect.isfunction(func): | 
|  | code = func.__code__ | 
|  | return (code.co_filename, code.co_firstlineno) | 
|  | if isinstance(func, functools.partial): | 
|  | return _get_function_source(func.func) | 
|  | if _PY34 and isinstance(func, functools.partialmethod): | 
|  | return _get_function_source(func.func) | 
|  | return None | 
|  |  | 
|  |  | 
|  | def _format_args(args): | 
|  | """Format function arguments. | 
|  |  | 
|  | Special case for a single parameter: ('hello',) is formatted as ('hello'). | 
|  | """ | 
|  | # use reprlib to limit the length of the output | 
|  | args_repr = reprlib.repr(args) | 
|  | if len(args) == 1 and args_repr.endswith(',)'): | 
|  | args_repr = args_repr[:-2] + ')' | 
|  | return args_repr | 
|  |  | 
|  |  | 
|  | def _format_callback(func, args, suffix=''): | 
|  | if isinstance(func, functools.partial): | 
|  | if args is not None: | 
|  | suffix = _format_args(args) + suffix | 
|  | return _format_callback(func.func, func.args, suffix) | 
|  |  | 
|  | func_repr = getattr(func, '__qualname__', None) | 
|  | if not func_repr: | 
|  | func_repr = repr(func) | 
|  |  | 
|  | if args is not None: | 
|  | func_repr += _format_args(args) | 
|  | if suffix: | 
|  | func_repr += suffix | 
|  |  | 
|  | source = _get_function_source(func) | 
|  | if source: | 
|  | func_repr += ' at %s:%s' % source | 
|  | return func_repr | 
|  |  | 
|  |  | 
|  | class Handle: | 
|  | """Object returned by callback registration methods.""" | 
|  |  | 
|  | __slots__ = ('_callback', '_args', '_cancelled', '_loop', | 
|  | '_source_traceback', '_repr', '__weakref__') | 
|  |  | 
|  | def __init__(self, callback, args, loop): | 
|  | assert not isinstance(callback, Handle), 'A Handle is not a callback' | 
|  | self._loop = loop | 
|  | self._callback = callback | 
|  | self._args = args | 
|  | self._cancelled = False | 
|  | self._repr = None | 
|  | if self._loop.get_debug(): | 
|  | self._source_traceback = traceback.extract_stack(sys._getframe(1)) | 
|  | else: | 
|  | self._source_traceback = None | 
|  |  | 
|  | def _repr_info(self): | 
|  | info = [self.__class__.__name__] | 
|  | if self._cancelled: | 
|  | info.append('cancelled') | 
|  | if self._callback is not None: | 
|  | info.append(_format_callback(self._callback, self._args)) | 
|  | if self._source_traceback: | 
|  | frame = self._source_traceback[-1] | 
|  | info.append('created at %s:%s' % (frame[0], frame[1])) | 
|  | return info | 
|  |  | 
|  | def __repr__(self): | 
|  | if self._repr is not None: | 
|  | return self._repr | 
|  | info = self._repr_info() | 
|  | return '<%s>' % ' '.join(info) | 
|  |  | 
|  | def cancel(self): | 
|  | if not self._cancelled: | 
|  | self._cancelled = True | 
|  | if self._loop.get_debug(): | 
|  | # Keep a representation in debug mode to keep callback and | 
|  | # parameters. For example, to log the warning | 
|  | # "Executing <Handle...> took 2.5 second" | 
|  | self._repr = repr(self) | 
|  | self._callback = None | 
|  | self._args = None | 
|  |  | 
|  | def _run(self): | 
|  | try: | 
|  | self._callback(*self._args) | 
|  | except Exception as exc: | 
|  | cb = _format_callback(self._callback, self._args) | 
|  | msg = 'Exception in callback {}'.format(cb) | 
|  | context = { | 
|  | 'message': msg, | 
|  | 'exception': exc, | 
|  | 'handle': self, | 
|  | } | 
|  | if self._source_traceback: | 
|  | context['source_traceback'] = self._source_traceback | 
|  | self._loop.call_exception_handler(context) | 
|  | self = None  # Needed to break cycles when an exception occurs. | 
|  |  | 
|  |  | 
|  | class TimerHandle(Handle): | 
|  | """Object returned by timed callback registration methods.""" | 
|  |  | 
|  | __slots__ = ['_scheduled', '_when'] | 
|  |  | 
|  | def __init__(self, when, callback, args, loop): | 
|  | assert when is not None | 
|  | super().__init__(callback, args, loop) | 
|  | if self._source_traceback: | 
|  | del self._source_traceback[-1] | 
|  | self._when = when | 
|  | self._scheduled = False | 
|  |  | 
|  | def _repr_info(self): | 
|  | info = super()._repr_info() | 
|  | pos = 2 if self._cancelled else 1 | 
|  | info.insert(pos, 'when=%s' % self._when) | 
|  | return info | 
|  |  | 
|  | def __hash__(self): | 
|  | return hash(self._when) | 
|  |  | 
|  | def __lt__(self, other): | 
|  | return self._when < other._when | 
|  |  | 
|  | def __le__(self, other): | 
|  | if self._when < other._when: | 
|  | return True | 
|  | return self.__eq__(other) | 
|  |  | 
|  | def __gt__(self, other): | 
|  | return self._when > other._when | 
|  |  | 
|  | def __ge__(self, other): | 
|  | if self._when > other._when: | 
|  | return True | 
|  | return self.__eq__(other) | 
|  |  | 
|  | def __eq__(self, other): | 
|  | if isinstance(other, TimerHandle): | 
|  | return (self._when == other._when and | 
|  | self._callback == other._callback and | 
|  | self._args == other._args and | 
|  | self._cancelled == other._cancelled) | 
|  | return NotImplemented | 
|  |  | 
|  | def cancel(self): | 
|  | if not self._cancelled: | 
|  | self._loop._timer_handle_cancelled(self) | 
|  | super().cancel() | 
|  |  | 
|  |  | 
|  | class AbstractServer: | 
|  | """Abstract server returned by create_server().""" | 
|  |  | 
|  | def close(self): | 
|  | """Stop serving.  This leaves existing connections open.""" | 
|  | return NotImplemented | 
|  |  | 
|  | def wait_closed(self): | 
|  | """Coroutine to wait until service is closed.""" | 
|  | return NotImplemented | 
|  |  | 
|  |  | 
|  | class AbstractEventLoop: | 
|  | """Abstract event loop.""" | 
|  |  | 
|  | # Running and stopping the event loop. | 
|  |  | 
|  | def run_forever(self): | 
|  | """Run the event loop until stop() is called.""" | 
|  | raise NotImplementedError | 
|  |  | 
|  | def run_until_complete(self, future): | 
|  | """Run the event loop until a Future is done. | 
|  |  | 
|  | Return the Future's result, or raise its exception. | 
|  | """ | 
|  | raise NotImplementedError | 
|  |  | 
|  | def stop(self): | 
|  | """Stop the event loop as soon as reasonable. | 
|  |  | 
|  | Exactly how soon that is may depend on the implementation, but | 
|  | no more I/O callbacks should be scheduled. | 
|  | """ | 
|  | raise NotImplementedError | 
|  |  | 
|  | def is_running(self): | 
|  | """Return whether the event loop is currently running.""" | 
|  | raise NotImplementedError | 
|  |  | 
|  | def is_closed(self): | 
|  | """Returns True if the event loop was closed.""" | 
|  | raise NotImplementedError | 
|  |  | 
|  | def close(self): | 
|  | """Close the loop. | 
|  |  | 
|  | The loop should not be running. | 
|  |  | 
|  | This is idempotent and irreversible. | 
|  |  | 
|  | No other methods should be called after this one. | 
|  | """ | 
|  | raise NotImplementedError | 
|  |  | 
|  | # Methods scheduling callbacks.  All these return Handles. | 
|  |  | 
|  | def _timer_handle_cancelled(self, handle): | 
|  | """Notification that a TimerHandle has been cancelled.""" | 
|  | raise NotImplementedError | 
|  |  | 
|  | def call_soon(self, callback, *args): | 
|  | return self.call_later(0, callback, *args) | 
|  |  | 
|  | def call_later(self, delay, callback, *args): | 
|  | raise NotImplementedError | 
|  |  | 
|  | def call_at(self, when, callback, *args): | 
|  | raise NotImplementedError | 
|  |  | 
|  | def time(self): | 
|  | raise NotImplementedError | 
|  |  | 
|  | # Method scheduling a coroutine object: create a task. | 
|  |  | 
|  | def create_task(self, coro): | 
|  | raise NotImplementedError | 
|  |  | 
|  | # Methods for interacting with threads. | 
|  |  | 
|  | def call_soon_threadsafe(self, callback, *args): | 
|  | raise NotImplementedError | 
|  |  | 
|  | def run_in_executor(self, executor, callback, *args): | 
|  | raise NotImplementedError | 
|  |  | 
|  | def set_default_executor(self, executor): | 
|  | raise NotImplementedError | 
|  |  | 
|  | # Network I/O methods returning Futures. | 
|  |  | 
|  | def getaddrinfo(self, host, port, *, family=0, type=0, proto=0, flags=0): | 
|  | raise NotImplementedError | 
|  |  | 
|  | def getnameinfo(self, sockaddr, flags=0): | 
|  | raise NotImplementedError | 
|  |  | 
|  | def create_connection(self, protocol_factory, host=None, port=None, *, | 
|  | ssl=None, family=0, proto=0, flags=0, sock=None, | 
|  | local_addr=None, server_hostname=None): | 
|  | raise NotImplementedError | 
|  |  | 
|  | def create_server(self, protocol_factory, host=None, port=None, *, | 
|  | family=socket.AF_UNSPEC, flags=socket.AI_PASSIVE, | 
|  | sock=None, backlog=100, ssl=None, reuse_address=None): | 
|  | """A coroutine which creates a TCP server bound to host and port. | 
|  |  | 
|  | The return value is a Server object which can be used to stop | 
|  | the service. | 
|  |  | 
|  | If host is an empty string or None all interfaces are assumed | 
|  | and a list of multiple sockets will be returned (most likely | 
|  | one for IPv4 and another one for IPv6). | 
|  |  | 
|  | family can be set to either AF_INET or AF_INET6 to force the | 
|  | socket to use IPv4 or IPv6. If not set it will be determined | 
|  | from host (defaults to AF_UNSPEC). | 
|  |  | 
|  | flags is a bitmask for getaddrinfo(). | 
|  |  | 
|  | sock can optionally be specified in order to use a preexisting | 
|  | socket object. | 
|  |  | 
|  | backlog is the maximum number of queued connections passed to | 
|  | listen() (defaults to 100). | 
|  |  | 
|  | ssl can be set to an SSLContext to enable SSL over the | 
|  | accepted connections. | 
|  |  | 
|  | reuse_address tells the kernel to reuse a local socket in | 
|  | TIME_WAIT state, without waiting for its natural timeout to | 
|  | expire. If not specified will automatically be set to True on | 
|  | UNIX. | 
|  | """ | 
|  | raise NotImplementedError | 
|  |  | 
|  | def create_unix_connection(self, protocol_factory, path, *, | 
|  | ssl=None, sock=None, | 
|  | server_hostname=None): | 
|  | raise NotImplementedError | 
|  |  | 
|  | def create_unix_server(self, protocol_factory, path, *, | 
|  | sock=None, backlog=100, ssl=None): | 
|  | """A coroutine which creates a UNIX Domain Socket server. | 
|  |  | 
|  | The return value is a Server object, which can be used to stop | 
|  | the service. | 
|  |  | 
|  | path is a str, representing a file systsem path to bind the | 
|  | server socket to. | 
|  |  | 
|  | sock can optionally be specified in order to use a preexisting | 
|  | socket object. | 
|  |  | 
|  | backlog is the maximum number of queued connections passed to | 
|  | listen() (defaults to 100). | 
|  |  | 
|  | ssl can be set to an SSLContext to enable SSL over the | 
|  | accepted connections. | 
|  | """ | 
|  | raise NotImplementedError | 
|  |  | 
|  | def create_datagram_endpoint(self, protocol_factory, | 
|  | local_addr=None, remote_addr=None, *, | 
|  | family=0, proto=0, flags=0): | 
|  | raise NotImplementedError | 
|  |  | 
|  | # Pipes and subprocesses. | 
|  |  | 
|  | def connect_read_pipe(self, protocol_factory, pipe): | 
|  | """Register read pipe in event loop. Set the pipe to non-blocking mode. | 
|  |  | 
|  | protocol_factory should instantiate object with Protocol interface. | 
|  | pipe is a file-like object. | 
|  | Return pair (transport, protocol), where transport supports the | 
|  | ReadTransport interface.""" | 
|  | # The reason to accept file-like object instead of just file descriptor | 
|  | # is: we need to own pipe and close it at transport finishing | 
|  | # Can got complicated errors if pass f.fileno(), | 
|  | # close fd in pipe transport then close f and vise versa. | 
|  | raise NotImplementedError | 
|  |  | 
|  | def connect_write_pipe(self, protocol_factory, pipe): | 
|  | """Register write pipe in event loop. | 
|  |  | 
|  | protocol_factory should instantiate object with BaseProtocol interface. | 
|  | Pipe is file-like object already switched to nonblocking. | 
|  | Return pair (transport, protocol), where transport support | 
|  | WriteTransport interface.""" | 
|  | # The reason to accept file-like object instead of just file descriptor | 
|  | # is: we need to own pipe and close it at transport finishing | 
|  | # Can got complicated errors if pass f.fileno(), | 
|  | # close fd in pipe transport then close f and vise versa. | 
|  | raise NotImplementedError | 
|  |  | 
|  | def subprocess_shell(self, protocol_factory, cmd, *, stdin=subprocess.PIPE, | 
|  | stdout=subprocess.PIPE, stderr=subprocess.PIPE, | 
|  | **kwargs): | 
|  | raise NotImplementedError | 
|  |  | 
|  | def subprocess_exec(self, protocol_factory, *args, stdin=subprocess.PIPE, | 
|  | stdout=subprocess.PIPE, stderr=subprocess.PIPE, | 
|  | **kwargs): | 
|  | raise NotImplementedError | 
|  |  | 
|  | # Ready-based callback registration methods. | 
|  | # The add_*() methods return None. | 
|  | # The remove_*() methods return True if something was removed, | 
|  | # False if there was nothing to delete. | 
|  |  | 
|  | def add_reader(self, fd, callback, *args): | 
|  | raise NotImplementedError | 
|  |  | 
|  | def remove_reader(self, fd): | 
|  | raise NotImplementedError | 
|  |  | 
|  | def add_writer(self, fd, callback, *args): | 
|  | raise NotImplementedError | 
|  |  | 
|  | def remove_writer(self, fd): | 
|  | raise NotImplementedError | 
|  |  | 
|  | # Completion based I/O methods returning Futures. | 
|  |  | 
|  | def sock_recv(self, sock, nbytes): | 
|  | raise NotImplementedError | 
|  |  | 
|  | def sock_sendall(self, sock, data): | 
|  | raise NotImplementedError | 
|  |  | 
|  | def sock_connect(self, sock, address): | 
|  | raise NotImplementedError | 
|  |  | 
|  | def sock_accept(self, sock): | 
|  | raise NotImplementedError | 
|  |  | 
|  | # Signal handling. | 
|  |  | 
|  | def add_signal_handler(self, sig, callback, *args): | 
|  | raise NotImplementedError | 
|  |  | 
|  | def remove_signal_handler(self, sig): | 
|  | raise NotImplementedError | 
|  |  | 
|  | # Error handlers. | 
|  |  | 
|  | def set_exception_handler(self, handler): | 
|  | raise NotImplementedError | 
|  |  | 
|  | def default_exception_handler(self, context): | 
|  | raise NotImplementedError | 
|  |  | 
|  | def call_exception_handler(self, context): | 
|  | raise NotImplementedError | 
|  |  | 
|  | # Debug flag management. | 
|  |  | 
|  | def get_debug(self): | 
|  | raise NotImplementedError | 
|  |  | 
|  | def set_debug(self, enabled): | 
|  | raise NotImplementedError | 
|  |  | 
|  |  | 
|  | class AbstractEventLoopPolicy: | 
|  | """Abstract policy for accessing the event loop.""" | 
|  |  | 
|  | def get_event_loop(self): | 
|  | """Get the event loop for the current context. | 
|  |  | 
|  | Returns an event loop object implementing the BaseEventLoop interface, | 
|  | or raises an exception in case no event loop has been set for the | 
|  | current context and the current policy does not specify to create one. | 
|  |  | 
|  | It should never return None.""" | 
|  | raise NotImplementedError | 
|  |  | 
|  | def set_event_loop(self, loop): | 
|  | """Set the event loop for the current context to loop.""" | 
|  | raise NotImplementedError | 
|  |  | 
|  | def new_event_loop(self): | 
|  | """Create and return a new event loop object according to this | 
|  | policy's rules. If there's need to set this loop as the event loop for | 
|  | the current context, set_event_loop must be called explicitly.""" | 
|  | raise NotImplementedError | 
|  |  | 
|  | # Child processes handling (Unix only). | 
|  |  | 
|  | def get_child_watcher(self): | 
|  | "Get the watcher for child processes." | 
|  | raise NotImplementedError | 
|  |  | 
|  | def set_child_watcher(self, watcher): | 
|  | """Set the watcher for child processes.""" | 
|  | raise NotImplementedError | 
|  |  | 
|  |  | 
|  | class BaseDefaultEventLoopPolicy(AbstractEventLoopPolicy): | 
|  | """Default policy implementation for accessing the event loop. | 
|  |  | 
|  | In this policy, each thread has its own event loop.  However, we | 
|  | only automatically create an event loop by default for the main | 
|  | thread; other threads by default have no event loop. | 
|  |  | 
|  | Other policies may have different rules (e.g. a single global | 
|  | event loop, or automatically creating an event loop per thread, or | 
|  | using some other notion of context to which an event loop is | 
|  | associated). | 
|  | """ | 
|  |  | 
|  | _loop_factory = None | 
|  |  | 
|  | class _Local(threading.local): | 
|  | _loop = None | 
|  | _set_called = False | 
|  |  | 
|  | def __init__(self): | 
|  | self._local = self._Local() | 
|  |  | 
|  | def get_event_loop(self): | 
|  | """Get the event loop. | 
|  |  | 
|  | This may be None or an instance of EventLoop. | 
|  | """ | 
|  | if (self._local._loop is None and | 
|  | not self._local._set_called and | 
|  | isinstance(threading.current_thread(), threading._MainThread)): | 
|  | self.set_event_loop(self.new_event_loop()) | 
|  | if self._local._loop is None: | 
|  | raise RuntimeError('There is no current event loop in thread %r.' | 
|  | % threading.current_thread().name) | 
|  | return self._local._loop | 
|  |  | 
|  | def set_event_loop(self, loop): | 
|  | """Set the event loop.""" | 
|  | self._local._set_called = True | 
|  | assert loop is None or isinstance(loop, AbstractEventLoop) | 
|  | self._local._loop = loop | 
|  |  | 
|  | def new_event_loop(self): | 
|  | """Create a new event loop. | 
|  |  | 
|  | You must call set_event_loop() to make this the current event | 
|  | loop. | 
|  | """ | 
|  | return self._loop_factory() | 
|  |  | 
|  |  | 
|  | # Event loop policy.  The policy itself is always global, even if the | 
|  | # policy's rules say that there is an event loop per thread (or other | 
|  | # notion of context).  The default policy is installed by the first | 
|  | # call to get_event_loop_policy(). | 
|  | _event_loop_policy = None | 
|  |  | 
|  | # Lock for protecting the on-the-fly creation of the event loop policy. | 
|  | _lock = threading.Lock() | 
|  |  | 
|  |  | 
|  | def _init_event_loop_policy(): | 
|  | global _event_loop_policy | 
|  | with _lock: | 
|  | if _event_loop_policy is None:  # pragma: no branch | 
|  | from . import DefaultEventLoopPolicy | 
|  | _event_loop_policy = DefaultEventLoopPolicy() | 
|  |  | 
|  |  | 
|  | def get_event_loop_policy(): | 
|  | """Get the current event loop policy.""" | 
|  | if _event_loop_policy is None: | 
|  | _init_event_loop_policy() | 
|  | return _event_loop_policy | 
|  |  | 
|  |  | 
|  | def set_event_loop_policy(policy): | 
|  | """Set the current event loop policy. | 
|  |  | 
|  | If policy is None, the default policy is restored.""" | 
|  | global _event_loop_policy | 
|  | assert policy is None or isinstance(policy, AbstractEventLoopPolicy) | 
|  | _event_loop_policy = policy | 
|  |  | 
|  |  | 
|  | def get_event_loop(): | 
|  | """Equivalent to calling get_event_loop_policy().get_event_loop().""" | 
|  | return get_event_loop_policy().get_event_loop() | 
|  |  | 
|  |  | 
|  | def set_event_loop(loop): | 
|  | """Equivalent to calling get_event_loop_policy().set_event_loop(loop).""" | 
|  | get_event_loop_policy().set_event_loop(loop) | 
|  |  | 
|  |  | 
|  | def new_event_loop(): | 
|  | """Equivalent to calling get_event_loop_policy().new_event_loop().""" | 
|  | return get_event_loop_policy().new_event_loop() | 
|  |  | 
|  |  | 
|  | def get_child_watcher(): | 
|  | """Equivalent to calling get_event_loop_policy().get_child_watcher().""" | 
|  | return get_event_loop_policy().get_child_watcher() | 
|  |  | 
|  |  | 
|  | def set_child_watcher(watcher): | 
|  | """Equivalent to calling | 
|  | get_event_loop_policy().set_child_watcher(watcher).""" | 
|  | return get_event_loop_policy().set_child_watcher(watcher) |