# Source blob: 2cd6035973d1af29fbf6c2290748998c08bc213c
"""Event loop and event loop policy."""

# Names re-exported by ``from ... import *``.  The two underscore-prefixed
# helpers are listed on purpose: their docstrings describe them as low-level
# functions intended to be used by event loop implementations.
__all__ = ['AbstractEventLoopPolicy',
           'AbstractEventLoop', 'AbstractServer',
           'Handle', 'TimerHandle',
           'get_event_loop_policy', 'set_event_loop_policy',
           'get_event_loop', 'set_event_loop', 'new_event_loop',
           'get_child_watcher', 'set_child_watcher',
           '_set_running_loop', '_get_running_loop',
           ]

import functools
import inspect
import os
import reprlib
import socket
import subprocess
import sys
import threading
import traceback

from . import constants


def _get_function_source(func):
    """Return ``(filename, first_line_number)`` for *func*, or None.

    Decorator wrappers are peeled off with inspect.unwrap() first.  For
    functools.partial / functools.partialmethod objects the location of
    the wrapped callable is reported.  None is returned for anything
    that is not a plain Python function (builtins, arbitrary objects).
    """
    func = inspect.unwrap(func)
    if inspect.isfunction(func):
        code = func.__code__
        return (code.co_filename, code.co_firstlineno)
    if isinstance(func, (functools.partial, functools.partialmethod)):
        # Both wrapper types expose the underlying callable as ``.func``.
        return _get_function_source(func.func)
    return None


def _format_args_and_kwargs(args, kwargs):
    """Format function arguments and keyword arguments.

    Special case for a single parameter: ('hello',) is formatted as ('hello').

    Both *args* and *kwargs* may be None or empty, in which case they
    contribute nothing; the result is then just '()'.
    """
    # use reprlib to limit the length of the output
    items = []
    if args:
        items.extend(reprlib.repr(arg) for arg in args)
    if kwargs:
        items.extend('{}={}'.format(k, reprlib.repr(v))
                     for k, v in kwargs.items())
    return '(' + ', '.join(items) + ')'


def _format_callback(func, args, kwargs, suffix=''):
    """Return a readable ``name(args...)`` string for a callback.

    functools.partial objects are unwrapped recursively: the arguments
    given to the outer call are accumulated into *suffix*, so the result
    reads ``inner(partial_args)(call_args)``.

    The name is taken from ``__qualname__``, then ``__name__``, and
    finally ``repr(func)``.  A name attribute that is missing, empty or
    not a string is skipped, so concatenating the argument list below
    cannot fail on it.
    """
    if isinstance(func, functools.partial):
        suffix = _format_args_and_kwargs(args, kwargs) + suffix
        return _format_callback(func.func, func.args, func.keywords, suffix)

    func_repr = None
    for attr in ('__qualname__', '__name__'):
        name = getattr(func, attr, None)
        if isinstance(name, str) and name:
            func_repr = name
            break
    if func_repr is None:
        func_repr = repr(func)

    func_repr += _format_args_and_kwargs(args, kwargs)
    if suffix:
        func_repr += suffix
    return func_repr


def _format_callback_source(func, args):
    """Format a callback like _format_callback(), plus where it is defined.

    No keyword arguments are passed on.  When _get_function_source() can
    locate the callback, ' at <filename>:<lineno>' is appended.
    """
    func_repr = _format_callback(func, args, None)
    source = _get_function_source(func)
    if source:
        func_repr += ' at %s:%s' % source
    return func_repr


def extract_stack(f=None, limit=None):
    """Replacement for traceback.extract_stack() that only does the
    necessary work for asyncio debug mode.

    *f* is the frame to start from (default: the caller's frame) and
    *limit* the maximum number of frames to record (default:
    constants.DEBUG_STACK_DEPTH).  Source lines are not looked up.
    The returned summary is ordered from the oldest frame to the newest.
    """
    if f is None:
        f = sys._getframe().f_back
    if limit is None:
        # Limit the amount of work to a reasonable amount, as extract_stack()
        # can be called for each coroutine and future in debug mode.
        limit = constants.DEBUG_STACK_DEPTH
    stack = traceback.StackSummary.extract(traceback.walk_stack(f),
                                           limit=limit,
                                           lookup_lines=False)
    # walk_stack() yields the newest frame first; callers expect the
    # newest frame last.
    stack.reverse()
    return stack


