blob: 270a5e4c33d7d9e5209a4bbf171d7d986e9d4177 [file] [log] [blame]
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07001"""Event loop and event loop policy."""
2
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -08003__all__ = ['AbstractEventLoopPolicy',
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07004 'AbstractEventLoop', 'AbstractServer',
5 'Handle', 'TimerHandle',
6 'get_event_loop_policy', 'set_event_loop_policy',
7 'get_event_loop', 'set_event_loop', 'new_event_loop',
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -08008 'get_child_watcher', 'set_child_watcher',
Yury Selivanov6ea2b8f2016-11-07 19:00:46 -05009 '_set_running_loop', '_get_running_loop',
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070010 ]
11
Victor Stinner307bccc2014-06-12 18:39:26 +020012import functools
13import inspect
Yury Selivanovba7e1f92017-03-02 20:07:11 -050014import os
Victor Stinner313a9802014-07-29 12:58:23 +020015import reprlib
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070016import socket
Victor Stinner313a9802014-07-29 12:58:23 +020017import subprocess
Victor Stinner307bccc2014-06-12 18:39:26 +020018import sys
Victor Stinner313a9802014-07-29 12:58:23 +020019import threading
20import traceback
Victor Stinner307bccc2014-06-12 18:39:26 +020021
Victor Stinner975735f2014-06-25 21:41:58 +020022
Victor Stinner307bccc2014-06-12 18:39:26 +020023def _get_function_source(func):
INADA Naoki3e2ad8e2017-04-25 10:57:18 +090024 func = inspect.unwrap(func)
Victor Stinner307bccc2014-06-12 18:39:26 +020025 if inspect.isfunction(func):
26 code = func.__code__
27 return (code.co_filename, code.co_firstlineno)
28 if isinstance(func, functools.partial):
29 return _get_function_source(func.func)
INADA Naoki3e2ad8e2017-04-25 10:57:18 +090030 if isinstance(func, functools.partialmethod):
Victor Stinner307bccc2014-06-12 18:39:26 +020031 return _get_function_source(func.func)
32 return None
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070033
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070034
Yury Selivanov45dccda2016-09-15 15:58:15 -040035def _format_args_and_kwargs(args, kwargs):
36 """Format function arguments and keyword arguments.
Victor Stinner313a9802014-07-29 12:58:23 +020037
38 Special case for a single parameter: ('hello',) is formatted as ('hello').
39 """
40 # use reprlib to limit the length of the output
Yury Selivanov45dccda2016-09-15 15:58:15 -040041 items = []
42 if args:
43 items.extend(reprlib.repr(arg) for arg in args)
44 if kwargs:
45 items.extend('{}={}'.format(k, reprlib.repr(v))
46 for k, v in kwargs.items())
47 return '(' + ', '.join(items) + ')'
Victor Stinner975735f2014-06-25 21:41:58 +020048
49
def _format_callback(func, args, kwargs, suffix=''):
    """Return a readable ``name(args)`` description of a callback.

    Layers of functools.partial are peeled off one at a time: the
    arguments supplied at each outer layer are prepended to *suffix*, and
    the partial's own stored arguments are formatted next to the wrapped
    callable.  The callable's name is its ``__qualname__`` when present,
    else its ``__name__``, else its repr().
    """
    while isinstance(func, functools.partial):
        suffix = _format_args_and_kwargs(args, kwargs) + suffix
        func, args, kwargs = func.func, func.args, func.keywords

    if hasattr(func, '__qualname__'):
        name = getattr(func, '__qualname__')
    elif hasattr(func, '__name__'):
        name = getattr(func, '__name__')
    else:
        name = repr(func)

    text = name + _format_args_and_kwargs(args, kwargs)
    return text + suffix if suffix else text
Victor Stinner975735f2014-06-25 21:41:58 +020066
def _format_callback_source(func, args):
    """Describe a callback and, when known, where it was defined.

    The description comes from _format_callback() with no keyword
    arguments.  If _get_function_source() finds a location, the text
    ' at <filename>:<lineno>' is appended.
    """
    description = _format_callback(func, args, None)
    location = _get_function_source(func)
    if not location:
        return description
    return description + ' at %s:%s' % location
