blob: 5a536a22b7b3f3e19e7042ed129e352a933f5b6f [file] [log] [blame]
"""Base implementation of event loop.

The event loop can be broken up into a multiplexer (the part
responsible for notifying us of I/O events) and the event loop proper,
which wraps a multiplexer with functionality for scheduling callbacks,
immediately or at a given time in the future.

Whenever a public API takes a callback, subsequent positional
arguments will be passed to the callback if/when it is called.  This
avoids the proliferation of trivial lambdas implementing closures.
Keyword arguments for the callback are not supported; this is a
conscious design decision, leaving the door open for keyword arguments
to modify the meaning of the API call itself.
"""
15
16
17import collections
18import concurrent.futures
19import heapq
Victor Stinner0e6f52a2014-06-20 17:34:15 +020020import inspect
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070021import logging
Victor Stinnerb75380f2014-06-30 14:39:11 +020022import os
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070023import socket
24import subprocess
Victor Stinner956de692014-12-26 21:07:52 +010025import threading
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070026import time
Victor Stinnerb75380f2014-06-30 14:39:11 +020027import traceback
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070028import sys
Victor Stinner978a9af2015-01-29 17:50:58 +010029import warnings
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070030
Victor Stinnerf951d282014-06-29 00:46:45 +020031from . import coroutines
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070032from . import events
33from . import futures
34from . import tasks
Victor Stinnerf951d282014-06-29 00:46:45 +020035from .coroutines import coroutine
Guido van Rossumfc29e0f2013-10-17 15:39:45 -070036from .log import logger
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070037
38
Victor Stinner8c1a4a22015-01-06 01:03:58 +010039__all__ = ['BaseEventLoop']
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070040
41
# Argument for default thread pool executor creation.
_MAX_WORKERS = 5

# Minimum number of _scheduled timer handles before cleanup of
# cancelled handles is performed.
_MIN_SCHEDULED_TIMER_HANDLES = 100

# Minimum fraction of _scheduled timer handles that are cancelled
# before cleanup of cancelled handles is performed.
_MIN_CANCELLED_TIMER_HANDLES_FRACTION = 0.5
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070052
def _format_handle(handle):
    """Return a string describing *handle* for log messages.

    If the handle's callback is a bound method of a Task, the repr() of
    that task is returned; otherwise str(handle) is returned.
    """
    cb = handle._callback
    if inspect.ismethod(cb) and isinstance(cb.__self__, tasks.Task):
        # format the task
        return repr(cb.__self__)
    return str(handle)
60
61
def _format_pipe(fd):
    """Return a printable name for a subprocess stdio argument.

    subprocess.PIPE maps to '<pipe>', subprocess.STDOUT to '<stdout>';
    any other value is returned as its repr().
    """
    if fd == subprocess.PIPE:
        return '<pipe>'
    if fd == subprocess.STDOUT:
        return '<stdout>'
    return repr(fd)
69
70
class _StopError(BaseException):
    """Raised to stop the event loop."""
73
74
def _check_resolved_address(sock, address):
    """Raise ValueError if *address* is not an already-resolved IP address.

    Only AF_INET and AF_INET6 sockets are checked; for any other address
    family the function returns without doing anything.
    """
    # Ensure that the address is already resolved to avoid the trap of hanging
    # the entire event loop when the address requires doing a DNS lookup.
    #
    # getaddrinfo() is slow (around 10 us per call): this function should only
    # be called in debug mode
    family = sock.family

    if family == socket.AF_INET:
        host, port = address
    elif family == socket.AF_INET6:
        # IPv6 addresses may carry extra items; only host and port are used.
        host, port = address[:2]
    else:
        return

    # On Windows, socket.inet_pton() is only available since Python 3.4
    if hasattr(socket, 'inet_pton'):
        # getaddrinfo() is slow and has known issue: prefer inet_pton()
        # if available
        try:
            socket.inet_pton(family, host)
        except OSError as exc:
            raise ValueError("address must be resolved (IP address), "
                             "got host %r: %s"
                             % (host, exc))
    else:
        # Use getaddrinfo(flags=AI_NUMERICHOST) to ensure that the address is
        # already resolved.
        type_mask = 0
        if hasattr(socket, 'SOCK_NONBLOCK'):
            type_mask |= socket.SOCK_NONBLOCK
        if hasattr(socket, 'SOCK_CLOEXEC'):
            type_mask |= socket.SOCK_CLOEXEC
        try:
            socket.getaddrinfo(host, port,
                               family=family,
                               type=(sock.type & ~type_mask),
                               proto=sock.proto,
                               flags=socket.AI_NUMERICHOST)
        except socket.gaierror as err:
            raise ValueError("address must be resolved (IP address), "
                             "got host %r: %s"
                             % (host, err))
Victor Stinner1b0580b2014-02-13 09:24:37 +0100118
def _raise_stop_error(*args):
    """Raise _StopError; any positional arguments are ignored."""
    raise _StopError
121
122
def _run_until_complete_cb(fut):
    """Done callback that stops the loop by raising _StopError.

    Nothing is raised when the future's exception is a BaseException
    that is not an Exception.
    """
    exc = fut._exception
    if (isinstance(exc, BaseException)
            and not isinstance(exc, Exception)):
        # Issue #22429: run_forever() already finished, no need to
        # stop it.
        return
    _raise_stop_error()
131
132
class Server(events.AbstractServer):
    """Server holding a list of listening sockets.

    ``sockets`` is the list passed to the constructor until close() sets
    it to None.  ``_active_count`` is incremented by _attach() and
    decremented by _detach().  Futures created by wait_closed() are kept
    in ``_waiters`` and completed by _wakeup().
    """

    def __init__(self, loop, sockets):
        self._loop = loop
        self.sockets = sockets
        self._active_count = 0
        self._waiters = []

    def __repr__(self):
        return '<%s sockets=%r>' % (self.__class__.__name__, self.sockets)

    def _attach(self):
        """Increment the active count; the server must not be closed."""
        assert self.sockets is not None
        self._active_count += 1

    def _detach(self):
        """Decrement the active count, waking waiters once closed and idle."""
        assert self._active_count > 0
        self._active_count -= 1
        if self._active_count == 0 and self.sockets is None:
            self._wakeup()

    def close(self):
        """Stop serving on every socket; do nothing if already closed."""
        sockets = self.sockets
        if sockets is None:
            return
        self.sockets = None
        for sock in sockets:
            self._loop._stop_serving(sock)
        if self._active_count == 0:
            self._wakeup()

    def _wakeup(self):
        """Complete every pending wait_closed() future."""
        waiters = self._waiters
        # None marks that the wakeup already happened; wait_closed()
        # checks for it and returns immediately.
        self._waiters = None
        for waiter in waiters:
            if not waiter.done():
                waiter.set_result(waiter)

    @coroutine
    def wait_closed(self):
        """Wait until _wakeup() is called.

        Returns immediately if ``sockets`` or ``_waiters`` is None.
        """
        if self.sockets is None or self._waiters is None:
            return
        waiter = futures.Future(loop=self._loop)
        self._waiters.append(waiter)
        yield from waiter