class Handle:
    """Object returned by callback registration methods."""

    # _repr caches the repr computed at cancel() time in debug mode;
    # _source_traceback holds the creation stack (debug mode only).
    __slots__ = ('_callback', '_args', '_cancelled', '_loop',
                 '_source_traceback', '_repr', '__weakref__')

    def __init__(self, callback, args, loop):
        """Store the callback, its positional args and the owning loop.

        When the loop is in debug mode the caller's stack is recorded so
        it can be reported later.
        """
        self._loop = loop
        self._callback = callback
        self._args = args
        self._cancelled = False
        self._repr = None
        if self._loop.get_debug():
            # Frame 1 is whoever called this constructor.
            self._source_traceback = extract_stack(sys._getframe(1))
        else:
            self._source_traceback = None

    def _repr_info(self):
        """Return the list of strings that __repr__ joins together."""
        info = [self.__class__.__name__]
        if self._cancelled:
            info.append('cancelled')
        if self._callback is not None:
            info.append(_format_callback_source(self._callback, self._args))
        if self._source_traceback:
            # Last entry is the most recent frame (see extract_stack()).
            frame = self._source_traceback[-1]
            info.append('created at %s:%s' % (frame[0], frame[1]))
        return info

    def __repr__(self):
        if self._repr is not None:
            return self._repr
        info = self._repr_info()
        return '<%s>' % ' '.join(info)

    def cancel(self):
        """Mark the handle as cancelled and drop the callback and args.

        Calling cancel() more than once has no further effect.
        """
        if not self._cancelled:
            self._cancelled = True
            if self._loop.get_debug():
                # Keep a representation in debug mode to keep callback and
                # parameters. For example, to log the warning
                # "Executing <Handle...> took 2.5 second"
                self._repr = repr(self)
            self._callback = None
            self._args = None

    def cancelled(self):
        """Return True if cancel() has been called on this handle."""
        return self._cancelled

    def _run(self):
        """Invoke the callback.

        An Exception raised by the callback is not propagated; it is
        reported through the loop's call_exception_handler() instead.
        """
        try:
            self._callback(*self._args)
        except Exception as exc:
            cb = _format_callback_source(self._callback, self._args)
            msg = 'Exception in callback {}'.format(cb)
            context = {
                'message': msg,
                'exception': exc,
                'handle': self,
            }
            if self._source_traceback:
                context['source_traceback'] = self._source_traceback
            self._loop.call_exception_handler(context)
        self = None  # Needed to break cycles when an exception occurs.


class TimerHandle(Handle):
    """Object returned by timed callback registration methods."""

    __slots__ = ['_scheduled', '_when']

    def __init__(self, when, callback, args, loop):
        """Create a handle for *callback* due at time *when*.

        *when* must not be None.
        """
        assert when is not None
        super().__init__(callback, args, loop)
        if self._source_traceback:
            # Handle.__init__ recorded its caller, which is this method;
            # drop that frame so the traceback ends at our own caller.
            del self._source_traceback[-1]
        self._when = when
        self._scheduled = False

    def _repr_info(self):
        info = super()._repr_info()
        # Put 'when=...' right after the class name and the optional
        # 'cancelled' marker.
        pos = 2 if self._cancelled else 1
        info.insert(pos, 'when=%s' % self._when)
        return info

    def __hash__(self):
        return hash(self._when)

    # Ordering is by due time only.  Comparing against anything that is
    # not a TimerHandle returns NotImplemented (like __eq__ does) so that
    # Python raises its regular TypeError instead of an AttributeError on
    # the missing '_when' attribute.

    def __lt__(self, other):
        if isinstance(other, TimerHandle):
            return self._when < other._when
        return NotImplemented

    def __le__(self, other):
        if isinstance(other, TimerHandle):
            return self._when < other._when or self.__eq__(other)
        return NotImplemented

    def __gt__(self, other):
        if isinstance(other, TimerHandle):
            return self._when > other._when
        return NotImplemented

    def __ge__(self, other):
        if isinstance(other, TimerHandle):
            return self._when > other._when or self.__eq__(other)
        return NotImplemented

    def __eq__(self, other):
        if isinstance(other, TimerHandle):
            return (self._when == other._when and
                    self._callback == other._callback and
                    self._args == other._args and
                    self._cancelled == other._cancelled)
        return NotImplemented

    def __ne__(self, other):
        equal = self.__eq__(other)
        return NotImplemented if equal is NotImplemented else not equal

    def cancel(self):
        """Cancel the timer, notifying the loop the first time only."""
        if not self._cancelled:
            self._loop._timer_handle_cancelled(self)
        super().cancel()


