blob: de161df65f4f47d7e79e7e7468581f4a5d62ac84 [file] [log] [blame]
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07001"""Event loop and event loop policy."""
2
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -08003__all__ = ['AbstractEventLoopPolicy',
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07004 'AbstractEventLoop', 'AbstractServer',
5 'Handle', 'TimerHandle',
6 'get_event_loop_policy', 'set_event_loop_policy',
7 'get_event_loop', 'set_event_loop', 'new_event_loop',
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -08008 'get_child_watcher', 'set_child_watcher',
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07009 ]
10
Victor Stinner307bccc2014-06-12 18:39:26 +020011import functools
12import inspect
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070013import subprocess
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070014import threading
15import socket
Victor Stinner307bccc2014-06-12 18:39:26 +020016import sys
17
18
19_PY34 = sys.version_info >= (3, 4)
20
def _get_function_source(func):
    """Return (filename, first line number) of the code behind *func*.

    Wrappers are peeled off first: with inspect.unwrap() when _PY34 is
    true, otherwise by following a single __wrapped__ attribute if present.
    functools.partial objects (and functools.partialmethod objects when
    _PY34 is true) are resolved recursively through their .func attribute.

    Return None when no plain Python function is found.
    """
    if _PY34:
        target = inspect.unwrap(func)
    elif hasattr(func, '__wrapped__'):
        target = func.__wrapped__
    else:
        target = func

    if inspect.isfunction(target):
        code = target.__code__
        return (code.co_filename, code.co_firstlineno)

    # functools.partialmethod is only looked up when _PY34 is true.
    wraps_callable = isinstance(target, functools.partial) or (
        _PY34 and isinstance(target, functools.partialmethod))
    if wraps_callable:
        return _get_function_source(target.func)
    return None
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070034
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070035
class Handle:
    """Object returned by callback registration methods."""

    # '__weakref__' is listed so that instances can still be weakly
    # referenced even though the class defines __slots__.
    __slots__ = ['_callback', '_args', '_cancelled', '_loop', '__weakref__']

    def __init__(self, callback, args, loop):
        """Store *callback*, its positional *args* and the owning *loop*.

        A Handle must not be passed as the callback itself.
        """
        assert not isinstance(callback, Handle), 'A Handle is not a callback'
        self._callback = callback
        self._args = args
        self._loop = loop
        self._cancelled = False

    def __repr__(self):
        # Prefer the qualified name; fall back to str() of the callback.
        description = (getattr(self._callback, '__qualname__', None)
                       or str(self._callback))

        location = _get_function_source(self._callback)
        if location:
            description += ' at %s:%s' % location

        text = 'Handle({}, {})'.format(description, self._args)
        return text + '<cancelled>' if self._cancelled else text

    def cancel(self):
        """Mark this handle as cancelled."""
        self._cancelled = True

    def _run(self):
        """Invoke the callback with the stored arguments.

        An Exception raised by the callback is not propagated; it is
        passed to the loop's call_exception_handler() together with a
        message and this handle.
        """
        try:
            self._callback(*self._args)
        except Exception as exc:
            context = {
                'message': 'Exception in callback {}{!r}'.format(
                    self._callback, self._args),
                'exception': exc,
                'handle': self,
            }
            self._loop.call_exception_handler(context)
        self = None  # Needed to break cycles when an exception occurs.
77
78
class TimerHandle(Handle):
    """Object returned by timed callback registration methods."""

    __slots__ = ['_when']

    def __init__(self, when, callback, args, loop):
        """Create a handle for *callback* associated with the time *when*.

        *when* must not be None.
        """
        assert when is not None
        super().__init__(callback, args, loop)

        self._when = when

    def __repr__(self):
        text = 'TimerHandle({}, {}, {})'.format(
            self._when, self._callback, self._args)
        return text + '<cancelled>' if self._cancelled else text

    def __hash__(self):
        # Only the scheduled time takes part in the hash.
        return hash(self._when)

    # Ordering compares the scheduled time only; the "or equal" variants
    # fall back to the full equality test below.

    def __lt__(self, other):
        return self._when < other._when

    def __le__(self, other):
        return True if self._when < other._when else self.__eq__(other)

    def __gt__(self, other):
        return self._when > other._when

    def __ge__(self, other):
        return True if self._when > other._when else self.__eq__(other)

    def __eq__(self, other):
        """Equal when time, callback, args and cancelled state all match."""
        if not isinstance(other, TimerHandle):
            return NotImplemented
        return (self._when == other._when and
                self._callback == other._callback and
                self._args == other._args and
                self._cancelled == other._cancelled)

    def __ne__(self, other):
        outcome = self.__eq__(other)
        if outcome is NotImplemented:
            return NotImplemented
        return not outcome