178
179
180class BaseEventLoop(events.AbstractEventLoop):
181
def __init__(self):
    """Initialize the loop's queues, state flags and debug settings."""
    self._timer_cancelled_count = 0
    self._closed = False
    self._ready = collections.deque()
    self._scheduled = []
    self._default_executor = None
    self._internal_fds = 0
    # Identifier of the thread running the event loop, or None if the
    # event loop is not running
    self._thread_id = None
    self._clock_resolution = time.get_clock_info('monotonic').resolution
    self._exception_handler = None
    # Debug mode is enabled by a non-empty PYTHONASYNCIODEBUG, unless the
    # interpreter was told to ignore environment variables.
    self.set_debug((not sys.flags.ignore_environment
                    and bool(os.environ.get('PYTHONASYNCIODEBUG'))))
    # In debug mode, if the execution of a callback or a step of a task
    # exceed this duration in seconds, the slow callback/task is logged.
    self.slow_callback_duration = 0.1
    self._current_handle = None
    self._task_factory = None
    self._coroutine_wrapper_set = False
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700202
def __repr__(self):
    """Return '<ClassName running=... closed=... debug=...>'."""
    return ('<%s running=%s closed=%s debug=%s>'
            % (self.__class__.__name__, self.is_running(),
               self.is_closed(), self.get_debug()))
207
def create_task(self, coro):
    """Schedule a coroutine object.

    Return a task object.  If a task factory was set, its return value
    is returned as-is; otherwise a tasks.Task is created.

    Raises RuntimeError (via _check_closed()) if the loop is closed.
    """
    self._check_closed()
    factory = self._task_factory
    if factory is not None:
        return factory(self, coro)
    task = tasks.Task(coro, loop=self)
    if task._source_traceback:
        # Drop the last frame (this method) from the recorded traceback.
        del task._source_traceback[-1]
    return task
Victor Stinner896a25a2014-07-08 11:29:25 +0200221
def set_task_factory(self, factory):
    """Set a task factory that will be used by loop.create_task().

    If factory is None the default task factory will be set.

    If factory is a callable, it should have a signature matching
    '(loop, coro)', where 'loop' will be a reference to the active
    event loop, 'coro' will be a coroutine object.  The callable
    must return a Future.

    Raises TypeError if factory is neither None nor callable.
    """
    if factory is not None and not callable(factory):
        raise TypeError('task factory must be a callable or None')
    self._task_factory = factory
235
def get_task_factory(self):
    """Return a task factory, or None if the default one is in use."""
    return self._task_factory
239
def _make_socket_transport(self, sock, protocol, waiter=None, *,
                           extra=None, server=None):
    """Create socket transport.

    Not implemented in this base class; always raises NotImplementedError.
    """
    raise NotImplementedError
244
def _make_ssl_transport(self, rawsock, protocol, sslcontext, waiter=None,
                        *, server_side=False, server_hostname=None,
                        extra=None, server=None):
    """Create SSL transport.

    Not implemented in this base class; always raises NotImplementedError.
    """
    raise NotImplementedError
250
def _make_datagram_transport(self, sock, protocol,
                             address=None, waiter=None, extra=None):
    """Create datagram transport.

    Not implemented in this base class; always raises NotImplementedError.
    """
    raise NotImplementedError
255
def _make_read_pipe_transport(self, pipe, protocol, waiter=None,
                              extra=None):
    """Create read pipe transport.

    Not implemented in this base class; always raises NotImplementedError.
    """
    raise NotImplementedError
260
def _make_write_pipe_transport(self, pipe, protocol, waiter=None,
                               extra=None):
    """Create write pipe transport.

    Not implemented in this base class; always raises NotImplementedError.
    """
    raise NotImplementedError
265
@coroutine
def _make_subprocess_transport(self, protocol, args, shell,
                               stdin, stdout, stderr, bufsize,
                               extra=None, **kwargs):
    """Create subprocess transport.

    Not implemented in this base class; always raises NotImplementedError.
    """
    raise NotImplementedError
272
def _write_to_self(self):
    """Write a byte to self-pipe, to wake up the event loop.

    This may be called from a different thread.

    The subclass is responsible for implementing the self-pipe.
    """
    raise NotImplementedError
281
def _process_events(self, event_list):
    """Process selector events.

    Not implemented in this base class; always raises NotImplementedError.
    """
    raise NotImplementedError
285
def _check_closed(self):
    """Raise RuntimeError if the event loop has been closed."""
    if self._closed:
        raise RuntimeError('Event loop is closed')
289
def run_forever(self):
    """Run until stop() is called.

    Raises RuntimeError if the loop is closed or is already running.
    """
    self._check_closed()
    if self.is_running():
        raise RuntimeError('Event loop is running.')
    self._set_coroutine_wrapper(self._debug)
    # A non-None thread id is what is_running() reports as "running".
    self._thread_id = threading.get_ident()
    try:
        while True:
            try:
                self._run_once()
            except _StopError:
                break
    finally:
        # Always restore the "not running" state, even when an iteration
        # raised something other than _StopError.
        self._thread_id = None
        self._set_coroutine_wrapper(False)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700306