class AbstractServer:
    """Abstract server returned by create_server().

    Both methods must be overridden.  They raise NotImplementedError
    rather than returning the NotImplemented sentinel, which is meant
    for binary operators and would otherwise be handed back silently as
    an ordinary result.
    """

    def close(self):
        """Stop serving.  This leaves existing connections open."""
        raise NotImplementedError

    async def wait_closed(self):
        """Coroutine to wait until service is closed."""
        raise NotImplementedError


class AbstractEventLoop:
    """Abstract event loop.

    Every method raises NotImplementedError, except call_soon(), which
    is expressed as call_later() with a delay of 0.
    """

    # Running and stopping the event loop.

    def run_forever(self):
        """Run the event loop until stop() is called."""
        raise NotImplementedError

    def run_until_complete(self, future):
        """Run the event loop until a Future is done.

        Return the Future's result, or raise its exception.
        """
        raise NotImplementedError

    def stop(self):
        """Stop the event loop as soon as reasonable.

        Exactly how soon that is may depend on the implementation, but
        no more I/O callbacks should be scheduled.
        """
        raise NotImplementedError

    def is_running(self):
        """Return whether the event loop is currently running."""
        raise NotImplementedError

    def is_closed(self):
        """Returns True if the event loop was closed."""
        raise NotImplementedError

    def close(self):
        """Close the loop.

        The loop should not be running.

        This is idempotent and irreversible.

        No other methods should be called after this one.
        """
        raise NotImplementedError

    async def shutdown_asyncgens(self):
        """Shutdown all active asynchronous generators."""
        raise NotImplementedError

    # Methods scheduling callbacks.  All these return Handles.

    def _timer_handle_cancelled(self, handle):
        """Notification that a TimerHandle has been cancelled."""
        raise NotImplementedError

    def call_soon(self, callback, *args):
        return self.call_later(0, callback, *args)

    def call_later(self, delay, callback, *args):
        raise NotImplementedError

    def call_at(self, when, callback, *args):
        raise NotImplementedError

    def time(self):
        raise NotImplementedError

    def create_future(self):
        raise NotImplementedError

    # Method scheduling a coroutine object: create a task.

    def create_task(self, coro):
        raise NotImplementedError

    # Methods for interacting with threads.

    def call_soon_threadsafe(self, callback, *args):
        raise NotImplementedError

    async def run_in_executor(self, executor, func, *args):
        raise NotImplementedError

    def set_default_executor(self, executor):
        raise NotImplementedError

    # Network I/O methods returning Futures.

    async def getaddrinfo(self, host, port, *,
                          family=0, type=0, proto=0, flags=0):
        raise NotImplementedError

    async def getnameinfo(self, sockaddr, flags=0):
        raise NotImplementedError

    async def create_connection(self, protocol_factory, host=None, port=None,
                                *, ssl=None, family=0, proto=0,
                                flags=0, sock=None, local_addr=None,
                                server_hostname=None):
        raise NotImplementedError

    async def create_server(self, protocol_factory, host=None, port=None,
                            *, family=socket.AF_UNSPEC,
                            flags=socket.AI_PASSIVE, sock=None, backlog=100,
                            ssl=None, reuse_address=None, reuse_port=None):
        """A coroutine which creates a TCP server bound to host and port.

        The return value is a Server object which can be used to stop
        the service.

        If host is an empty string or None all interfaces are assumed
        and a list of multiple sockets will be returned (most likely
        one for IPv4 and another one for IPv6). The host parameter can also
        be a sequence (e.g. list) of hosts to bind to.

        family can be set to either AF_INET or AF_INET6 to force the
        socket to use IPv4 or IPv6. If not set it will be determined
        from host (defaults to AF_UNSPEC).

        flags is a bitmask for getaddrinfo().

        sock can optionally be specified in order to use a preexisting
        socket object.

        backlog is the maximum number of queued connections passed to
        listen() (defaults to 100).

        ssl can be set to an SSLContext to enable SSL over the
        accepted connections.

        reuse_address tells the kernel to reuse a local socket in
        TIME_WAIT state, without waiting for its natural timeout to
        expire. If not specified will automatically be set to True on
        UNIX.

        reuse_port tells the kernel to allow this endpoint to be bound to
        the same port as other existing endpoints are bound to, so long as
        they all set this flag when being created. This option is not
        supported on Windows.
        """
        raise NotImplementedError

    async def create_unix_connection(self, protocol_factory, path=None, *,
                                     ssl=None, sock=None,
                                     server_hostname=None):
        raise NotImplementedError

    async def create_unix_server(self, protocol_factory, path=None, *,
                                 sock=None, backlog=100, ssl=None):
        """A coroutine which creates a UNIX Domain Socket server.

        The return value is a Server object, which can be used to stop
        the service.

        path is a str, representing a file system path to bind the
        server socket to.

        sock can optionally be specified in order to use a preexisting
        socket object.

        backlog is the maximum number of queued connections passed to
        listen() (defaults to 100).

        ssl can be set to an SSLContext to enable SSL over the
        accepted connections.
        """
        raise NotImplementedError

    async def create_datagram_endpoint(self, protocol_factory,
                                       local_addr=None, remote_addr=None, *,
                                       family=0, proto=0, flags=0,
                                       reuse_address=None, reuse_port=None,
                                       allow_broadcast=None, sock=None):
        """A coroutine which creates a datagram endpoint.

        This method will try to establish the endpoint in the background.
        When successful, the coroutine returns a (transport, protocol) pair.

        protocol_factory must be a callable returning a protocol instance.

        socket family AF_INET, socket.AF_INET6 or socket.AF_UNIX depending on
        host (or family if specified), socket type SOCK_DGRAM.

        reuse_address tells the kernel to reuse a local socket in
        TIME_WAIT state, without waiting for its natural timeout to
        expire. If not specified it will automatically be set to True on
        UNIX.

        reuse_port tells the kernel to allow this endpoint to be bound to
        the same port as other existing endpoints are bound to, so long as
        they all set this flag when being created. This option is not
        supported on Windows and some UNIX's. If the
        :py:data:`~socket.SO_REUSEPORT` constant is not defined then this
        capability is unsupported.

        allow_broadcast tells the kernel to allow this endpoint to send
        messages to the broadcast address.

        sock can optionally be specified in order to use a preexisting
        socket object.
        """
        raise NotImplementedError

    # Pipes and subprocesses.

    async def connect_read_pipe(self, protocol_factory, pipe):
        """Register read pipe in event loop. Set the pipe to non-blocking mode.

        protocol_factory should instantiate object with Protocol interface.
        pipe is a file-like object.
        Return pair (transport, protocol), where transport supports the
        ReadTransport interface."""
        # The reason to accept a file-like object instead of just a file
        # descriptor is that we need to own the pipe and close it when the
        # transport finishes.  Passing f.fileno() can lead to complicated
        # errors: the pipe transport closes the fd and then f is closed,
        # or vice versa.
        raise NotImplementedError

    async def connect_write_pipe(self, protocol_factory, pipe):
        """Register write pipe in event loop.

        protocol_factory should instantiate object with BaseProtocol interface.
        Pipe is file-like object already switched to nonblocking.
        Return pair (transport, protocol), where transport supports the
        WriteTransport interface."""
        # The reason to accept a file-like object instead of just a file
        # descriptor is that we need to own the pipe and close it when the
        # transport finishes.  Passing f.fileno() can lead to complicated
        # errors: the pipe transport closes the fd and then f is closed,
        # or vice versa.
        raise NotImplementedError

    async def subprocess_shell(self, protocol_factory, cmd, *,
                               stdin=subprocess.PIPE,
                               stdout=subprocess.PIPE,
                               stderr=subprocess.PIPE,
                               **kwargs):
        raise NotImplementedError

    async def subprocess_exec(self, protocol_factory, *args,
                              stdin=subprocess.PIPE,
                              stdout=subprocess.PIPE,
                              stderr=subprocess.PIPE,
                              **kwargs):
        raise NotImplementedError

    # Ready-based callback registration methods.
    # The add_*() methods return None.
    # The remove_*() methods return True if something was removed,
    # False if there was nothing to delete.

    def add_reader(self, fd, callback, *args):
        raise NotImplementedError

    def remove_reader(self, fd):
        raise NotImplementedError

    def add_writer(self, fd, callback, *args):
        raise NotImplementedError

    def remove_writer(self, fd):
        raise NotImplementedError

    # Completion based I/O methods returning Futures.

    async def sock_recv(self, sock, nbytes):
        raise NotImplementedError

    async def sock_recv_into(self, sock, buf):
        raise NotImplementedError

    async def sock_sendall(self, sock, data):
        raise NotImplementedError

    async def sock_connect(self, sock, address):
        raise NotImplementedError

    async def sock_accept(self, sock):
        raise NotImplementedError

    # Signal handling.

    def add_signal_handler(self, sig, callback, *args):
        raise NotImplementedError

    def remove_signal_handler(self, sig):
        raise NotImplementedError

    # Task factory.

    def set_task_factory(self, factory):
        raise NotImplementedError

    def get_task_factory(self):
        raise NotImplementedError

    # Error handlers.

    def get_exception_handler(self):
        raise NotImplementedError

    def set_exception_handler(self, handler):
        raise NotImplementedError

    def default_exception_handler(self, context):
        raise NotImplementedError

    def call_exception_handler(self, context):
        raise NotImplementedError

    # Debug flag management.

    def get_debug(self):
        raise NotImplementedError

    def set_debug(self, enabled):
        raise NotImplementedError


