"""Event loop and event loop policy."""
2
# Public names exported by this module.
__all__ = ['AbstractEventLoopPolicy',
           'AbstractEventLoop', 'AbstractServer',
           'Handle', 'TimerHandle',
           'get_event_loop_policy', 'set_event_loop_policy',
           'get_event_loop', 'set_event_loop', 'new_event_loop',
           'get_child_watcher', 'set_child_watcher',
           ]
10
Victor Stinner307bccc2014-06-12 18:39:26 +020011import functools
12import inspect
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070013import subprocess
Victor Stinner80f53aa2014-06-27 13:52:20 +020014import traceback
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070015import threading
16import socket
Victor Stinner307bccc2014-06-12 18:39:26 +020017import sys
18
19
# True on Python 3.4+; gates the use of inspect.unwrap() and
# functools.partialmethod in _get_function_source().
_PY34 = sys.version_info >= (3, 4)
21
Victor Stinner975735f2014-06-25 21:41:58 +020022
def _get_function_source(func):
    """Return ``(filename, first_line_number)`` for *func*, or None.

    Wrappers are looked through first: with inspect.unwrap() on
    Python 3.4+, otherwise by following a single ``__wrapped__``
    attribute.  functools.partial objects (and, on Python 3.4+,
    functools.partialmethod objects) are resolved to the callable they
    wrap.  None is returned when no Python-level function is found.
    """
    if _PY34:
        func = inspect.unwrap(func)
    elif hasattr(func, '__wrapped__'):
        func = func.__wrapped__
    if inspect.isfunction(func):
        code = func.__code__
        return (code.co_filename, code.co_firstlineno)
    if isinstance(func, functools.partial):
        return _get_function_source(func.func)
    # functools.partialmethod only exists on Python 3.4+.
    if _PY34 and isinstance(func, functools.partialmethod):
        return _get_function_source(func.func)
    return None
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070036
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070037
def _format_args(args):
    """Return repr(args), formatting a 1-tuple without its trailing comma.

    For example ('hello',) is formatted as ('hello').
    """
    args_repr = repr(args)
    if len(args) == 1 and args_repr.endswith(',)'):
        # Drop the ',)' suffix and put the closing parenthesis back.
        args_repr = args_repr[:-2] + ')'
    return args_repr
44
45
def _format_callback(func, args, suffix=''):
    """Return a readable description of the call ``func(*args)``.

    The result is the callable's ``__qualname__`` (or its repr() when
    that is missing or empty), followed by the formatted arguments and
    *suffix*, followed by ' at <filename>:<lineno>' when the source
    location can be determined.

    functools.partial objects are unwrapped recursively: the outer
    arguments are moved into *suffix* so that they appear after the
    arguments stored on the partial object.  If *args* is None no
    argument list is added for that level.
    """
    if isinstance(func, functools.partial):
        if args is not None:
            suffix = _format_args(args) + suffix
        return _format_callback(func.func, func.args, suffix)

    func_repr = getattr(func, '__qualname__', None)
    if not func_repr:
        func_repr = repr(func)

    if args is not None:
        func_repr += _format_args(args)
    if suffix:
        func_repr += suffix

    source = _get_function_source(func)
    if source:
        func_repr += ' at %s:%s' % source
    return func_repr
65
66
class Handle:
    """Object returned by callback registration methods."""

    __slots__ = ('_callback', '_args', '_cancelled', '_loop',
                 '_source_traceback', '__weakref__')

    def __init__(self, callback, args, loop):
        """Store *callback* and *args* for later execution on *loop*.

        When the loop is in debug mode, the stack of the caller is
        recorded in ``_source_traceback``; otherwise it is None.
        """
        assert not isinstance(callback, Handle), 'A Handle is not a callback'
        self._loop = loop
        self._callback = callback
        self._args = args
        self._cancelled = False
        if self._loop.get_debug():
            # Start at the caller's frame so this __init__ is not recorded.
            self._source_traceback = traceback.extract_stack(sys._getframe(1))
        else:
            self._source_traceback = None

    def __repr__(self):
        info = []
        if self._cancelled:
            info.append('cancelled')
        info.append(_format_callback(self._callback, self._args))
        return '<%s %s>' % (self.__class__.__name__, ' '.join(info))

    def cancel(self):
        """Mark the handle as cancelled."""
        self._cancelled = True

    def _run(self):
        """Call the callback; report any Exception to the loop's handler."""
        try:
            self._callback(*self._args)
        except Exception as exc:
            cb = _format_callback(self._callback, self._args)
            msg = 'Exception in callback {}'.format(cb)
            context = {
                'message': msg,
                'exception': exc,
                'handle': self,
            }
            if self._source_traceback:
                context['source_traceback'] = self._source_traceback
            self._loop.call_exception_handler(context)
        self = None  # Needed to break cycles when an exception occurs.