def run_until_complete(self, future):
    """Run until the Future is done.

    If the argument is a coroutine, it is wrapped in a Task.

    WARNING: It would be disastrous to call run_until_complete()
    with the same coroutine twice -- it would wrap it in two
    different Tasks and that can't be good.

    Return the Future's result, or raise its exception.

    Raises RuntimeError if the loop is closed, or if the loop stopped
    before the future completed.
    """
    self._check_closed()

    new_task = not isinstance(future, futures.Future)
    future = tasks.ensure_future(future, loop=self)
    if new_task:
        # An exception is raised if the future didn't complete, so there
        # is no need to log the "destroy pending task" message
        future._log_destroy_pending = False

    future.add_done_callback(_run_until_complete_cb)
    try:
        self.run_forever()
    except:
        if new_task and future.done() and not future.cancelled():
            # The coroutine raised a BaseException. Consume the exception
            # to not log a warning, the caller doesn't have access to the
            # local task.
            future.exception()
        raise
    finally:
        # Detach the stop callback on every exit path.  If it stayed
        # attached after run_forever() raised, a later run_forever()
        # would be stopped unexpectedly when this future completes.
        future.remove_done_callback(_run_until_complete_cb)
    if not future.done():
        raise RuntimeError('Event loop stopped before Future completed.')

    return future.result()
342
def stop(self):
    """Stop running the event loop.

    Every callback scheduled before stop() is called will run.  Callbacks
    scheduled after stop() is called will not run.  However, those callbacks
    will run if run_forever is called again later.
    """
    # The stop request is itself a scheduled callback that raises
    # _StopError, which run_forever() catches to leave its loop.
    self.call_soon(_raise_stop_error)
351
def close(self):
    """Close the event loop.

    This clears the queues and shuts down the executor,
    but does not wait for the executor to finish.

    The event loop must not be running.  Closing an already closed
    loop does nothing.

    Raises RuntimeError if the loop is running.
    """
    if self.is_running():
        raise RuntimeError("Cannot close a running event loop")
    if self._closed:
        return
    if self._debug:
        logger.debug("Close %r", self)
    self._closed = True
    self._ready.clear()
    self._scheduled.clear()
    executor = self._default_executor
    if executor is not None:
        self._default_executor = None
        executor.shutdown(wait=False)
Antoine Pitrou4ca73552013-10-20 00:54:10 +0200373
def is_closed(self):
    """Returns True if the event loop was closed."""
    return self._closed
377
# On Python 3.3 and older, objects with a destructor that are part of a
# reference cycle are never destroyed. This is no longer the case on
# Python 3.4, thanks to PEP 442.
if sys.version_info >= (3, 4):
    def __del__(self):
        """Warn about an unclosed loop and close it if it is not running."""
        if not self.is_closed():
            warnings.warn("unclosed event loop %r" % self, ResourceWarning)
            if not self.is_running():
                self.close()
387
def is_running(self):
    """Returns True if the event loop is running."""
    # _thread_id is None whenever the loop is not running.
    return self._thread_id is not None
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700391
def time(self):
    """Return the time according to the event loop's clock.

    This is a float expressed in seconds since an epoch, but the
    epoch, precision, accuracy and drift are unspecified and may
    differ per event loop.
    """
    return time.monotonic()
400
def call_later(self, delay, callback, *args):
    """Arrange for a callback to be called at a given time.

    Return a Handle: an opaque object with a cancel() method that
    can be used to cancel the call.

    The delay can be an int or float, expressed in seconds.  It is
    always relative to the current time.

    Each callback will be called exactly once.  If two callbacks
    are scheduled for exactly the same time, it is undefined which
    will be called first.

    Any positional arguments after the callback will be passed to
    the callback when it is called.
    """
    timer = self.call_at(self.time() + delay, callback, *args)
    if timer._source_traceback:
        # Drop the last frame (this method) from the recorded traceback.
        del timer._source_traceback[-1]
    return timer
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700421
def call_at(self, when, callback, *args):
    """Like call_later(), but uses an absolute time.

    Absolute time corresponds to the event loop's time() method.

    Raises TypeError if callback is a coroutine object or coroutine
    function, and RuntimeError (via _check_closed()) if the loop is closed.
    """
    if (coroutines.iscoroutine(callback)
            or coroutines.iscoroutinefunction(callback)):
        raise TypeError("coroutines cannot be used with call_at()")
    self._check_closed()
    if self._debug:
        self._check_thread()
    timer = events.TimerHandle(when, callback, args, self)
    if timer._source_traceback:
        # Drop the last frame (this method) from the recorded traceback.
        del timer._source_traceback[-1]
    heapq.heappush(self._scheduled, timer)
    timer._scheduled = True
    return timer
439
def call_soon(self, callback, *args):
    """Arrange for a callback to be called as soon as possible.

    This operates as a FIFO queue: callbacks are called in the
    order in which they are registered.  Each callback will be
    called exactly once.

    Any positional arguments after the callback will be passed to
    the callback when it is called.
    """
    if self._debug:
        # The thread check is only performed in debug mode.
        self._check_thread()
    handle = self._call_soon(callback, args)
    if handle._source_traceback:
        # Drop the last frame (this method) from the recorded traceback.
        del handle._source_traceback[-1]
    return handle
Victor Stinner93569c22014-03-21 10:00:52 +0100456
def _call_soon(self, callback, args):
    """Create a Handle for *callback* and append it to the ready queue.

    Raises TypeError if callback is a coroutine object or coroutine
    function, and RuntimeError (via _check_closed()) if the loop is closed.
    """
    if (coroutines.iscoroutine(callback)
            or coroutines.iscoroutinefunction(callback)):
        raise TypeError("coroutines cannot be used with call_soon()")
    self._check_closed()
    handle = events.Handle(callback, args, self)
    if handle._source_traceback:
        # Drop the last frame (this method) from the recorded traceback.
        del handle._source_traceback[-1]
    self._ready.append(handle)
    return handle
467
def _check_thread(self):
    """Check that the current thread is the thread running the event loop.

    Non-thread-safe methods of this class make this assumption and will
    likely behave incorrectly when the assumption is violated.

    Should only be called when (self._debug == True).  The caller is
    responsible for checking this condition for performance reasons.

    Raises RuntimeError when called from a thread other than the one
    recorded in self._thread_id.
    """
    if self._thread_id is None:
        # The loop is not running: nothing to compare against.
        return
    if threading.get_ident() != self._thread_id:
        raise RuntimeError(
            "Non-thread-safe operation invoked on an event loop other "
            "than the current one")
484
def call_soon_threadsafe(self, callback, *args):
    """Like call_soon(), but thread-safe."""
    handle = self._call_soon(callback, args)
    if handle._source_traceback:
        # Drop the last frame (this method) from the recorded traceback.
        del handle._source_traceback[-1]
    # Wake up the loop so that it notices the new handle.
    self._write_to_self()
    return handle