class AbstractEventLoopPolicy:
    """Abstract policy for accessing the event loop."""

    def get_event_loop(self):
        """Get the event loop for the current context.

        Returns an event loop object implementing the AbstractEventLoop
        interface, or raises an exception in case no event loop has been
        set for the current context and the current policy does not
        specify to create one.

        It should never return None."""
        raise NotImplementedError

    def set_event_loop(self, loop):
        """Set the event loop for the current context to loop."""
        raise NotImplementedError

    def new_event_loop(self):
        """Create and return a new event loop object according to this
        policy's rules. If there's need to set this loop as the event loop for
        the current context, set_event_loop must be called explicitly."""
        raise NotImplementedError

    # Child processes handling (Unix only).

    def get_child_watcher(self):
        """Get the watcher for child processes."""
        raise NotImplementedError

    def set_child_watcher(self, watcher):
        """Set the watcher for child processes."""
        raise NotImplementedError


class BaseDefaultEventLoopPolicy(AbstractEventLoopPolicy):
    """Default policy implementation for accessing the event loop.

    In this policy, each thread has its own event loop.  However, we
    only automatically create an event loop by default for the main
    thread; other threads by default have no event loop.

    Other policies may have different rules (e.g. a single global
    event loop, or automatically creating an event loop per thread, or
    using some other notion of context to which an event loop is
    associated).
    """

    # Callable returning a new event loop; used by new_event_loop().
    # It is None here, so a concrete value has to be supplied before
    # new_event_loop() can work.
    _loop_factory = None

    class _Local(threading.local):
        # Per-thread state: the current loop, and whether set_event_loop()
        # has been called in this thread.
        _loop = None
        _set_called = False

    def __init__(self):
        self._local = self._Local()

    def get_event_loop(self):
        """Get the event loop for the current thread.

        In the main thread a loop is created and installed on first use,
        unless set_event_loop() has already been called.

        Raises RuntimeError if the current thread has no event loop;
        None is never returned.
        """
        if (self._local._loop is None and
                not self._local._set_called and
                threading.current_thread() is threading.main_thread()):
            self.set_event_loop(self.new_event_loop())
        if self._local._loop is None:
            raise RuntimeError('There is no current event loop in thread %r.'
                               % threading.current_thread().name)
        return self._local._loop

    def set_event_loop(self, loop):
        """Set the event loop."""
        self._local._set_called = True
        assert loop is None or isinstance(loop, AbstractEventLoop)
        self._local._loop = loop

    def new_event_loop(self):
        """Create a new event loop.

        You must call set_event_loop() to make this the current event
        loop.
        """
        return self._loop_factory()