129
130
class AbstractServer:
    """Abstract server returned by create_server()."""

    def close(self):
        """Stop serving.  This leaves existing connections open.

        Raises NotImplementedError unless overridden by a subclass.
        """
        # Raise instead of returning the NotImplemented sentinel, so an
        # unimplemented method cannot be mistaken for a successful call.
        raise NotImplementedError

    def wait_closed(self):
        """Coroutine to wait until service is closed.

        Raises NotImplementedError unless overridden by a subclass.
        """
        raise NotImplementedError
141
142
class AbstractEventLoop:
    """Abstract event loop.

    Apart from call_soon(), every method of this base class raises
    NotImplementedError and has to be supplied by a concrete loop.
    """

    # Running and stopping the event loop.

    def run_forever(self):
        """Run the event loop until stop() is called."""
        raise NotImplementedError

    def run_until_complete(self, future):
        """Run the event loop until a Future is done.

        Return the Future's result, or raise its exception.
        """
        raise NotImplementedError

    def stop(self):
        """Stop the event loop as soon as reasonable.

        Exactly how soon that is may depend on the implementation, but
        no more I/O callbacks should be scheduled.
        """
        raise NotImplementedError

    def is_running(self):
        """Return whether the event loop is currently running."""
        raise NotImplementedError

    def close(self):
        """Close the loop.

        The loop should not be running.

        This is idempotent and irreversible.

        No other methods should be called after this one.
        """
        raise NotImplementedError

    # Methods scheduling callbacks.  All these return Handles.

    def call_soon(self, callback, *args):
        """Schedule *callback* by delegating to call_later() with delay 0."""
        return self.call_later(0, callback, *args)

    def call_later(self, delay, callback, *args):
        """Abstract method; raises NotImplementedError here."""
        raise NotImplementedError

    def call_at(self, when, callback, *args):
        """Abstract method; raises NotImplementedError here."""
        raise NotImplementedError

    def time(self):
        """Abstract method; raises NotImplementedError here."""
        raise NotImplementedError

    # Methods for interacting with threads.

    def call_soon_threadsafe(self, callback, *args):
        """Abstract method; raises NotImplementedError here."""
        raise NotImplementedError

    def run_in_executor(self, executor, callback, *args):
        """Abstract method; raises NotImplementedError here."""
        raise NotImplementedError

    def set_default_executor(self, executor):
        """Abstract method; raises NotImplementedError here."""
        raise NotImplementedError

    # Network I/O methods returning Futures.

    def getaddrinfo(self, host, port, *, family=0, type=0, proto=0, flags=0):
        """Abstract method; raises NotImplementedError here."""
        raise NotImplementedError

    def getnameinfo(self, sockaddr, flags=0):
        """Abstract method; raises NotImplementedError here."""
        raise NotImplementedError

    def create_connection(self, protocol_factory, host=None, port=None, *,
                          ssl=None, family=0, proto=0, flags=0, sock=None,
                          local_addr=None, server_hostname=None):
        """Abstract method; raises NotImplementedError here."""
        raise NotImplementedError

    def create_server(self, protocol_factory, host=None, port=None, *,
                      family=socket.AF_UNSPEC, flags=socket.AI_PASSIVE,
                      sock=None, backlog=100, ssl=None, reuse_address=None):
        """A coroutine which creates a TCP server bound to host and port.

        The return value is a Server object which can be used to stop
        the service.

        If host is an empty string or None, all interfaces are assumed
        and a list of multiple sockets will be returned (most likely
        one for IPv4 and another one for IPv6).

        family can be set to either AF_INET or AF_INET6 to force the
        socket to use IPv4 or IPv6.  If not set, it will be determined
        from host (defaults to AF_UNSPEC).

        flags is a bitmask for getaddrinfo().

        sock can optionally be specified in order to use a preexisting
        socket object.

        backlog is the maximum number of queued connections passed to
        listen() (defaults to 100).

        ssl can be set to an SSLContext to enable SSL over the
        accepted connections.

        reuse_address tells the kernel to reuse a local socket in
        TIME_WAIT state, without waiting for its natural timeout to
        expire.  If not specified, it will automatically be set to True
        on UNIX.
        """
        raise NotImplementedError

    def create_unix_connection(self, protocol_factory, path, *,
                               ssl=None, sock=None,
                               server_hostname=None):
        """Abstract method; raises NotImplementedError here."""
        raise NotImplementedError

    def create_unix_server(self, protocol_factory, path, *,
                           sock=None, backlog=100, ssl=None):
        """A coroutine which creates a UNIX Domain Socket server.

        The return value is a Server object, which can be used to stop
        the service.

        path is a str, representing a file system path to bind the
        server socket to.

        sock can optionally be specified in order to use a preexisting
        socket object.

        backlog is the maximum number of queued connections passed to
        listen() (defaults to 100).

        ssl can be set to an SSLContext to enable SSL over the
        accepted connections.
        """
        raise NotImplementedError

    def create_datagram_endpoint(self, protocol_factory,
                                 local_addr=None, remote_addr=None, *,
                                 family=0, proto=0, flags=0):
        """Abstract method; raises NotImplementedError here."""
        raise NotImplementedError

    # Pipes and subprocesses.

    def connect_read_pipe(self, protocol_factory, pipe):
        """Register read pipe in event loop. Set the pipe to non-blocking mode.

        protocol_factory should instantiate an object with the Protocol
        interface.  pipe is a file-like object.
        Return pair (transport, protocol), where transport supports the
        ReadTransport interface."""
        # A file-like object is accepted instead of a bare file
        # descriptor because the transport needs to own the pipe and
        # close it when the transport finishes.  Passing f.fileno()
        # could produce confusing errors if the fd is closed by the
        # pipe transport and f is closed afterwards, or vice versa.
        raise NotImplementedError

    def connect_write_pipe(self, protocol_factory, pipe):
        """Register write pipe in event loop.

        protocol_factory should instantiate an object with the
        BaseProtocol interface.  pipe is a file-like object already
        switched to nonblocking mode.
        Return pair (transport, protocol), where transport supports the
        WriteTransport interface."""
        # A file-like object is accepted instead of a bare file
        # descriptor because the transport needs to own the pipe and
        # close it when the transport finishes.  Passing f.fileno()
        # could produce confusing errors if the fd is closed by the
        # pipe transport and f is closed afterwards, or vice versa.
        raise NotImplementedError

    def subprocess_shell(self, protocol_factory, cmd, *, stdin=subprocess.PIPE,
                         stdout=subprocess.PIPE, stderr=subprocess.PIPE,
                         **kwargs):
        """Abstract method; raises NotImplementedError here."""
        raise NotImplementedError

    def subprocess_exec(self, protocol_factory, *args, stdin=subprocess.PIPE,
                        stdout=subprocess.PIPE, stderr=subprocess.PIPE,
                        **kwargs):
        """Abstract method; raises NotImplementedError here."""
        raise NotImplementedError

    # Ready-based callback registration methods.
    # The add_*() methods return None.
    # The remove_*() methods return True if something was removed,
    # False if there was nothing to delete.

    def add_reader(self, fd, callback, *args):
        """Abstract method; raises NotImplementedError here."""
        raise NotImplementedError

    def remove_reader(self, fd):
        """Abstract method; raises NotImplementedError here."""
        raise NotImplementedError

    def add_writer(self, fd, callback, *args):
        """Abstract method; raises NotImplementedError here."""
        raise NotImplementedError

    def remove_writer(self, fd):
        """Abstract method; raises NotImplementedError here."""
        raise NotImplementedError

    # Completion based I/O methods returning Futures.

    def sock_recv(self, sock, nbytes):
        """Abstract method; raises NotImplementedError here."""
        raise NotImplementedError

    def sock_sendall(self, sock, data):
        """Abstract method; raises NotImplementedError here."""
        raise NotImplementedError

    def sock_connect(self, sock, address):
        """Abstract method; raises NotImplementedError here."""
        raise NotImplementedError

    def sock_accept(self, sock):
        """Abstract method; raises NotImplementedError here."""
        raise NotImplementedError

    # Signal handling.

    def add_signal_handler(self, sig, callback, *args):
        """Abstract method; raises NotImplementedError here."""
        raise NotImplementedError

    def remove_signal_handler(self, sig):
        """Abstract method; raises NotImplementedError here."""
        raise NotImplementedError

    # Error handlers.

    def set_exception_handler(self, handler):
        """Abstract method; raises NotImplementedError here."""
        raise NotImplementedError

    def default_exception_handler(self, context):
        """Abstract method; raises NotImplementedError here."""
        raise NotImplementedError

    def call_exception_handler(self, context):
        """Abstract method; raises NotImplementedError here."""
        raise NotImplementedError

    # Debug flag management.

    def get_debug(self):
        """Abstract method; raises NotImplementedError here."""
        raise NotImplementedError

    def set_debug(self, enabled):
        """Abstract method; raises NotImplementedError here."""
        raise NotImplementedError