492
def run_in_executor(self, executor, func, *args):
    """Submit func(*args) to an executor and return a wrapping Future.

    If executor is None, the loop's default executor is used; a
    ThreadPoolExecutor with _MAX_WORKERS workers is created and stored
    on first use.

    func may also be an events.Handle (but not a TimerHandle), in which
    case its callback and arguments are submitted; a cancelled handle
    yields a Future that is already completed with None.

    Raises TypeError if func is a coroutine object or coroutine function,
    and RuntimeError (via _check_closed()) if the loop is closed.
    """
    if (coroutines.iscoroutine(func)
            or coroutines.iscoroutinefunction(func)):
        raise TypeError("coroutines cannot be used with run_in_executor()")
    self._check_closed()
    if isinstance(func, events.Handle):
        assert not args
        assert not isinstance(func, events.TimerHandle)
        if func._cancelled:
            f = futures.Future(loop=self)
            f.set_result(None)
            return f
        func, args = func._callback, func._args
    if executor is None:
        executor = self._default_executor
        if executor is None:
            executor = concurrent.futures.ThreadPoolExecutor(_MAX_WORKERS)
            self._default_executor = executor
    return futures.wrap_future(executor.submit(func, *args), loop=self)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700512
def set_default_executor(self, executor):
    """Set the executor used by run_in_executor() when none is given."""
    self._default_executor = executor
515
def _getaddrinfo_debug(self, host, port, family, type, proto, flags):
    """Call socket.getaddrinfo() and log the request and its duration.

    The result is logged at INFO level when the call took at least
    self.slow_callback_duration seconds, at DEBUG level otherwise.
    Returns the list returned by socket.getaddrinfo().
    """
    # Only arguments with a truthy value are included in the log message.
    msg = ["%s:%r" % (host, port)]
    if family:
        msg.append('family=%r' % family)
    if type:
        msg.append('type=%r' % type)
    if proto:
        msg.append('proto=%r' % proto)
    if flags:
        msg.append('flags=%r' % flags)
    msg = ', '.join(msg)
    logger.debug('Get address info %s', msg)

    t0 = self.time()
    addrinfo = socket.getaddrinfo(host, port, family, type, proto, flags)
    dt = self.time() - t0

    msg = ('Getting address info %s took %.3f ms: %r'
           % (msg, dt * 1e3, addrinfo))
    if dt >= self.slow_callback_duration:
        logger.info(msg)
    else:
        logger.debug(msg)
    return addrinfo
540
def getaddrinfo(self, host, port, *,
                family=0, type=0, proto=0, flags=0):
    """Run an address lookup in the default executor.

    In debug mode the logging wrapper _getaddrinfo_debug() is submitted
    instead of socket.getaddrinfo().  Returns whatever run_in_executor()
    returns.
    """
    if self._debug:
        return self.run_in_executor(None, self._getaddrinfo_debug,
                                    host, port, family, type, proto, flags)
    else:
        return self.run_in_executor(None, socket.getaddrinfo,
                                    host, port, family, type, proto, flags)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700549
def getnameinfo(self, sockaddr, flags=0):
    """Run socket.getnameinfo(sockaddr, flags) in the default executor."""
    return self.run_in_executor(None, socket.getnameinfo, sockaddr, flags)
552
@coroutine
def create_connection(self, protocol_factory, host=None, port=None, *,
                      ssl=None, family=0, proto=0, flags=0, sock=None,
                      local_addr=None, server_hostname=None):
    """Connect to a TCP server.

    Create a streaming transport connection to a given Internet host and
    port: socket family AF_INET or socket.AF_INET6 depending on host (or
    family if specified), socket type SOCK_STREAM. protocol_factory must be
    a callable returning a protocol instance.

    This method is a coroutine which will try to establish the connection
    in the background.  When successful, the coroutine returns a
    (transport, protocol) pair.

    Raises ValueError for inconsistent arguments (server_hostname without
    ssl, ssl without host or server_hostname, host/port together with
    sock, or none of host, port and sock).  Raises OSError when address
    lookup returns nothing or when every connection attempt fails.
    """
    if server_hostname is not None and not ssl:
        raise ValueError('server_hostname is only meaningful with ssl')

    if server_hostname is None and ssl:
        # Use host as default for server_hostname.  It is an error
        # if host is empty or not set, e.g. when an
        # already-connected socket was passed or when only a port
        # is given.  To avoid this error, you can pass
        # server_hostname='' -- this will bypass the hostname
        # check.  (This also means that if host is a numeric
        # IP/IPv6 address, we will attempt to verify that exact
        # address; this will probably fail, but it is possible to
        # create a certificate for a specific IP address, so we
        # don't judge it here.)
        if not host:
            raise ValueError('You must set server_hostname '
                             'when using ssl without a host')
        server_hostname = host

    if host is not None or port is not None:
        if sock is not None:
            raise ValueError(
                'host/port and sock can not be specified at the same time')

        # Resolve the remote address and, if given, the local address
        # concurrently.
        f1 = self.getaddrinfo(
            host, port, family=family,
            type=socket.SOCK_STREAM, proto=proto, flags=flags)
        fs = [f1]
        if local_addr is not None:
            f2 = self.getaddrinfo(
                *local_addr, family=family,
                type=socket.SOCK_STREAM, proto=proto, flags=flags)
            fs.append(f2)
        else:
            f2 = None

        yield from tasks.wait(fs, loop=self)

        infos = f1.result()
        if not infos:
            raise OSError('getaddrinfo() returned empty list')
        if f2 is not None:
            laddr_infos = f2.result()
            if not laddr_infos:
                raise OSError('getaddrinfo() returned empty list')

        # Try each resolved address in order; the first successful
        # connection wins, failures are collected in `exceptions`.
        exceptions = []
        for family, socktype, proto, _cname, address in infos:
            try:
                sock = socket.socket(family=family, type=socktype,
                                     proto=proto)
                sock.setblocking(False)
                if f2 is not None:
                    for _, _, _, _, laddr in laddr_infos:
                        try:
                            sock.bind(laddr)
                            break
                        except OSError as exc:
                            bind_exc = OSError(
                                exc.errno, 'error while '
                                'attempting to bind on address '
                                '{!r}: {}'.format(
                                    laddr, exc.strerror.lower()))
                            exceptions.append(bind_exc)
                    else:
                        # No local address could be bound: give up on
                        # this remote address and try the next one.
                        sock.close()
                        sock = None
                        continue
                if self._debug:
                    logger.debug("connect %r to %r", sock, address)
                yield from self.sock_connect(sock, address)
            except OSError as exc:
                if sock is not None:
                    sock.close()
                exceptions.append(exc)
            except:
                if sock is not None:
                    sock.close()
                raise
            else:
                break
        else:
            if len(exceptions) == 1:
                raise exceptions[0]
            else:
                # If they all have the same str(), raise one.
                model = str(exceptions[0])
                if all(str(exc) == model for exc in exceptions):
                    raise exceptions[0]
                # Raise a combined exception so the user can see all
                # the various error messages.
                raise OSError('Multiple exceptions: {}'.format(
                    ', '.join(str(exc) for exc in exceptions)))

    elif sock is None:
        raise ValueError(
            'host and port was not specified and no sock specified')

    sock.setblocking(False)

    transport, protocol = yield from self._create_connection_transport(
        sock, protocol_factory, ssl, server_hostname)
    if self._debug:
        # Get the socket from the transport because SSL transport closes
        # the old socket and creates a new SSL socket
        sock = transport.get_extra_info('socket')
        logger.debug("%r connected to %s:%r: (%r, %r)",
                     sock, host, port, transport, protocol)
    return transport, protocol
