blob: 405448225e71c7e27e7d3d895080c4c5cfbe6427 [file] [log] [blame]
"""Event loop and event loop policy."""
2
# Public names exported by ``from <module> import *``.
__all__ = ['AbstractEventLoopPolicy',
           'AbstractEventLoop', 'AbstractServer',
           'Handle', 'TimerHandle',
           'get_event_loop_policy', 'set_event_loop_policy',
           'get_event_loop', 'set_event_loop', 'new_event_loop',
           'get_child_watcher', 'set_child_watcher',
           ]
10
Victor Stinner307bccc2014-06-12 18:39:26 +020011import functools
12import inspect
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070013import subprocess
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070014import threading
15import socket
Victor Stinner307bccc2014-06-12 18:39:26 +020016import sys
17
18
# True on Python 3.4+; gates the use of inspect.unwrap() and
# functools.partialmethod in _get_function_source().
_PY34 = sys.version_info >= (3, 4)
20
Victor Stinner975735f2014-06-25 21:41:58 +020021
def _get_function_source(func):
    """Return ``(filename, first_line_number)`` for the code behind *func*.

    Decorator wrappers are peeled off first (the whole ``__wrapped__``
    chain via inspect.unwrap() when _PY34 is true, otherwise a single
    ``__wrapped__`` level).  functools.partial objects, and
    functools.partialmethod objects when _PY34 is true, are followed
    through their ``func`` attribute.

    Return None when no plain Python function is found.
    """
    if _PY34:
        func = inspect.unwrap(func)
    elif hasattr(func, '__wrapped__'):
        func = func.__wrapped__
    if inspect.isfunction(func):
        code = func.__code__
        return (code.co_filename, code.co_firstlineno)
    if isinstance(func, functools.partial):
        return _get_function_source(func.func)
    # The _PY34 check comes first so that functools.partialmethod is only
    # looked up on versions where this module expects it to exist.
    if _PY34 and isinstance(func, functools.partialmethod):
        return _get_function_source(func.func)
    return None
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070035
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070036
def _format_args(args):
    """Return repr(args), dropping the trailing comma of a 1-item tuple.

    For example ``('hello',)`` is formatted as ``('hello')`` so that it
    reads like the argument list of a call.
    """
    args_repr = repr(args)
    if len(args) == 1 and args_repr.endswith(',)'):
        args_repr = args_repr[:-2] + ')'
    return args_repr
43
44
def _format_callback(func, args, suffix=''):
    """Return a human-readable description of the call ``func(*args)``.

    The result is the callable's ``__qualname__`` (or its repr() when
    that is missing or empty), followed by the formatted arguments, then
    *suffix*, then `` at <file>:<line>`` when _get_function_source() can
    locate the source.

    A functools.partial is unwrapped recursively: the outer *args* are
    pushed into *suffix* and the partial's own positional arguments are
    formatted next to the underlying function.  *args* may be None, in
    which case no argument list is emitted for that level.
    """
    if isinstance(func, functools.partial):
        if args is not None:
            suffix = _format_args(args) + suffix
        return _format_callback(func.func, func.args, suffix)

    func_repr = getattr(func, '__qualname__', None)
    if not func_repr:
        func_repr = repr(func)

    if args is not None:
        func_repr += _format_args(args)
    if suffix:
        func_repr += suffix

    source = _get_function_source(func)
    if source:
        func_repr += ' at %s:%s' % source
    return func_repr
64
65
class Handle:
    """Object returned by callback registration methods.

    Stores a callback, its positional arguments and the loop whose
    exception handler is used when the callback fails.
    """

    __slots__ = ['_callback', '_args', '_cancelled', '_loop', '__weakref__']

    def __init__(self, callback, args, loop):
        assert not isinstance(callback, Handle), 'A Handle is not a callback'
        self._loop = loop
        self._callback = callback
        self._args = args
        self._cancelled = False

    def __repr__(self):
        info = []
        if self._cancelled:
            info.append('cancelled')
        info.append(_format_callback(self._callback, self._args))
        return '<%s %s>' % (self.__class__.__name__, ' '.join(info))

    def cancel(self):
        """Mark the handle as cancelled."""
        self._cancelled = True

    def _run(self):
        """Call the callback with the stored arguments.

        An Exception raised by the callback is not propagated; it is
        passed to the loop's call_exception_handler() in a context dict
        with the keys 'message', 'exception' and 'handle'.
        """
        try:
            self._callback(*self._args)
        except Exception as exc:
            msg = 'Exception in callback {}{!r}'.format(self._callback,
                                                         self._args)
            self._loop.call_exception_handler({
                'message': msg,
                'exception': exc,
                'handle': self,
            })
        self = None  # Needed to break cycles when an exception occurs.