109
110
class TimerHandle(Handle):
    """Object returned by timed callback registration methods."""

    __slots__ = ['_when']

    def __init__(self, when, callback, args, loop):
        assert when is not None
        super().__init__(callback, args, loop)
        if self._source_traceback:
            # Handle.__init__ recorded the stack starting at its caller,
            # which is this method; drop that last entry.
            del self._source_traceback[-1]
        self._when = when

    def __repr__(self):
        info = []
        if self._cancelled:
            info.append('cancelled')
        info.append('when=%s' % self._when)
        info.append(_format_callback(self._callback, self._args))
        return '<%s %s>' % (self.__class__.__name__, ' '.join(info))

    def __hash__(self):
        return hash(self._when)

    # Ordering compares only the scheduled time; equality additionally
    # compares the callback, its arguments and the cancelled flag.

    def __lt__(self, other):
        return self._when < other._when

    def __le__(self, other):
        if self._when < other._when:
            return True
        return self.__eq__(other)

    def __gt__(self, other):
        return self._when > other._when

    def __ge__(self, other):
        if self._when > other._when:
            return True
        return self.__eq__(other)

    def __eq__(self, other):
        if isinstance(other, TimerHandle):
            return (self._when == other._when and
                    self._callback == other._callback and
                    self._args == other._args and
                    self._cancelled == other._cancelled)
        return NotImplemented

    def __ne__(self, other):
        equal = self.__eq__(other)
        return NotImplemented if equal is NotImplemented else not equal
161
162
class AbstractServer:
    """Abstract server returned by create_server()."""

    # Both methods are abstract.  They raise NotImplementedError, like the
    # abstract methods of the other classes in this module, instead of
    # returning the NotImplemented constant (which is reserved for binary
    # special methods and would make an unimplemented call look successful).

    def close(self):
        """Stop serving.  This leaves existing connections open."""
        raise NotImplementedError

    def wait_closed(self):
        """Coroutine to wait until service is closed."""
        raise NotImplementedError