380
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700381
class AbstractEventLoopPolicy:
    """Abstract policy for accessing the event loop.

    Every method of this base class raises NotImplementedError.
    """

    def get_event_loop(self):
        """Get the event loop for the current context.

        Return an event loop object implementing the BaseEventLoop
        interface, or raise an exception if no event loop has been set
        for the current context and the current policy does not specify
        to create one.

        It should never return None.
        """
        raise NotImplementedError

    def set_event_loop(self, loop):
        """Set the event loop for the current context to loop."""
        raise NotImplementedError

    def new_event_loop(self):
        """Create and return a new event loop object according to this
        policy's rules.

        If the new loop should become the event loop for the current
        context, set_event_loop() must be called explicitly.
        """
        raise NotImplementedError

    # Child processes handling (Unix only).

    def get_child_watcher(self):
        """Get the watcher for child processes."""
        raise NotImplementedError

    def set_child_watcher(self, watcher):
        """Set the watcher for child processes."""
        raise NotImplementedError
414
415
class BaseDefaultEventLoopPolicy(AbstractEventLoopPolicy):
    """Default policy implementation for accessing the event loop.

    In this policy, each thread has its own event loop.  However, we
    only automatically create an event loop by default for the main
    thread; other threads by default have no event loop.

    Other policies may have different rules (e.g. a single global
    event loop, or automatically creating an event loop per thread, or
    using some other notion of context to which an event loop is
    associated).
    """

    # Callable invoked without arguments by new_event_loop(); it is None
    # on this base class.
    _loop_factory = None

    class _Local(threading.local):
        # Per-thread state: the thread's current loop, and whether
        # set_event_loop() has been called in that thread.
        _loop = None
        _set_called = False

    def __init__(self):
        self._local = self._Local()

    def get_event_loop(self):
        """Get the event loop for the current thread.

        In the main thread, a loop is created and installed on first use,
        unless set_event_loop() has already been called there.

        Raises RuntimeError if the current thread has no event loop.
        This method never returns None.
        """
        if (self._local._loop is None and
                not self._local._set_called and
                isinstance(threading.current_thread(), threading._MainThread)):
            self.set_event_loop(self.new_event_loop())
        # Raise explicitly: an assert would be stripped under -O and the
        # method would then hand back None to the caller.
        if self._local._loop is None:
            raise RuntimeError('There is no current event loop in thread %r.'
                               % threading.current_thread().name)
        return self._local._loop

    def set_event_loop(self, loop):
        """Set the event loop for the current thread.

        *loop* must be None or an AbstractEventLoop instance.
        """
        self._local._set_called = True
        assert loop is None or isinstance(loop, AbstractEventLoop)
        self._local._loop = loop

    def new_event_loop(self):
        """Create a new event loop.

        You must call set_event_loop() to make this the current event
        loop.
        """
        return self._loop_factory()
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700465
466
467# Event loop policy. The policy itself is always global, even if the
468# policy's rules say that there is an event loop per thread (or other
469# notion of context). The default policy is installed by the first
470# call to get_event_loop_policy().
471_event_loop_policy = None
472
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800473# Lock for protecting the on-the-fly creation of the event loop policy.
474_lock = threading.Lock()
475
476
def _init_event_loop_policy():
    """Install a DefaultEventLoopPolicy instance if no policy is set.

    The check and the assignment both happen while holding _lock.
    """
    global _event_loop_policy
    with _lock:
        if _event_loop_policy is None:  # pragma: no branch
            # Imported at call time from the enclosing package.
            from . import DefaultEventLoopPolicy
            _event_loop_policy = DefaultEventLoopPolicy()