100
101
class TimerHandle(Handle):
    """Object returned by timed callback registration methods.

    Adds a ``_when`` value to Handle.  Ordering comparisons are based on
    ``_when``; equality additionally compares the callback, the
    arguments and the cancelled flag.
    """

    __slots__ = ['_when']

    def __init__(self, when, callback, args, loop):
        assert when is not None
        super().__init__(callback, args, loop)

        self._when = when

    def __repr__(self):
        info = []
        if self._cancelled:
            info.append('cancelled')
        info.append('when=%s' % self._when)
        info.append(_format_callback(self._callback, self._args))
        return '<%s %s>' % (self.__class__.__name__, ' '.join(info))

    def __hash__(self):
        # Equal handles always share the same _when, so hashing only
        # _when stays consistent with __eq__.
        return hash(self._when)

    def __lt__(self, other):
        return self._when < other._when

    def __le__(self, other):
        if self._when < other._when:
            return True
        return self.__eq__(other)

    def __gt__(self, other):
        return self._when > other._when

    def __ge__(self, other):
        if self._when > other._when:
            return True
        return self.__eq__(other)

    def __eq__(self, other):
        if isinstance(other, TimerHandle):
            return (self._when == other._when and
                    self._callback == other._callback and
                    self._args == other._args and
                    self._cancelled == other._cancelled)
        return NotImplemented

    def __ne__(self, other):
        equal = self.__eq__(other)
        return NotImplemented if equal is NotImplemented else not equal
151
152
class AbstractServer:
    """Abstract server returned by create_server()."""

    def close(self):
        """Stop serving.  This leaves existing connections open."""
        # Raise, like every other abstract method in this module, instead
        # of returning the NotImplemented singleton, which callers would
        # silently receive as an ordinary return value.
        raise NotImplementedError

    def wait_closed(self):
        """Coroutine to wait until service is closed."""
        raise NotImplementedError