676
@coroutine
def _create_connection_transport(self, sock, protocol_factory, ssl,
                                 server_hostname):
    """Create a transport for *sock* and wait until its waiter completes.

    An SSL transport is created when *ssl* is true (ssl=True passes no
    SSL context; any other true value is passed as the context),
    otherwise a plain socket transport.  Returns a
    (transport, protocol) pair.
    """
    protocol = protocol_factory()
    waiter = futures.Future(loop=self)
    if ssl:
        sslcontext = None if isinstance(ssl, bool) else ssl
        transport = self._make_ssl_transport(
            sock, protocol, sslcontext, waiter,
            server_side=False, server_hostname=server_hostname)
    else:
        transport = self._make_socket_transport(sock, protocol, waiter)

    try:
        yield from waiter
    except:
        # Close the transport on any failure, then propagate the error.
        transport.close()
        raise

    return transport, protocol
697
@coroutine
def create_datagram_endpoint(self, protocol_factory,
                             local_addr=None, remote_addr=None, *,
                             family=0, proto=0, flags=0):
    """Create a datagram (SOCK_DGRAM) endpoint.

    protocol_factory is called without arguments to build the protocol.
    local_addr, if given, is a (host, port) 2-tuple the socket is bound
    to; remote_addr, if given, is a (host, port) 2-tuple the socket is
    connected to.  Both are resolved with getaddrinfo() using family,
    proto and flags, and only (family, proto) combinations that resolved
    for every supplied address are tried, in resolution order.

    Return a (transport, protocol) pair.

    Raise TypeError if an address is not a 2-tuple, ValueError if no
    address is given and family is 0 or if no usable address information
    is found, and OSError if resolution returns nothing or every
    candidate socket fails (the first recorded error is re-raised).

    This method is a coroutine.
    """
    if not (local_addr or remote_addr):
        if family == 0:
            raise ValueError('unexpected address family')
        addr_pairs_info = (((family, proto), (None, None)),)
    else:
        # join address by (family, protocol)
        addr_infos = collections.OrderedDict()
        for idx, addr in ((0, local_addr), (1, remote_addr)):
            if addr is not None:
                # Explicit check instead of an assert: asserts vanish
                # under "python -O" and would let a malformed address
                # reach getaddrinfo() unpacked.
                if not (isinstance(addr, tuple) and len(addr) == 2):
                    raise TypeError('2-tuple is expected')

                infos = yield from self.getaddrinfo(
                    *addr, family=family, type=socket.SOCK_DGRAM,
                    proto=proto, flags=flags)
                if not infos:
                    raise OSError('getaddrinfo() returned empty list')

                for fam, _, pro, _, address in infos:
                    key = (fam, pro)
                    if key not in addr_infos:
                        addr_infos[key] = [None, None]
                    addr_infos[key][idx] = address

        # each addr has to have info for each (family, proto) pair
        addr_pairs_info = [
            (key, addr_pair) for key, addr_pair in addr_infos.items()
            if not ((local_addr and addr_pair[0] is None) or
                    (remote_addr and addr_pair[1] is None))]

        if not addr_pairs_info:
            raise ValueError('can not get address information')

    exceptions = []

    # Try each candidate in turn; the first socket that binds/connects
    # without an OSError wins.
    for ((family, proto),
         (local_address, remote_address)) in addr_pairs_info:
        sock = None
        r_addr = None
        try:
            sock = socket.socket(
                family=family, type=socket.SOCK_DGRAM, proto=proto)
            sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
            sock.setblocking(False)

            if local_addr:
                sock.bind(local_address)
            if remote_addr:
                yield from self.sock_connect(sock, remote_address)
                r_addr = remote_address
        except OSError as exc:
            # Remember the failure and move on to the next candidate.
            if sock is not None:
                sock.close()
            exceptions.append(exc)
        except:
            # Anything else is not retried, but the socket must still be
            # released before the exception propagates.
            if sock is not None:
                sock.close()
            raise
        else:
            break
    else:
        # Every candidate failed with OSError.
        raise exceptions[0]

    protocol = protocol_factory()
    waiter = futures.Future(loop=self)
    transport = self._make_datagram_transport(sock, protocol, r_addr,
                                              waiter)
    if self._debug:
        if local_addr:
            logger.info("Datagram endpoint local_addr=%r remote_addr=%r "
                        "created: (%r, %r)",
                        local_addr, remote_addr, transport, protocol)
        else:
            logger.debug("Datagram endpoint remote_addr=%r created: "
                         "(%r, %r)",
                         remote_addr, transport, protocol)

    try:
        yield from waiter
    except:
        # Close the transport whatever interrupted the wait, then re-raise.
        transport.close()
        raise

    return transport, protocol