483
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700484
def get_event_loop_policy():
    """Get the current event loop policy.

    When no policy is installed yet, _init_event_loop_policy() is called
    first.
    """
    if _event_loop_policy is None:
        _init_event_loop_policy()
    return _event_loop_policy
490
491
def set_event_loop_policy(policy):
    """Set the current event loop policy.

    *policy* must be None or an AbstractEventLoopPolicy instance.
    If policy is None, the default policy is restored.
    """
    global _event_loop_policy
    assert policy is None or isinstance(policy, AbstractEventLoopPolicy)
    _event_loop_policy = policy
499
500
def get_event_loop():
    """Equivalent to calling get_event_loop_policy().get_event_loop()."""
    policy = get_event_loop_policy()
    return policy.get_event_loop()
504
505
def set_event_loop(loop):
    """Equivalent to calling get_event_loop_policy().set_event_loop(loop)."""
    policy = get_event_loop_policy()
    policy.set_event_loop(loop)
509
510
def new_event_loop():
    """Equivalent to calling get_event_loop_policy().new_event_loop()."""
    policy = get_event_loop_policy()
    return policy.new_event_loop()
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800514
515
def get_child_watcher():
    """Equivalent to calling get_event_loop_policy().get_child_watcher()."""
    policy = get_event_loop_policy()
    return policy.get_child_watcher()
519
520
def set_child_watcher(watcher):
    """Equivalent to calling
    get_event_loop_policy().set_child_watcher(watcher)."""
    policy = get_event_loop_policy()
    return policy.set_child_watcher(watcher)