73
74
class Handle:
    """Object returned by callback registration methods."""

    __slots__ = ('_callback', '_args', '_cancelled', '_loop',
                 '_source_traceback', '_repr', '__weakref__')

    def __init__(self, callback, args, loop):
        """Store the callback, its positional arguments and the owning loop.

        When the loop reports debug mode, the call stack of whoever created
        the handle is recorded in ``_source_traceback``.
        """
        self._loop = loop
        self._callback = callback
        self._args = args
        self._cancelled = False
        self._repr = None
        # sys._getframe(1) has to be evaluated right here in __init__ so
        # that the captured stack ends at the caller of the constructor.
        self._source_traceback = (
            traceback.extract_stack(sys._getframe(1))
            if self._loop.get_debug() else None)

    def _repr_info(self):
        """Return the list of text fragments that make up the repr."""
        parts = [self.__class__.__name__]
        if self._cancelled:
            parts.append('cancelled')
        if self._callback is not None:
            parts.append(_format_callback_source(self._callback, self._args))
        if self._source_traceback:
            origin = self._source_traceback[-1]
            parts.append('created at %s:%s' % (origin[0], origin[1]))
        return parts

    def __repr__(self):
        """Return the cached repr if cancel() stored one, else build it."""
        cached = self._repr
        if cached is None:
            return '<%s>' % ' '.join(self._repr_info())
        return cached

    def cancel(self):
        """Mark the handle cancelled and drop the callback and arguments.

        Calling this again on an already cancelled handle does nothing.
        """
        if self._cancelled:
            return
        self._cancelled = True
        if self._loop.get_debug():
            # Keep a representation in debug mode to keep callback and
            # parameters. For example, to log the warning
            # "Executing <Handle...> took 2.5 second"
            self._repr = repr(self)
        self._callback = None
        self._args = None

    def cancelled(self):
        """Return True if cancel() has been called on this handle."""
        return self._cancelled

    def _run(self):
        """Invoke the callback; report any Exception to the loop.

        Failures are passed to the loop's call_exception_handler() in a
        context dict instead of propagating to the caller.
        """
        try:
            self._callback(*self._args)
        except Exception as exc:
            context = {
                'message': 'Exception in callback {}'.format(
                    _format_callback_source(self._callback, self._args)),
                'exception': exc,
                'handle': self,
            }
            if self._source_traceback:
                context['source_traceback'] = self._source_traceback
            self._loop.call_exception_handler(context)
        self = None  # Needed to break cycles when an exception occurs.
138
139
class TimerHandle(Handle):
    """Object returned by timed callback registration methods."""

    __slots__ = ['_scheduled', '_when']

    def __init__(self, when, callback, args, loop):
        """Create a handle whose ordering key is *when*.

        *when* must not be None.  The remaining arguments are forwarded to
        Handle.__init__().
        """
        assert when is not None
        super().__init__(callback, args, loop)
        if self._source_traceback:
            # Drop the last recorded frame: it is this __init__ calling the
            # base constructor, not the code that created the handle.
            del self._source_traceback[-1]
        self._when = when
        self._scheduled = False

    def _repr_info(self):
        """Extend the base fragments with ``when=...`` after the status."""
        info = super()._repr_info()
        # The class name comes first, optionally followed by 'cancelled'.
        pos = 2 if self._cancelled else 1
        info.insert(pos, 'when=%s' % self._when)
        return info

    def __hash__(self):
        """Hash on the deadline only."""
        return hash(self._when)

    # The ordering methods return NotImplemented for operands that are not
    # TimerHandle instances, so Python can try the reflected operation or
    # raise TypeError, rather than failing with AttributeError on a
    # missing ``_when`` attribute.

    def __lt__(self, other):
        """Order by deadline."""
        if isinstance(other, TimerHandle):
            return self._when < other._when
        return NotImplemented

    def __le__(self, other):
        """True when the deadline is earlier or the handles compare equal."""
        if isinstance(other, TimerHandle):
            return self._when < other._when or self.__eq__(other)
        return NotImplemented

    def __gt__(self, other):
        """Order by deadline."""
        if isinstance(other, TimerHandle):
            return self._when > other._when
        return NotImplemented

    def __ge__(self, other):
        """True when the deadline is later or the handles compare equal."""
        if isinstance(other, TimerHandle):
            return self._when > other._when or self.__eq__(other)
        return NotImplemented

    def __eq__(self, other):
        """Equal when deadline, callback, args and cancelled state match."""
        if isinstance(other, TimerHandle):
            return (self._when == other._when and
                    self._callback == other._callback and
                    self._args == other._args and
                    self._cancelled == other._cancelled)
        return NotImplemented

    def __ne__(self, other):
        """Negation of __eq__(), passing NotImplemented through."""
        equal = self.__eq__(other)
        return NotImplemented if equal is NotImplemented else not equal

    def cancel(self):
        """Cancel the handle, notifying the loop on the first call only."""
        if not self._cancelled:
            self._loop._timer_handle_cancelled(self)
        super().cancel()