787
@coroutine
def create_server(self, protocol_factory, host=None, port=None,
                  *,
                  family=socket.AF_UNSPEC,
                  flags=socket.AI_PASSIVE,
                  sock=None,
                  backlog=100,
                  ssl=None,
                  reuse_address=None):
    """Create a TCP server bound to host and port.

    Either host/port or an existing sock must be given, but not both.
    With host/port, one listening socket is created per address returned
    by getaddrinfo(); an empty host string is treated as None.  When
    reuse_address is None it defaults to True on POSIX platforms other
    than Cygwin.  backlog is passed to listen() on every socket and ssl
    is forwarded to _start_serving().

    Return a Server object which can be used to stop the service.

    Raise TypeError if ssl is a bool, ValueError if both or neither of
    host/port and sock are specified, and OSError if address resolution
    returns nothing or a bind fails.

    This method is a coroutine.
    """
    if isinstance(ssl, bool):
        raise TypeError('ssl argument must be an SSLContext or None')
    if host is not None or port is not None:
        if sock is not None:
            raise ValueError(
                'host/port and sock can not be specified at the same time')

        # Fall back to 0 where the socket module has no AF_INET6.
        AF_INET6 = getattr(socket, 'AF_INET6', 0)
        if reuse_address is None:
            reuse_address = os.name == 'posix' and sys.platform != 'cygwin'
        sockets = []
        if host == '':
            host = None

        infos = yield from self.getaddrinfo(
            host, port, family=family,
            type=socket.SOCK_STREAM, proto=0, flags=flags)
        if not infos:
            raise OSError('getaddrinfo() returned empty list')

        # 'completed' stays False if anything below raises, so the
        # finally clause knows to close every socket opened so far.
        completed = False
        try:
            for res in infos:
                af, socktype, proto, canonname, sa = res
                try:
                    sock = socket.socket(af, socktype, proto)
                except socket.error:
                    # Assume it's a bad family/type/protocol combination.
                    if self._debug:
                        logger.warning('create_server() failed to create '
                                       'socket.socket(%r, %r, %r)',
                                       af, socktype, proto, exc_info=True)
                    continue
                # Track the socket before configuring it so that it is
                # closed if a later step fails.
                sockets.append(sock)
                if reuse_address:
                    sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR,
                                    True)
                # Disable IPv4/IPv6 dual stack support (enabled by
                # default on Linux) which makes a single socket
                # listen on both address families.
                if af == AF_INET6 and hasattr(socket, 'IPPROTO_IPV6'):
                    sock.setsockopt(socket.IPPROTO_IPV6,
                                    socket.IPV6_V6ONLY,
                                    True)
                try:
                    sock.bind(sa)
                except OSError as err:
                    # Re-raise with the offending address in the message.
                    raise OSError(err.errno, 'error while attempting '
                                  'to bind on address %r: %s'
                                  % (sa, err.strerror.lower()))
            completed = True
        finally:
            if not completed:
                for sock in sockets:
                    sock.close()
    else:
        if sock is None:
            raise ValueError('Neither host/port nor sock were specified')
        sockets = [sock]

    server = Server(self, sockets)
    for sock in sockets:
        sock.listen(backlog)
        sock.setblocking(False)
        self._start_serving(protocol_factory, sock, ssl, server)
    if self._debug:
        logger.info("%r is serving", server)
    return server
871
@coroutine
def connect_read_pipe(self, protocol_factory, pipe):
    """Attach a protocol to the read end of pipe.

    The protocol comes from protocol_factory(); the transport comes from
    _make_read_pipe_transport().  The coroutine waits on the waiter
    future given to the transport and closes the transport again if that
    wait raises.

    Return a (transport, protocol) pair.
    """
    reader = protocol_factory()
    ready = futures.Future(loop=self)
    pipe_transport = self._make_read_pipe_transport(pipe, reader, ready)

    try:
        yield from ready
    except BaseException:
        # Whatever interrupted the wait, release the transport first.
        pipe_transport.close()
        raise
    else:
        if self._debug:
            logger.debug('Read pipe %r connected: (%r, %r)',
                         pipe.fileno(), pipe_transport, reader)

    return pipe_transport, reader
888
@coroutine
def connect_write_pipe(self, protocol_factory, pipe):
    """Attach a protocol to the write end of pipe.

    The protocol comes from protocol_factory(); the transport comes from
    _make_write_pipe_transport().  The coroutine waits on the waiter
    future given to the transport and closes the transport again if that
    wait raises.

    Return a (transport, protocol) pair.
    """
    writer = protocol_factory()
    ready = futures.Future(loop=self)
    pipe_transport = self._make_write_pipe_transport(pipe, writer, ready)

    try:
        yield from ready
    except BaseException:
        # Whatever interrupted the wait, release the transport first.
        pipe_transport.close()
        raise
    else:
        if self._debug:
            logger.debug('Write pipe %r connected: (%r, %r)',
                         pipe.fileno(), pipe_transport, writer)

    return pipe_transport, writer
905
def _log_subprocess(self, msg, stdin, stdout, stderr):
    """Emit one debug log line describing a subprocess and its pipes.

    The line starts with msg, followed by a "name=value" item for each
    of stdin, stdout and stderr that is not None.  When stdout is set
    and stderr is subprocess.STDOUT, the two are reported together as a
    single "stdout=stderr=..." item.
    """
    parts = [msg]
    if stdin is not None:
        parts.append('stdin=%s' % _format_pipe(stdin))

    merged = stdout is not None and stderr == subprocess.STDOUT
    if merged:
        parts.append('stdout=stderr=%s' % _format_pipe(stdout))
    if not merged and stdout is not None:
        parts.append('stdout=%s' % _format_pipe(stdout))
    if not merged and stderr is not None:
        parts.append('stderr=%s' % _format_pipe(stderr))

    logger.debug(' '.join(parts))
918
@coroutine
def subprocess_shell(self, protocol_factory, cmd, *, stdin=subprocess.PIPE,
                     stdout=subprocess.PIPE, stderr=subprocess.PIPE,
                     universal_newlines=False, shell=True, bufsize=0,
                     **kwargs):
    """Run the shell command cmd in a subprocess.

    cmd must be a str or bytes object.  universal_newlines, shell and
    bufsize exist for signature compatibility only: they must keep
    their default values (False, True and 0).  Remaining keyword
    arguments are forwarded to _make_subprocess_transport().

    Return a (transport, protocol) pair, where protocol is the object
    returned by protocol_factory().

    Raise ValueError if cmd is not a string or if one of the fixed
    options is given a different value.

    This method is a coroutine.
    """
    if not isinstance(cmd, (bytes, str)):
        raise ValueError("cmd must be a string")
    if universal_newlines:
        raise ValueError("universal_newlines must be False")
    if not shell:
        raise ValueError("shell must be True")
    if bufsize != 0:
        raise ValueError("bufsize must be 0")
    protocol = protocol_factory()
    # Bind debug_log unconditionally: debug mode may be switched on while
    # this coroutine is suspended below, and the final log statement must
    # not trip over an unbound local in that case.
    debug_log = None
    if self._debug:
        # don't log parameters: they may contain sensitive information
        # (password) and may be too long
        debug_log = 'run shell command %r' % cmd
        self._log_subprocess(debug_log, stdin, stdout, stderr)
    transport = yield from self._make_subprocess_transport(
        protocol, cmd, True, stdin, stdout, stderr, bufsize, **kwargs)
    if self._debug and debug_log is not None:
        logger.info('%s: %r', debug_log, transport)
    return transport, protocol