# Event loop policy.  The policy itself is always global, even if the
# policy's rules say that there is an event loop per thread (or other
# notion of context).  The default policy is installed by the first
# call to get_event_loop_policy().
_event_loop_policy = None

# Lock for protecting the on-the-fly creation of the event loop policy.
_lock = threading.Lock()


# A TLS for the running event loop, used by _get_running_loop.
class _RunningLoop(threading.local):
    # (loop, pid) pair; the class-level default means "no running loop".
    loop_pid = (None, None)


_running_loop = _RunningLoop()


def _get_running_loop():
    """Return the running event loop or None.

    This is a low-level function intended to be used by event loops.
    This function is thread-specific.
    """
    running_loop, pid = _running_loop.loop_pid
    # Only report the loop in the process that registered it: the stored
    # pid no longer matches os.getpid() in a forked child.
    if running_loop is not None and pid == os.getpid():
        return running_loop
    return None


def _set_running_loop(loop):
    """Set the running event loop.

    This is a low-level function intended to be used by event loops.
    This function is thread-specific.
    """
    # Remember the current pid alongside the loop; _get_running_loop()
    # compares it with os.getpid() before returning the loop.
    _running_loop.loop_pid = (loop, os.getpid())


def _init_event_loop_policy():
    """Install DefaultEventLoopPolicy as the global policy if none is set.

    The check is repeated while holding _lock so that only one policy
    object is created when several threads get here at the same time.
    """
    global _event_loop_policy
    with _lock:
        if _event_loop_policy is None:  # pragma: no branch
            # Imported here rather than at module level.
            from . import DefaultEventLoopPolicy
            _event_loop_policy = DefaultEventLoopPolicy()