163
164
class AbstractEventLoop:
    """Abstract event loop.

    Every method raises NotImplementedError, except call_soon(), which
    delegates to call_later() with a zero delay.
    """

    # Running and stopping the event loop.

    def run_forever(self):
        """Run the event loop until stop() is called."""
        raise NotImplementedError

    def run_until_complete(self, future):
        """Run the event loop until a Future is done.

        Return the Future's result, or raise its exception.
        """
        raise NotImplementedError

    def stop(self):
        """Stop the event loop as soon as reasonable.

        Exactly how soon that is may depend on the implementation, but
        no more I/O callbacks should be scheduled.
        """
        raise NotImplementedError

    def is_running(self):
        """Return whether the event loop is currently running."""
        raise NotImplementedError

    def close(self):
        """Close the loop.

        The loop should not be running.

        This is idempotent and irreversible.

        No other methods should be called after this one.
        """
        raise NotImplementedError

    # Methods scheduling callbacks.  All these return Handles.

    def call_soon(self, callback, *args):
        """Schedule callback(*args) through call_later() with a delay of 0."""
        return self.call_later(0, callback, *args)

    def call_later(self, delay, callback, *args):
        raise NotImplementedError

    def call_at(self, when, callback, *args):
        raise NotImplementedError

    def time(self):
        raise NotImplementedError

    # Methods for interacting with threads.

    def call_soon_threadsafe(self, callback, *args):
        raise NotImplementedError

    def run_in_executor(self, executor, callback, *args):
        raise NotImplementedError

    def set_default_executor(self, executor):
        raise NotImplementedError

    # Network I/O methods returning Futures.

    def getaddrinfo(self, host, port, *, family=0, type=0, proto=0, flags=0):
        raise NotImplementedError

    def getnameinfo(self, sockaddr, flags=0):
        raise NotImplementedError

    def create_connection(self, protocol_factory, host=None, port=None, *,
                          ssl=None, family=0, proto=0, flags=0, sock=None,
                          local_addr=None, server_hostname=None):
        raise NotImplementedError

    def create_server(self, protocol_factory, host=None, port=None, *,
                      family=socket.AF_UNSPEC, flags=socket.AI_PASSIVE,
                      sock=None, backlog=100, ssl=None, reuse_address=None):
        """A coroutine which creates a TCP server bound to host and port.

        The return value is a Server object which can be used to stop
        the service.

        If host is an empty string or None all interfaces are assumed
        and a list of multiple sockets will be returned (most likely
        one for IPv4 and another one for IPv6).

        family can be set to either AF_INET or AF_INET6 to force the
        socket to use IPv4 or IPv6. If not set it will be determined
        from host (defaults to AF_UNSPEC).

        flags is a bitmask for getaddrinfo().

        sock can optionally be specified in order to use a preexisting
        socket object.

        backlog is the maximum number of queued connections passed to
        listen() (defaults to 100).

        ssl can be set to an SSLContext to enable SSL over the
        accepted connections.

        reuse_address tells the kernel to reuse a local socket in
        TIME_WAIT state, without waiting for its natural timeout to
        expire. If not specified will automatically be set to True on
        UNIX.
        """
        raise NotImplementedError

    def create_unix_connection(self, protocol_factory, path, *,
                               ssl=None, sock=None,
                               server_hostname=None):
        raise NotImplementedError

    def create_unix_server(self, protocol_factory, path, *,
                           sock=None, backlog=100, ssl=None):
        """A coroutine which creates a UNIX Domain Socket server.

        The return value is a Server object, which can be used to stop
        the service.

        path is a str, representing a file system path to bind the
        server socket to.

        sock can optionally be specified in order to use a preexisting
        socket object.

        backlog is the maximum number of queued connections passed to
        listen() (defaults to 100).

        ssl can be set to an SSLContext to enable SSL over the
        accepted connections.
        """
        raise NotImplementedError

    def create_datagram_endpoint(self, protocol_factory,
                                 local_addr=None, remote_addr=None, *,
                                 family=0, proto=0, flags=0):
        raise NotImplementedError

    # Pipes and subprocesses.

    def connect_read_pipe(self, protocol_factory, pipe):
        """Register read pipe in event loop. Set the pipe to non-blocking mode.

        protocol_factory should instantiate an object with the Protocol
        interface.
        pipe is a file-like object.
        Return pair (transport, protocol), where transport supports the
        ReadTransport interface."""
        # A file-like object is accepted instead of a bare file descriptor
        # because the transport needs to own the pipe and close it when the
        # transport finishes.  Passing f.fileno() can lead to complicated
        # errors: the pipe transport closes the fd and then f is closed,
        # or vice versa.
        raise NotImplementedError

    def connect_write_pipe(self, protocol_factory, pipe):
        """Register write pipe in event loop.

        protocol_factory should instantiate an object with the
        BaseProtocol interface.
        pipe is a file-like object already switched to nonblocking mode.
        Return pair (transport, protocol), where transport supports the
        WriteTransport interface."""
        # A file-like object is accepted instead of a bare file descriptor
        # because the transport needs to own the pipe and close it when the
        # transport finishes.  Passing f.fileno() can lead to complicated
        # errors: the pipe transport closes the fd and then f is closed,
        # or vice versa.
        raise NotImplementedError

    def subprocess_shell(self, protocol_factory, cmd, *, stdin=subprocess.PIPE,
                         stdout=subprocess.PIPE, stderr=subprocess.PIPE,
                         **kwargs):
        raise NotImplementedError

    def subprocess_exec(self, protocol_factory, *args, stdin=subprocess.PIPE,
                        stdout=subprocess.PIPE, stderr=subprocess.PIPE,
                        **kwargs):
        raise NotImplementedError

    # Ready-based callback registration methods.
    # The add_*() methods return None.
    # The remove_*() methods return True if something was removed,
    # False if there was nothing to delete.

    def add_reader(self, fd, callback, *args):
        raise NotImplementedError

    def remove_reader(self, fd):
        raise NotImplementedError

    def add_writer(self, fd, callback, *args):
        raise NotImplementedError

    def remove_writer(self, fd):
        raise NotImplementedError

    # Completion based I/O methods returning Futures.

    def sock_recv(self, sock, nbytes):
        raise NotImplementedError

    def sock_sendall(self, sock, data):
        raise NotImplementedError

    def sock_connect(self, sock, address):
        raise NotImplementedError

    def sock_accept(self, sock):
        raise NotImplementedError

    # Signal handling.

    def add_signal_handler(self, sig, callback, *args):
        raise NotImplementedError

    def remove_signal_handler(self, sig):
        raise NotImplementedError

    # Error handlers.

    def set_exception_handler(self, handler):
        raise NotImplementedError

    def default_exception_handler(self, context):
        raise NotImplementedError

    def call_exception_handler(self, context):
        raise NotImplementedError

    # Debug flag management.

    def get_debug(self):
        raise NotImplementedError

    def set_debug(self, enabled):
        raise NotImplementedError