194
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700195
class AbstractServer:
    """Abstract server returned by create_server().

    Both methods are abstract and raise NotImplementedError, matching the
    other abstract interfaces in this module.  They do not return the
    NotImplemented singleton, which is reserved for binary special methods
    and is truthy, so it would be silently accepted by callers.
    """

    def close(self):
        """Stop serving.  This leaves existing connections open."""
        raise NotImplementedError

    def wait_closed(self):
        """Coroutine to wait until service is closed."""
        raise NotImplementedError
206
207
class AbstractEventLoop:
    """Abstract event loop.

    Every method raises NotImplementedError, with one exception:
    call_soon() is implemented in terms of call_later() with a zero delay.
    """

    # ---- Running and stopping the event loop. ----

    def run_forever(self):
        """Run the event loop until stop() is called."""
        raise NotImplementedError

    def run_until_complete(self, future):
        """Run the event loop until a Future is done.

        Return the Future's result, or raise its exception.
        """
        raise NotImplementedError

    def stop(self):
        """Stop the event loop as soon as reasonable.

        Exactly how soon that is may depend on the implementation, but
        no more I/O callbacks should be scheduled.
        """
        raise NotImplementedError

    def is_running(self):
        """Return whether the event loop is currently running."""
        raise NotImplementedError

    def is_closed(self):
        """Returns True if the event loop was closed."""
        raise NotImplementedError

    def close(self):
        """Close the loop.

        The loop should not be running.

        This is idempotent and irreversible.

        No other methods should be called after this one.
        """
        raise NotImplementedError

    def shutdown_asyncgens(self):
        """Shutdown all active asynchronous generators."""
        raise NotImplementedError

    # ---- Methods scheduling callbacks.  All these return Handles. ----

    def _timer_handle_cancelled(self, handle):
        """Notification that a TimerHandle has been cancelled."""
        raise NotImplementedError

    def call_soon(self, callback, *args):
        """Schedule *callback* by delegating to call_later() with delay 0."""
        return self.call_later(0, callback, *args)

    def call_later(self, delay, callback, *args):
        """Abstract callback-scheduling method; returns a Handle."""
        raise NotImplementedError

    def call_at(self, when, callback, *args):
        """Abstract callback-scheduling method; returns a Handle."""
        raise NotImplementedError

    def time(self):
        raise NotImplementedError

    def create_future(self):
        raise NotImplementedError

    # ---- Method scheduling a coroutine object: create a task. ----

    def create_task(self, coro):
        raise NotImplementedError

    # ---- Methods for interacting with threads. ----

    def call_soon_threadsafe(self, callback, *args):
        raise NotImplementedError

    def run_in_executor(self, executor, func, *args):
        raise NotImplementedError

    def set_default_executor(self, executor):
        raise NotImplementedError

    # ---- Network I/O methods returning Futures. ----

    def getaddrinfo(self, host, port, *, family=0, type=0, proto=0, flags=0):
        raise NotImplementedError

    def getnameinfo(self, sockaddr, flags=0):
        raise NotImplementedError

    def create_connection(self, protocol_factory, host=None, port=None, *,
                          ssl=None, family=0, proto=0, flags=0, sock=None,
                          local_addr=None, server_hostname=None):
        raise NotImplementedError

    def create_server(self, protocol_factory, host=None, port=None, *,
                      family=socket.AF_UNSPEC, flags=socket.AI_PASSIVE,
                      sock=None, backlog=100, ssl=None, reuse_address=None,
                      reuse_port=None):
        """A coroutine which creates a TCP server bound to host and port.

        The return value is a Server object which can be used to stop
        the service.

        If host is an empty string or None all interfaces are assumed
        and a list of multiple sockets will be returned (most likely
        one for IPv4 and another one for IPv6). The host parameter can
        also be a sequence (e.g. list) of hosts to bind to.

        family can be set to either AF_INET or AF_INET6 to force the
        socket to use IPv4 or IPv6. If not set it will be determined
        from host (defaults to AF_UNSPEC).

        flags is a bitmask for getaddrinfo().

        sock can optionally be specified in order to use a preexisting
        socket object.

        backlog is the maximum number of queued connections passed to
        listen() (defaults to 100).

        ssl can be set to an SSLContext to enable SSL over the
        accepted connections.

        reuse_address tells the kernel to reuse a local socket in
        TIME_WAIT state, without waiting for its natural timeout to
        expire. If not specified will automatically be set to True on
        UNIX.

        reuse_port tells the kernel to allow this endpoint to be bound to
        the same port as other existing endpoints are bound to, so long as
        they all set this flag when being created. This option is not
        supported on Windows.
        """
        raise NotImplementedError

    def create_unix_connection(self, protocol_factory, path, *,
                               ssl=None, sock=None,
                               server_hostname=None):
        raise NotImplementedError

    def create_unix_server(self, protocol_factory, path, *,
                           sock=None, backlog=100, ssl=None):
        """A coroutine which creates a UNIX Domain Socket server.

        The return value is a Server object, which can be used to stop
        the service.

        path is a str, representing a file system path to bind the
        server socket to.

        sock can optionally be specified in order to use a preexisting
        socket object.

        backlog is the maximum number of queued connections passed to
        listen() (defaults to 100).

        ssl can be set to an SSLContext to enable SSL over the
        accepted connections.
        """
        raise NotImplementedError

    def create_datagram_endpoint(self, protocol_factory,
                                 local_addr=None, remote_addr=None, *,
                                 family=0, proto=0, flags=0,
                                 reuse_address=None, reuse_port=None,
                                 allow_broadcast=None, sock=None):
        """A coroutine which creates a datagram endpoint.

        This method will try to establish the endpoint in the background.
        When successful, the coroutine returns a (transport, protocol) pair.

        protocol_factory must be a callable returning a protocol instance.

        socket family AF_INET, socket.AF_INET6 or socket.AF_UNIX depending on
        host (or family if specified), socket type SOCK_DGRAM.

        reuse_address tells the kernel to reuse a local socket in
        TIME_WAIT state, without waiting for its natural timeout to
        expire. If not specified it will automatically be set to True on
        UNIX.

        reuse_port tells the kernel to allow this endpoint to be bound to
        the same port as other existing endpoints are bound to, so long as
        they all set this flag when being created. This option is not
        supported on Windows and some UNIX's. If the
        :py:data:`~socket.SO_REUSEPORT` constant is not defined then this
        capability is unsupported.

        allow_broadcast tells the kernel to allow this endpoint to send
        messages to the broadcast address.

        sock can optionally be specified in order to use a preexisting
        socket object.
        """
        raise NotImplementedError

    # ---- Pipes and subprocesses. ----

    def connect_read_pipe(self, protocol_factory, pipe):
        """Register read pipe in event loop. Set the pipe to non-blocking mode.

        protocol_factory should instantiate object with Protocol interface.
        pipe is a file-like object.
        Return pair (transport, protocol), where transport supports the
        ReadTransport interface."""
        # A file-like object is accepted instead of a bare file descriptor
        # because the transport has to own the pipe and close it when the
        # transport finishes.  Passing f.fileno() can lead to confusing
        # errors: the pipe transport closes the fd and then f is closed,
        # or vice versa.
        raise NotImplementedError

    def connect_write_pipe(self, protocol_factory, pipe):
        """Register write pipe in event loop.

        protocol_factory should instantiate object with BaseProtocol interface.
        Pipe is file-like object already switched to nonblocking.
        Return pair (transport, protocol), where transport support
        WriteTransport interface."""
        # A file-like object is accepted instead of a bare file descriptor
        # because the transport has to own the pipe and close it when the
        # transport finishes.  Passing f.fileno() can lead to confusing
        # errors: the pipe transport closes the fd and then f is closed,
        # or vice versa.
        raise NotImplementedError

    def subprocess_shell(self, protocol_factory, cmd, *, stdin=subprocess.PIPE,
                         stdout=subprocess.PIPE, stderr=subprocess.PIPE,
                         **kwargs):
        raise NotImplementedError

    def subprocess_exec(self, protocol_factory, *args, stdin=subprocess.PIPE,
                        stdout=subprocess.PIPE, stderr=subprocess.PIPE,
                        **kwargs):
        raise NotImplementedError

    # ---- Ready-based callback registration methods. ----
    # The add_*() methods return None.
    # The remove_*() methods return True if something was removed,
    # False if there was nothing to delete.

    def add_reader(self, fd, callback, *args):
        raise NotImplementedError

    def remove_reader(self, fd):
        raise NotImplementedError

    def add_writer(self, fd, callback, *args):
        raise NotImplementedError

    def remove_writer(self, fd):
        raise NotImplementedError

    # ---- Completion based I/O methods returning Futures. ----

    def sock_recv(self, sock, nbytes):
        raise NotImplementedError

    def sock_recv_into(self, sock, buf):
        raise NotImplementedError

    def sock_sendall(self, sock, data):
        raise NotImplementedError

    def sock_connect(self, sock, address):
        raise NotImplementedError

    def sock_accept(self, sock):
        raise NotImplementedError

    # ---- Signal handling. ----

    def add_signal_handler(self, sig, callback, *args):
        raise NotImplementedError

    def remove_signal_handler(self, sig):
        raise NotImplementedError

    # ---- Task factory. ----

    def set_task_factory(self, factory):
        raise NotImplementedError

    def get_task_factory(self):
        raise NotImplementedError

    # ---- Error handlers. ----

    def get_exception_handler(self):
        raise NotImplementedError

    def set_exception_handler(self, handler):
        raise NotImplementedError

    def default_exception_handler(self, context):
        raise NotImplementedError

    def call_exception_handler(self, context):
        raise NotImplementedError

    # ---- Debug flag management. ----

    def get_debug(self):
        raise NotImplementedError

    def set_debug(self, enabled):
        raise NotImplementedError