def get_event_loop_policy():
    """Get the current event loop policy.

    The default policy is created on first use.
    """
    if _event_loop_policy is None:
        _init_event_loop_policy()
    return _event_loop_policy


def set_event_loop_policy(policy):
    """Set the current event loop policy.

    If policy is None, the default policy is restored."""
    global _event_loop_policy
    assert policy is None or isinstance(policy, AbstractEventLoopPolicy)
    # Storing None makes the next get_event_loop_policy() call create the
    # default policy again.
    _event_loop_policy = policy


def get_event_loop():
    """Return an asyncio event loop.

    When called from a coroutine or a callback (e.g. scheduled with call_soon
    or similar API), this function will always return the running event loop.

    If there is no running event loop set, the function will return
    the result of `get_event_loop_policy().get_event_loop()` call.
    """
    current_loop = _get_running_loop()
    if current_loop is not None:
        return current_loop
    return get_event_loop_policy().get_event_loop()


def set_event_loop(loop):
    """Equivalent to calling get_event_loop_policy().set_event_loop(loop)."""
    get_event_loop_policy().set_event_loop(loop)


def new_event_loop():
    """Equivalent to calling get_event_loop_policy().new_event_loop()."""
    return get_event_loop_policy().new_event_loop()


def get_child_watcher():
    """Equivalent to calling get_event_loop_policy().get_child_watcher()."""
    return get_event_loop_policy().get_child_watcher()


def set_child_watcher(watcher):
    """Equivalent to calling
    get_event_loop_policy().set_child_watcher(watcher)."""
    return get_event_loop_policy().set_child_watcher(watcher)
Guido van Rossum0eaa5ac2013-11-04 15:50:46 -0800723 return get_event_loop_policy().set_child_watcher(watcher)