173
174
class AbstractEventLoop:
    """Abstract event loop."""

    # Running and stopping the event loop.

    def run_forever(self):
        """Run the event loop until stop() is called."""
        raise NotImplementedError

    def run_until_complete(self, future):
        """Run the event loop until a Future is done.

        Return the Future's result, or raise its exception.
        """
        raise NotImplementedError

    def stop(self):
        """Stop the event loop as soon as reasonable.

        Exactly how soon that is may depend on the implementation, but
        no more I/O callbacks should be scheduled.
        """
        raise NotImplementedError

    def is_running(self):
        """Return whether the event loop is currently running."""
        raise NotImplementedError

    def close(self):
        """Close the loop.

        The loop should not be running.

        This is idempotent and irreversible.

        No other methods should be called after this one.
        """
        raise NotImplementedError

    # Methods scheduling callbacks.  All these return Handles.

    def call_soon(self, callback, *args):
        """Schedule callback; by default forwards to call_later(0, ...)."""
        return self.call_later(0, callback, *args)

    def call_later(self, delay, callback, *args):
        raise NotImplementedError

    def call_at(self, when, callback, *args):
        raise NotImplementedError

    def time(self):
        raise NotImplementedError

    # Methods for interacting with threads.

    def call_soon_threadsafe(self, callback, *args):
        raise NotImplementedError

    def run_in_executor(self, executor, callback, *args):
        raise NotImplementedError

    def set_default_executor(self, executor):
        raise NotImplementedError

    # Network I/O methods returning Futures.

    def getaddrinfo(self, host, port, *, family=0, type=0, proto=0, flags=0):
        raise NotImplementedError

    def getnameinfo(self, sockaddr, flags=0):
        raise NotImplementedError

    def create_connection(self, protocol_factory, host=None, port=None, *,
                          ssl=None, family=0, proto=0, flags=0, sock=None,
                          local_addr=None, server_hostname=None):
        raise NotImplementedError

    def create_server(self, protocol_factory, host=None, port=None, *,
                      family=socket.AF_UNSPEC, flags=socket.AI_PASSIVE,
                      sock=None, backlog=100, ssl=None, reuse_address=None):
        """A coroutine which creates a TCP server bound to host and port.

        The return value is a Server object which can be used to stop
        the service.

        If host is an empty string or None all interfaces are assumed
        and a list of multiple sockets will be returned (most likely
        one for IPv4 and another one for IPv6).

        family can be set to either AF_INET or AF_INET6 to force the
        socket to use IPv4 or IPv6. If not set it will be determined
        from host (defaults to AF_UNSPEC).

        flags is a bitmask for getaddrinfo().

        sock can optionally be specified in order to use a preexisting
        socket object.

        backlog is the maximum number of queued connections passed to
        listen() (defaults to 100).

        ssl can be set to an SSLContext to enable SSL over the
        accepted connections.

        reuse_address tells the kernel to reuse a local socket in
        TIME_WAIT state, without waiting for its natural timeout to
        expire. If not specified will automatically be set to True on
        UNIX.
        """
        raise NotImplementedError

    def create_unix_connection(self, protocol_factory, path, *,
                               ssl=None, sock=None,
                               server_hostname=None):
        raise NotImplementedError

    def create_unix_server(self, protocol_factory, path, *,
                           sock=None, backlog=100, ssl=None):
        """A coroutine which creates a UNIX Domain Socket server.

        The return value is a Server object, which can be used to stop
        the service.

        path is a str, representing a file system path to bind the
        server socket to.

        sock can optionally be specified in order to use a preexisting
        socket object.

        backlog is the maximum number of queued connections passed to
        listen() (defaults to 100).

        ssl can be set to an SSLContext to enable SSL over the
        accepted connections.
        """
        raise NotImplementedError

    def create_datagram_endpoint(self, protocol_factory,
                                 local_addr=None, remote_addr=None, *,
                                 family=0, proto=0, flags=0):
        raise NotImplementedError

    # Pipes and subprocesses.

    def connect_read_pipe(self, protocol_factory, pipe):
        """Register read pipe in event loop. Set the pipe to non-blocking mode.

        protocol_factory should instantiate object with Protocol interface.
        pipe is a file-like object.
        Return pair (transport, protocol), where transport supports the
        ReadTransport interface."""
        # The reason to accept a file-like object instead of just a file
        # descriptor is: we need to own the pipe and close it when the
        # transport finishes.  We can get complicated errors if we pass
        # f.fileno(), close the fd in the pipe transport and then close f,
        # or vice versa.
        raise NotImplementedError

    def connect_write_pipe(self, protocol_factory, pipe):
        """Register write pipe in event loop.

        protocol_factory should instantiate object with BaseProtocol interface.
        Pipe is file-like object already switched to nonblocking.
        Return pair (transport, protocol), where transport supports the
        WriteTransport interface."""
        # The reason to accept a file-like object instead of just a file
        # descriptor is: we need to own the pipe and close it when the
        # transport finishes.  We can get complicated errors if we pass
        # f.fileno(), close the fd in the pipe transport and then close f,
        # or vice versa.
        raise NotImplementedError

    def subprocess_shell(self, protocol_factory, cmd, *, stdin=subprocess.PIPE,
                         stdout=subprocess.PIPE, stderr=subprocess.PIPE,
                         **kwargs):
        raise NotImplementedError

    def subprocess_exec(self, protocol_factory, *args, stdin=subprocess.PIPE,
                        stdout=subprocess.PIPE, stderr=subprocess.PIPE,
                        **kwargs):
        raise NotImplementedError

    # Ready-based callback registration methods.
    # The add_*() methods return None.
    # The remove_*() methods return True if something was removed,
    # False if there was nothing to delete.

    def add_reader(self, fd, callback, *args):
        raise NotImplementedError

    def remove_reader(self, fd):
        raise NotImplementedError

    def add_writer(self, fd, callback, *args):
        raise NotImplementedError

    def remove_writer(self, fd):
        raise NotImplementedError

    # Completion based I/O methods returning Futures.

    def sock_recv(self, sock, nbytes):
        raise NotImplementedError

    def sock_sendall(self, sock, data):
        raise NotImplementedError

    def sock_connect(self, sock, address):
        raise NotImplementedError

    def sock_accept(self, sock):
        raise NotImplementedError

    # Signal handling.

    def add_signal_handler(self, sig, callback, *args):
        raise NotImplementedError

    def remove_signal_handler(self, sig):
        raise NotImplementedError

    # Error handlers.

    def set_exception_handler(self, handler):
        raise NotImplementedError

    def default_exception_handler(self, context):
        raise NotImplementedError

    def call_exception_handler(self, context):
        raise NotImplementedError

    # Debug flag management.

    def get_debug(self):
        raise NotImplementedError

    def set_debug(self, enabled):
        raise NotImplementedError