516
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700517
class AbstractEventLoopPolicy:
    """Abstract policy for accessing the event loop.

    Every method here raises NotImplementedError.
    """

    def get_event_loop(self):
        """Get the event loop for the current context.

        Returns an event loop object implementing the BaseEventLoop
        interface, or raises an exception in case no event loop has been
        set for the current context and the current policy does not
        specify to create one.

        It should never return None.
        """
        raise NotImplementedError

    def set_event_loop(self, loop):
        """Set the event loop for the current context to loop."""
        raise NotImplementedError

    def new_event_loop(self):
        """Create and return a new event loop object according to this
        policy's rules.

        If there's need to set this loop as the event loop for the current
        context, set_event_loop must be called explicitly.
        """
        raise NotImplementedError

    # Child processes handling (Unix only).

    def get_child_watcher(self):
        """Get the watcher for child processes."""
        raise NotImplementedError

    def set_child_watcher(self, watcher):
        """Set the watcher for child processes."""
        raise NotImplementedError
550
551
class BaseDefaultEventLoopPolicy(AbstractEventLoopPolicy):
    """Default policy implementation for accessing the event loop.

    In this policy, each thread has its own event loop.  However, we
    only automatically create an event loop by default for the main
    thread; other threads by default have no event loop.

    Other policies may have different rules (e.g. a single global
    event loop, or automatically creating an event loop per thread, or
    using some other notion of context to which an event loop is
    associated).
    """

    # Callable used by new_event_loop() to build a loop; None here.
    _loop_factory = None

    class _Local(threading.local):
        # Per-thread state: the current loop, and whether set_event_loop()
        # has been called in this thread.
        _loop = None
        _set_called = False

    def __init__(self):
        self._local = self._Local()

    def get_event_loop(self):
        """Get the event loop of the calling thread.

        In the main thread a new loop is created and installed on first
        use, unless set_event_loop() has already been called there.
        Raises RuntimeError if the calling thread still has no loop.
        """
        if self._local._loop is None and not self._local._set_called:
            # The thread check comes last, as in the combined condition it
            # replaces, so it only runs when a loop might be auto-created.
            if isinstance(threading.current_thread(), threading._MainThread):
                self.set_event_loop(self.new_event_loop())
        if self._local._loop is None:
            raise RuntimeError('There is no current event loop in thread %r.'
                               % threading.current_thread().name)
        return self._local._loop

    def set_event_loop(self, loop):
        """Set the event loop."""
        # The flag is recorded before validation, so a rejected value still
        # disables automatic loop creation for this thread.
        self._local._set_called = True
        if loop is not None:
            assert isinstance(loop, AbstractEventLoop)
        self._local._loop = loop

    def new_event_loop(self):
        """Create a new event loop.

        You must call set_event_loop() to make this the current event
        loop.
        """
        factory = self._loop_factory
        return factory()
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700601
602
603# Event loop policy. The policy itself is always global, even if the
604# policy's rules say that there is an event loop per thread (or other
605# notion of context). The default policy is installed by the first
606# call to get_event_loop_policy().
607_event_loop_policy = None
608
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800609# Lock for protecting the on-the-fly creation of the event loop policy.
610_lock = threading.Lock()
611
612
Yury Selivanov600a3492016-11-04 14:29:28 -0400613# A TLS for the running event loop, used by _get_running_loop.
614class _RunningLoop(threading.local):
jimmylai80bbe6a72017-09-05 17:36:59 -0700615 loop_pid = (None, None)
Yury Selivanovba7e1f92017-03-02 20:07:11 -0500616
617
Yury Selivanov600a3492016-11-04 14:29:28 -0400618_running_loop = _RunningLoop()
619
620
def _get_running_loop():
    """Return the running event loop or None.

    This is a low-level function intended to be used by event loops.
    This function is thread-specific.
    """
    loop, owner_pid = _running_loop.loop_pid
    # A loop recorded under a different process id is not reported.
    if loop is None or owner_pid != os.getpid():
        return None
    return loop