943
@coroutine
def subprocess_exec(self, protocol_factory, program, *args,
                    stdin=subprocess.PIPE, stdout=subprocess.PIPE,
                    stderr=subprocess.PIPE, universal_newlines=False,
                    shell=False, bufsize=0, **kwargs):
    """Run program with the given arguments in a subprocess.

    program and every item of args must be a str or bytes object.
    universal_newlines, shell and bufsize exist for signature
    compatibility only: they must keep their default values (False,
    False and 0).  Remaining keyword arguments are forwarded to
    _make_subprocess_transport().

    Return a (transport, protocol) pair, where protocol is the object
    returned by protocol_factory().

    Raise ValueError if one of the fixed options is given a different
    value and TypeError if program or an argument is not a string.

    This method is a coroutine.
    """
    if universal_newlines:
        raise ValueError("universal_newlines must be False")
    if shell:
        raise ValueError("shell must be False")
    if bufsize != 0:
        raise ValueError("bufsize must be 0")
    popen_args = (program,) + args
    for arg in popen_args:
        if not isinstance(arg, (str, bytes)):
            raise TypeError("program arguments must be "
                            "a bytes or text string, not %s"
                            % type(arg).__name__)
    protocol = protocol_factory()
    # Bind debug_log unconditionally: debug mode may be switched on while
    # this coroutine is suspended below, and the final log statement must
    # not trip over an unbound local in that case.
    debug_log = None
    if self._debug:
        # don't log parameters: they may contain sensitive information
        # (password) and may be too long
        debug_log = 'execute program %r' % program
        self._log_subprocess(debug_log, stdin, stdout, stderr)
    transport = yield from self._make_subprocess_transport(
        protocol, popen_args, False, stdin, stdout, stderr,
        bufsize, **kwargs)
    if self._debug and debug_log is not None:
        logger.info('%s: %r', debug_log, transport)
    return transport, protocol
973
def set_exception_handler(self, handler):
    """Install handler as the event loop's exception handler.

    Passing None clears any custom handler, so the default exception
    handler is used again.

    Otherwise handler must be callable with the signature
    '(loop, context)': 'loop' is the event loop invoking it and
    'context' is a dict (see `call_exception_handler()` for the keys
    it may contain).

    Raise TypeError if handler is neither None nor callable.
    """
    acceptable = handler is None or callable(handler)
    if not acceptable:
        raise TypeError('A callable object or None is expected, '
                        'got {!r}'.format(handler))
    self._exception_handler = handler
990
def default_exception_handler(self, context):
    """Log an error report built from context.

    Used when no custom exception handler is installed; a custom
    handler may also call it to fall back on the standard behavior.

    context has the same meaning as in `call_exception_handler()`.
    The report starts with context['message'] (or a generic message
    when that is missing or empty) followed by one "key: value" line
    for every other key in sorted order, except 'exception', which is
    passed to the logger as exc_info instead.

    Note that context is updated in place: when it has no
    'source_traceback' and the handle currently being run recorded a
    source traceback, that traceback is stored under 'handle_traceback'.
    """
    headline = context.get('message') or 'Unhandled exception in event loop'

    exc = context.get('exception')
    exc_info = False if exc is None else (type(exc), exc, exc.__traceback__)

    if 'source_traceback' not in context:
        current = self._current_handle
        if current is not None and current._source_traceback:
            context['handle_traceback'] = current._source_traceback

    # Traceback entries are rendered as text under these headings;
    # every other value is shown through repr().
    traceback_headings = {
        'source_traceback': 'Object created at (most recent call last):\n',
        'handle_traceback': 'Handle created at (most recent call last):\n',
    }

    report = [headline]
    for key in sorted(context):
        if key in ('message', 'exception'):
            continue
        raw = context[key]
        heading = traceback_headings.get(key)
        if heading is None:
            rendered = repr(raw)
        else:
            frames = ''.join(traceback.format_list(raw))
            rendered = heading + frames.rstrip()
        report.append('{}: {}'.format(key, rendered))

    logger.error('\n'.join(report), exc_info=exc_info)
1034
def call_exception_handler(self, context):
    """Dispatch context to the event loop's current exception handler.

    context is a dict which may contain the following keys:

    - 'message': Error message;
    - 'exception' (optional): Exception object;
    - 'future' (optional): Future instance;
    - 'handle' (optional): Handle instance;
    - 'protocol' (optional): Protocol instance;
    - 'transport' (optional): Transport instance;
    - 'socket' (optional): Socket instance.

    New keys may be introduced in the future.

    If no custom handler is set, the default handler is used.  If a
    custom handler raises an Exception, the default handler is called
    with a new context describing that failure.  An Exception raised
    by the default handler itself is logged, not propagated.

    Note: do not overload this method in an event loop subclass.
    For custom exception handling, use the
    `set_exception_handler()` method.
    """
    custom = self._exception_handler
    if custom is None:
        try:
            self.default_exception_handler(context)
        except Exception:
            # Safety net for bugs in the default implementation and for
            # subclasses overriding "default_exception_handler".
            logger.error('Exception in default exception handler',
                         exc_info=True)
        return

    try:
        custom(self, context)
    except Exception as exc:
        # The user-supplied handler failed; report that through the
        # default handler, keeping the original context attached.
        failure_report = {
            'message': 'Unhandled error in exception handler',
            'exception': exc,
            'context': context,
        }
        try:
            self.default_exception_handler(failure_report)
        except Exception:
            # 'default_exception_handler' may itself be overloaded and
            # broken, so guard it as well.
            logger.error('Exception in default exception handler '
                         'while handling an unexpected error '
                         'in custom exception handler',
                         exc_info=True)