402
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700403
class AbstractEventLoopPolicy:
    """Abstract policy for accessing the event loop."""

    def get_event_loop(self):
        """Get the event loop for the current context.

        Returns an event loop object implementing the AbstractEventLoop
        interface, or raises an exception in case no event loop has been
        set for the current context and the current policy does not
        specify to create one.

        It should never return None."""
        raise NotImplementedError

    def set_event_loop(self, loop):
        """Set the event loop for the current context to loop."""
        raise NotImplementedError

    def new_event_loop(self):
        """Create and return a new event loop object according to this
        policy's rules. If there's need to set this loop as the event loop for
        the current context, set_event_loop must be called explicitly."""
        raise NotImplementedError

    # Child processes handling (Unix only).

    def get_child_watcher(self):
        """Get the watcher for child processes."""
        raise NotImplementedError

    def set_child_watcher(self, watcher):
        """Set the watcher for child processes."""
        raise NotImplementedError
436
437
class BaseDefaultEventLoopPolicy(AbstractEventLoopPolicy):
    """Default policy implementation for accessing the event loop.

    In this policy, each thread has its own event loop.  However, we
    only automatically create an event loop by default for the main
    thread; other threads by default have no event loop.

    Other policies may have different rules (e.g. a single global
    event loop, or automatically creating an event loop per thread, or
    using some other notion of context to which an event loop is
    associated).
    """

    # Callable used by new_event_loop() to build a loop; it is None here
    # and has to be provided before new_event_loop() can work.
    _loop_factory = None

    class _Local(threading.local):
        # Per-thread state: the current loop, and whether set_event_loop()
        # has been called in this thread.
        _loop = None
        _set_called = False

    def __init__(self):
        self._local = self._Local()

    def get_event_loop(self):
        """Get the event loop of the current thread.

        In the main thread, a loop is created with new_event_loop() and
        installed on first use, unless set_event_loop() was already
        called in that thread.

        Raises AssertionError if the current thread has no event loop;
        this method never returns None.
        """
        if (self._local._loop is None and
                not self._local._set_called and
                isinstance(threading.current_thread(),
                           threading._MainThread)):
            self.set_event_loop(self.new_event_loop())
        # Raised explicitly rather than via ``assert`` so that the check
        # is not stripped under -O, which would let None be returned.
        if self._local._loop is None:
            raise AssertionError(
                'There is no current event loop in thread %r.' %
                threading.current_thread().name)
        return self._local._loop

    def set_event_loop(self, loop):
        """Set the event loop of the current thread (None clears it)."""
        # Recorded before the type check, so even a rejected call disables
        # the automatic loop creation in get_event_loop().
        self._local._set_called = True
        assert loop is None or isinstance(loop, AbstractEventLoop)
        self._local._loop = loop

    def new_event_loop(self):
        """Create a new event loop.

        You must call set_event_loop() to make this the current event
        loop.
        """
        return self._loop_factory()
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700487
488
# Event loop policy.  The policy itself is always global, even if the
# policy's rules say that there is an event loop per thread (or other
# notion of context).  The default policy is installed by the first
# call to get_event_loop_policy().
_event_loop_policy = None

# Lock for protecting the on-the-fly creation of the event loop policy.
_lock = threading.Lock()
497
498
def _init_event_loop_policy():
    """Install a DefaultEventLoopPolicy instance if no policy is set.

    The check is repeated while holding _lock, so when several threads
    get here at once only one policy object is created.
    """
    global _event_loop_policy
    with _lock:
        if _event_loop_policy is None:  # pragma: no branch
            from . import DefaultEventLoopPolicy
            _event_loop_policy = DefaultEventLoopPolicy()
505
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700506
def get_event_loop_policy():
    """Get the current event loop policy.

    The default policy is installed first if none has been set yet.
    """
    if _event_loop_policy is None:
        _init_event_loop_policy()
    return _event_loop_policy
512
513
def set_event_loop_policy(policy):
    """Set the current event loop policy.

    policy must be None or an AbstractEventLoopPolicy instance.

    If policy is None, the default policy is restored: it is created
    again by the next call to get_event_loop_policy().
    """
    global _event_loop_policy
    assert policy is None or isinstance(policy, AbstractEventLoopPolicy)
    _event_loop_policy = policy
521
522
def get_event_loop():
    """Equivalent to calling get_event_loop_policy().get_event_loop()."""
    return get_event_loop_policy().get_event_loop()
526
527
def set_event_loop(loop):
    """Equivalent to calling get_event_loop_policy().set_event_loop(loop)."""
    get_event_loop_policy().set_event_loop(loop)
531
532
def new_event_loop():
    """Equivalent to calling get_event_loop_policy().new_event_loop()."""
    return get_event_loop_policy().new_event_loop()
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800536
537
def get_child_watcher():
    """Equivalent to calling get_event_loop_policy().get_child_watcher()."""
    return get_event_loop_policy().get_child_watcher()
541
542
def set_child_watcher(watcher):
    """Equivalent to calling
    get_event_loop_policy().set_child_watcher(watcher)."""
    return get_event_loop_policy().set_child_watcher(watcher)