412
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700413
class AbstractEventLoopPolicy:
    """Abstract policy for accessing the event loop."""

    def get_event_loop(self):
        """Get the event loop for the current context.

        Returns an event loop object implementing the AbstractEventLoop
        interface, or raises an exception in case no event loop has been
        set for the current context and the current policy does not
        specify to create one.

        It should never return None."""
        raise NotImplementedError

    def set_event_loop(self, loop):
        """Set the event loop for the current context to loop."""
        raise NotImplementedError

    def new_event_loop(self):
        """Create and return a new event loop object according to this
        policy's rules. If there's need to set this loop as the event loop for
        the current context, set_event_loop must be called explicitly."""
        raise NotImplementedError

    # Child processes handling (Unix only).

    def get_child_watcher(self):
        """Get the watcher for child processes."""
        raise NotImplementedError

    def set_child_watcher(self, watcher):
        """Set the watcher for child processes."""
        raise NotImplementedError
446
447
class BaseDefaultEventLoopPolicy(AbstractEventLoopPolicy):
    """Default policy implementation for accessing the event loop.

    In this policy, each thread has its own event loop.  However, we
    only automatically create an event loop by default for the main
    thread; other threads by default have no event loop.

    Other policies may have different rules (e.g. a single global
    event loop, or automatically creating an event loop per thread, or
    using some other notion of context to which an event loop is
    associated).
    """

    # Callable returning a new event loop; called by new_event_loop().
    # It is None here, so it has to be provided before use.
    _loop_factory = None

    class _Local(threading.local):
        # Per-thread state: the current loop, and whether set_event_loop()
        # has been called in this thread.
        _loop = None
        _set_called = False

    def __init__(self):
        self._local = self._Local()

    def get_event_loop(self):
        """Get the event loop of the current thread.

        In the main thread, a new event loop is created and set on first
        use, unless set_event_loop() was already called in that thread.

        This never returns None: an assertion fails when the current
        thread has no event loop.
        """
        if (self._local._loop is None and
                not self._local._set_called and
                isinstance(threading.current_thread(), threading._MainThread)):
            self.set_event_loop(self.new_event_loop())
        assert self._local._loop is not None, \
            ('There is no current event loop in thread %r.' %
             threading.current_thread().name)
        return self._local._loop

    def set_event_loop(self, loop):
        """Set the event loop."""
        # Record the call even when loop is None, so that get_event_loop()
        # does not create a loop implicitly afterwards.
        self._local._set_called = True
        assert loop is None or isinstance(loop, AbstractEventLoop)
        self._local._loop = loop

    def new_event_loop(self):
        """Create a new event loop.

        You must call set_event_loop() to make this the current event
        loop.
        """
        return self._loop_factory()
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700497
498
# Event loop policy.  The policy itself is always global, even if the
# policy's rules say that there is an event loop per thread (or other
# notion of context).  The default policy is installed by the first
# call to get_event_loop_policy().
_event_loop_policy = None

# Lock for protecting the on-the-fly creation of the event loop policy.
_lock = threading.Lock()
507
508
def _init_event_loop_policy():
    """Install the default event loop policy if none is set.

    The check and the assignment are both done while holding _lock.
    """
    global _event_loop_policy
    with _lock:
        if _event_loop_policy is None:  # pragma: no branch
            # Imported here rather than at module level.
            # NOTE(review): presumably to avoid an import cycle with the
            # package -- confirm.
            from . import DefaultEventLoopPolicy
            _event_loop_policy = DefaultEventLoopPolicy()
515
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700516
def get_event_loop_policy():
    """Get the current event loop policy.

    The default policy is installed first if no policy is set.
    """
    if _event_loop_policy is None:
        _init_event_loop_policy()
    return _event_loop_policy
522
523
def set_event_loop_policy(policy):
    """Set the current event loop policy.

    If policy is None, the default policy is restored."""
    global _event_loop_policy
    assert policy is None or isinstance(policy, AbstractEventLoopPolicy)
    _event_loop_policy = policy
531
532
def get_event_loop():
    """Equivalent to calling get_event_loop_policy().get_event_loop()."""
    return get_event_loop_policy().get_event_loop()
536
537
def set_event_loop(loop):
    """Equivalent to calling get_event_loop_policy().set_event_loop(loop)."""
    get_event_loop_policy().set_event_loop(loop)
541
542
def new_event_loop():
    """Equivalent to calling get_event_loop_policy().new_event_loop()."""
    return get_event_loop_policy().new_event_loop()
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800546
547
def get_child_watcher():
    """Equivalent to calling get_event_loop_policy().get_child_watcher()."""
    return get_event_loop_policy().get_child_watcher()
551
552
def set_child_watcher(watcher):
    """Equivalent to calling
    get_event_loop_policy().set_child_watcher(watcher)."""
    return get_event_loop_policy().set_child_watcher(watcher)