1082
def _add_callback(self, handle):
    """Append a non-timer Handle to the _ready queue.

    A handle that is already cancelled is silently dropped.  TimerHandle
    instances are not accepted here.
    """
    assert isinstance(handle, events.Handle), 'A Handle is required here'
    if not handle._cancelled:
        assert not isinstance(handle, events.TimerHandle)
        self._ready.append(handle)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07001090
def _add_callback_signalsafe(self, handle):
    """Variant of _add_callback() meant to be called from a signal handler.

    Queues handle through _add_callback() and then calls
    _write_to_self().
    """
    self._add_callback(handle)
    self._write_to_self()
1095
def _timer_handle_cancelled(self, handle):
    """Record that a TimerHandle has been cancelled.

    Only handles still flagged as scheduled are counted in
    _timer_cancelled_count.
    """
    if not handle._scheduled:
        return
    self._timer_cancelled_count += 1
1100
def _run_once(self):
    """Run one full iteration of the event loop.

    In order, this drops cancelled timer handles from _scheduled,
    polls the selector for I/O (without blocking if callbacks are
    already ready, otherwise until the next timer is due), hands the
    resulting events to _process_events(), moves timers that are now
    due onto _ready, and finally runs the callbacks that were ready
    at that point.
    """

    # If the heap is large and mostly cancelled handles, rebuild it in
    # one pass; otherwise only pop cancelled handles off the top.
    sched_count = len(self._scheduled)
    if (sched_count > _MIN_SCHEDULED_TIMER_HANDLES and
        self._timer_cancelled_count / sched_count >
            _MIN_CANCELLED_TIMER_HANDLES_FRACTION):
        # Remove delayed calls that were cancelled if their number
        # is too high
        new_scheduled = []
        for handle in self._scheduled:
            if handle._cancelled:
                handle._scheduled = False
            else:
                new_scheduled.append(handle)

        heapq.heapify(new_scheduled)
        self._scheduled = new_scheduled
        self._timer_cancelled_count = 0
    else:
        # Remove delayed calls that were cancelled from head of queue.
        while self._scheduled and self._scheduled[0]._cancelled:
            self._timer_cancelled_count -= 1
            handle = heapq.heappop(self._scheduled)
            handle._scheduled = False

    # timeout stays None (passed as-is to select()) when nothing is
    # ready and nothing is scheduled.
    timeout = None
    if self._ready:
        timeout = 0
    elif self._scheduled:
        # Compute the desired timeout.
        when = self._scheduled[0]._when
        timeout = max(0, when - self.time())

    if self._debug and timeout != 0:
        # Debug mode: time the poll and log it, at INFO level when it
        # took at least one second and at DEBUG level otherwise.
        t0 = self.time()
        event_list = self._selector.select(timeout)
        dt = self.time() - t0
        if dt >= 1.0:
            level = logging.INFO
        else:
            level = logging.DEBUG
        nevent = len(event_list)
        if timeout is None:
            logger.log(level, 'poll took %.3f ms: %s events',
                       dt * 1e3, nevent)
        elif nevent:
            logger.log(level,
                       'poll %.3f ms took %.3f ms: %s events',
                       timeout * 1e3, dt * 1e3, nevent)
        elif dt >= 1.0:
            logger.log(level,
                       'poll %.3f ms took %.3f ms: timeout',
                       timeout * 1e3, dt * 1e3)
    else:
        event_list = self._selector.select(timeout)
    self._process_events(event_list)

    # Handle 'later' callbacks that are ready.
    # Timers due within one clock resolution of now count as ready.
    end_time = self.time() + self._clock_resolution
    while self._scheduled:
        handle = self._scheduled[0]
        if handle._when >= end_time:
            break
        handle = heapq.heappop(self._scheduled)
        handle._scheduled = False
        self._ready.append(handle)

    # This is the only place where callbacks are actually *called*.
    # All other places just add them to ready.
    # Note: We run all currently scheduled callbacks, but not any
    # callbacks scheduled by callbacks run this time around --
    # they will be run the next time (after another I/O poll).
    # Use an idiom that is thread-safe without using locks.
    ntodo = len(self._ready)
    for i in range(ntodo):
        handle = self._ready.popleft()
        if handle._cancelled:
            continue
        if self._debug:
            # Debug mode: expose the running handle as _current_handle
            # and warn when a callback runs for slow_callback_duration
            # seconds or longer.
            try:
                self._current_handle = handle
                t0 = self.time()
                handle._run()
                dt = self.time() - t0
                if dt >= self.slow_callback_duration:
                    logger.warning('Executing %s took %.3f seconds',
                                   _format_handle(handle), dt)
            finally:
                self._current_handle = None
        else:
            handle._run()
    handle = None  # Needed to break cycles when an exception occurs.
Victor Stinner0f3e6bc2014-02-19 23:15:02 +01001199
def _set_coroutine_wrapper(self, enabled):
    """Install or remove the debug coroutine wrapper.

    Does nothing when the interpreter lacks sys.set_coroutine_wrapper /
    sys.get_coroutine_wrapper, or when the loop's recorded state
    (_coroutine_wrapper_set) already equals bool(enabled).  If a
    different wrapper is currently installed it is left untouched and a
    RuntimeWarning is issued instead.
    """
    try:
        install = sys.set_coroutine_wrapper
        query = sys.get_coroutine_wrapper
    except AttributeError:
        return

    enabled = bool(enabled)
    if self._coroutine_wrapper_set is enabled:
        return

    wrapper = coroutines.debug_wrapper
    current_wrapper = query()

    if current_wrapper not in (None, wrapper):
        # Somebody else's wrapper is active: warn and leave it alone.
        if enabled:
            template = ("loop.set_debug(True): cannot set debug coroutine "
                        "wrapper; another wrapper is already set %r")
        else:
            template = ("loop.set_debug(False): cannot unset debug coroutine "
                        "wrapper; another wrapper was set %r")
        warnings.warn(template % current_wrapper, RuntimeWarning)
        return

    install(wrapper if enabled else None)
    self._coroutine_wrapper_set = enabled
1232
def get_debug(self):
    """Return the value last stored in the loop's _debug attribute."""
    return self._debug
1235
def set_debug(self, enabled):
    """Store enabled as the loop's debug flag.

    If the loop is running, the coroutine wrapper is updated right away
    through _set_coroutine_wrapper().
    """
    self._debug = enabled

    running = self.is_running()
    if running:
        self._set_coroutine_wrapper(enabled)