Yury Selivanov600a3492016-11-04 14:29:28 -0400630
631
def _set_running_loop(loop):
    """Set the running event loop.

    This is a low-level function intended to be used by event loops.
    This function is thread-specific.
    """
    # Store the current process id next to the loop so that
    # _get_running_loop() can compare it later.
    current_pid = os.getpid()
    _running_loop.loop_pid = (loop, current_pid)
Yury Selivanov600a3492016-11-04 14:29:28 -0400639
640
def _init_event_loop_policy():
    """Install DefaultEventLoopPolicy if no policy is set yet.

    The check is made while holding the module lock.
    """
    global _event_loop_policy
    with _lock:
        if _event_loop_policy is not None:  # pragma: no branch
            return
        from . import DefaultEventLoopPolicy
        _event_loop_policy = DefaultEventLoopPolicy()
647
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700648
def get_event_loop_policy():
    """Get the current event loop policy.

    The default policy is installed first if none has been set.
    """
    if _event_loop_policy is not None:
        return _event_loop_policy
    _init_event_loop_policy()
    return _event_loop_policy
654
655
def set_event_loop_policy(policy):
    """Set the current event loop policy.

    If policy is None, the default policy is restored.
    """
    global _event_loop_policy
    if policy is not None:
        assert isinstance(policy, AbstractEventLoopPolicy)
    _event_loop_policy = policy
663
664
def get_event_loop():
    """Return an asyncio event loop.

    When called from a coroutine or a callback (e.g. scheduled with
    call_soon or similar API), this function will always return the
    running event loop.

    If there is no running event loop set, the function will return
    the result of `get_event_loop_policy().get_event_loop()` call.
    """
    running = _get_running_loop()
    if running is None:
        return get_event_loop_policy().get_event_loop()
    return running
678
679
def set_event_loop(loop):
    """Equivalent to calling get_event_loop_policy().set_event_loop(loop)."""
    policy = get_event_loop_policy()
    policy.set_event_loop(loop)
683
684
def new_event_loop():
    """Equivalent to calling get_event_loop_policy().new_event_loop()."""
    policy = get_event_loop_policy()
    return policy.new_event_loop()
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800688
689
def get_child_watcher():
    """Equivalent to calling get_event_loop_policy().get_child_watcher()."""
    policy = get_event_loop_policy()
    return policy.get_child_watcher()
693
694
def set_child_watcher(watcher):
    """Equivalent to calling
    get_event_loop_policy().set_child_watcher(watcher)."""
    policy = get_event_loop_policy()
    return policy.set_child_watcher(watcher)