blob: 7108f2516ad649500acc4f98123db17df143d4f6 [file] [log] [blame]
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07001"""Base implementation of event loop.
2
3The event loop can be broken up into a multiplexer (the part
Victor Stinneracdb7822014-07-14 18:33:40 +02004responsible for notifying us of I/O events) and the event loop proper,
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07005which wraps a multiplexer with functionality for scheduling callbacks,
6immediately or at a given time in the future.
7
8Whenever a public API takes a callback, subsequent positional
9arguments will be passed to the callback if/when it is called. This
10avoids the proliferation of trivial lambdas implementing closures.
11Keyword arguments for the callback are not supported; this is a
12conscious design decision, leaving the door open for keyword arguments
13to modify the meaning of the API call itself.
14"""
15
16
17import collections
18import concurrent.futures
19import heapq
Victor Stinner0e6f52a2014-06-20 17:34:15 +020020import inspect
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070021import logging
Victor Stinnerb75380f2014-06-30 14:39:11 +020022import os
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070023import socket
24import subprocess
Victor Stinner956de692014-12-26 21:07:52 +010025import threading
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070026import time
Victor Stinnerb75380f2014-06-30 14:39:11 +020027import traceback
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070028import sys
Victor Stinner978a9af2015-01-29 17:50:58 +010029import warnings
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070030
Victor Stinnerf951d282014-06-29 00:46:45 +020031from . import coroutines
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070032from . import events
33from . import futures
34from . import tasks
Victor Stinnerf951d282014-06-29 00:46:45 +020035from .coroutines import coroutine
Guido van Rossumfc29e0f2013-10-17 15:39:45 -070036from .log import logger
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070037
38
Victor Stinner8c1a4a22015-01-06 01:03:58 +010039__all__ = ['BaseEventLoop']
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070040
41
42# Argument for default thread pool executor creation.
43_MAX_WORKERS = 5
44
Yury Selivanov592ada92014-09-25 12:07:56 -040045# Minimum number of _scheduled timer handles before cleanup of
46# cancelled handles is performed.
47_MIN_SCHEDULED_TIMER_HANDLES = 100
48
49# Minimum fraction of _scheduled timer handles that are cancelled
50# before cleanup of cancelled handles is performed.
51_MIN_CANCELLED_TIMER_HANDLES_FRACTION = 0.5
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070052
Victor Stinner0e6f52a2014-06-20 17:34:15 +020053def _format_handle(handle):
54 cb = handle._callback
55 if inspect.ismethod(cb) and isinstance(cb.__self__, tasks.Task):
56 # format the task
57 return repr(cb.__self__)
58 else:
59 return str(handle)
60
61
Victor Stinneracdb7822014-07-14 18:33:40 +020062def _format_pipe(fd):
63 if fd == subprocess.PIPE:
64 return '<pipe>'
65 elif fd == subprocess.STDOUT:
66 return '<stdout>'
67 else:
68 return repr(fd)
69
70
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070071class _StopError(BaseException):
72 """Raised to stop the event loop."""
73
74
Victor Stinner1b0580b2014-02-13 09:24:37 +010075def _check_resolved_address(sock, address):
76 # Ensure that the address is already resolved to avoid the trap of hanging
77 # the entire event loop when the address requires doing a DNS lookup.
78 family = sock.family
Victor Stinnerd1a727a2014-02-20 16:43:09 +010079 if family == socket.AF_INET:
80 host, port = address
81 elif family == socket.AF_INET6:
Victor Stinner934c8852014-02-20 21:59:38 +010082 host, port = address[:2]
Victor Stinnerd1a727a2014-02-20 16:43:09 +010083 else:
Victor Stinner1b0580b2014-02-13 09:24:37 +010084 return
85
Victor Stinner1b0580b2014-02-13 09:24:37 +010086 type_mask = 0
87 if hasattr(socket, 'SOCK_NONBLOCK'):
88 type_mask |= socket.SOCK_NONBLOCK
89 if hasattr(socket, 'SOCK_CLOEXEC'):
90 type_mask |= socket.SOCK_CLOEXEC
Victor Stinneracdb7822014-07-14 18:33:40 +020091 # Use getaddrinfo(flags=AI_NUMERICHOST) to ensure that the address is
Victor Stinner1b0580b2014-02-13 09:24:37 +010092 # already resolved.
93 try:
94 socket.getaddrinfo(host, port,
95 family=family,
96 type=(sock.type & ~type_mask),
97 proto=sock.proto,
98 flags=socket.AI_NUMERICHOST)
99 except socket.gaierror as err:
100 raise ValueError("address must be resolved (IP address), got %r: %s"
101 % (address, err))
102
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700103def _raise_stop_error(*args):
104 raise _StopError
105
106
Victor Stinnerf3e2e092014-12-05 01:44:10 +0100107def _run_until_complete_cb(fut):
108 exc = fut._exception
109 if (isinstance(exc, BaseException)
110 and not isinstance(exc, Exception)):
111 # Issue #22429: run_forever() already finished, no need to
112 # stop it.
113 return
114 _raise_stop_error()
115
116
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700117class Server(events.AbstractServer):
118
119 def __init__(self, loop, sockets):
Victor Stinnerb28dbac2014-07-11 22:52:21 +0200120 self._loop = loop
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700121 self.sockets = sockets
Victor Stinnerb28dbac2014-07-11 22:52:21 +0200122 self._active_count = 0
123 self._waiters = []
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700124
Victor Stinnere912e652014-07-12 03:11:53 +0200125 def __repr__(self):
126 return '<%s sockets=%r>' % (self.__class__.__name__, self.sockets)
127
Victor Stinnerb28dbac2014-07-11 22:52:21 +0200128 def _attach(self):
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700129 assert self.sockets is not None
Victor Stinnerb28dbac2014-07-11 22:52:21 +0200130 self._active_count += 1
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700131
Victor Stinnerb28dbac2014-07-11 22:52:21 +0200132 def _detach(self):
133 assert self._active_count > 0
134 self._active_count -= 1
135 if self._active_count == 0 and self.sockets is None:
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700136 self._wakeup()
137
138 def close(self):
139 sockets = self.sockets
Victor Stinnerb28dbac2014-07-11 22:52:21 +0200140 if sockets is None:
141 return
142 self.sockets = None
143 for sock in sockets:
144 self._loop._stop_serving(sock)
145 if self._active_count == 0:
146 self._wakeup()
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700147
148 def _wakeup(self):
Victor Stinnerb28dbac2014-07-11 22:52:21 +0200149 waiters = self._waiters
150 self._waiters = None
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700151 for waiter in waiters:
152 if not waiter.done():
153 waiter.set_result(waiter)
154
Victor Stinnerf951d282014-06-29 00:46:45 +0200155 @coroutine
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700156 def wait_closed(self):
Victor Stinnerb28dbac2014-07-11 22:52:21 +0200157 if self.sockets is None or self._waiters is None:
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700158 return
Victor Stinnerb28dbac2014-07-11 22:52:21 +0200159 waiter = futures.Future(loop=self._loop)
160 self._waiters.append(waiter)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700161 yield from waiter
162
163
164class BaseEventLoop(events.AbstractEventLoop):
165
166 def __init__(self):
Yury Selivanov592ada92014-09-25 12:07:56 -0400167 self._timer_cancelled_count = 0
Victor Stinnerbb2fc5b2014-06-10 10:23:10 +0200168 self._closed = False
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700169 self._ready = collections.deque()
170 self._scheduled = []
171 self._default_executor = None
172 self._internal_fds = 0
Victor Stinner956de692014-12-26 21:07:52 +0100173 # Identifier of the thread running the event loop, or None if the
174 # event loop is not running
175 self._owner = None
Victor Stinnered1654f2014-02-10 23:42:32 +0100176 self._clock_resolution = time.get_clock_info('monotonic').resolution
Yury Selivanov569efa22014-02-18 18:02:19 -0500177 self._exception_handler = None
Victor Stinner7b7120e2014-06-23 00:12:14 +0200178 self._debug = (not sys.flags.ignore_environment
179 and bool(os.environ.get('PYTHONASYNCIODEBUG')))
Victor Stinner0e6f52a2014-06-20 17:34:15 +0200180 # In debug mode, if the execution of a callback or a step of a task
181 # exceed this duration in seconds, the slow callback/task is logged.
182 self.slow_callback_duration = 0.1
Victor Stinner9b524d52015-01-26 11:05:12 +0100183 self._current_handle = None
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700184
Victor Stinnerbb2fc5b2014-06-10 10:23:10 +0200185 def __repr__(self):
186 return ('<%s running=%s closed=%s debug=%s>'
187 % (self.__class__.__name__, self.is_running(),
188 self.is_closed(), self.get_debug()))
189
Victor Stinner896a25a2014-07-08 11:29:25 +0200190 def create_task(self, coro):
191 """Schedule a coroutine object.
192
Victor Stinneracdb7822014-07-14 18:33:40 +0200193 Return a task object.
194 """
Victor Stinnere80bf0d2014-12-04 23:07:47 +0100195 self._check_closed()
Victor Stinnerc39ba7d2014-07-11 00:21:27 +0200196 task = tasks.Task(coro, loop=self)
197 if task._source_traceback:
198 del task._source_traceback[-1]
199 return task
Victor Stinner896a25a2014-07-08 11:29:25 +0200200
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700201 def _make_socket_transport(self, sock, protocol, waiter=None, *,
202 extra=None, server=None):
203 """Create socket transport."""
204 raise NotImplementedError
205
Victor Stinner15cc6782015-01-09 00:09:10 +0100206 def _make_ssl_transport(self, rawsock, protocol, sslcontext, waiter=None,
207 *, server_side=False, server_hostname=None,
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700208 extra=None, server=None):
209 """Create SSL transport."""
210 raise NotImplementedError
211
212 def _make_datagram_transport(self, sock, protocol,
Victor Stinnerbfff45d2014-07-08 23:57:31 +0200213 address=None, waiter=None, extra=None):
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700214 """Create datagram transport."""
215 raise NotImplementedError
216
217 def _make_read_pipe_transport(self, pipe, protocol, waiter=None,
218 extra=None):
219 """Create read pipe transport."""
220 raise NotImplementedError
221
222 def _make_write_pipe_transport(self, pipe, protocol, waiter=None,
223 extra=None):
224 """Create write pipe transport."""
225 raise NotImplementedError
226
Victor Stinnerf951d282014-06-29 00:46:45 +0200227 @coroutine
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700228 def _make_subprocess_transport(self, protocol, args, shell,
229 stdin, stdout, stderr, bufsize,
230 extra=None, **kwargs):
231 """Create subprocess transport."""
232 raise NotImplementedError
233
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700234 def _write_to_self(self):
Victor Stinneracdb7822014-07-14 18:33:40 +0200235 """Write a byte to self-pipe, to wake up the event loop.
236
237 This may be called from a different thread.
238
239 The subclass is responsible for implementing the self-pipe.
240 """
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700241 raise NotImplementedError
242
243 def _process_events(self, event_list):
244 """Process selector events."""
245 raise NotImplementedError
246
Victor Stinnerbb2fc5b2014-06-10 10:23:10 +0200247 def _check_closed(self):
248 if self._closed:
249 raise RuntimeError('Event loop is closed')
250
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700251 def run_forever(self):
252 """Run until stop() is called."""
Victor Stinnerbb2fc5b2014-06-10 10:23:10 +0200253 self._check_closed()
Victor Stinner956de692014-12-26 21:07:52 +0100254 if self.is_running():
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700255 raise RuntimeError('Event loop is running.')
Victor Stinner956de692014-12-26 21:07:52 +0100256 self._owner = threading.get_ident()
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700257 try:
258 while True:
259 try:
260 self._run_once()
261 except _StopError:
262 break
263 finally:
Victor Stinner956de692014-12-26 21:07:52 +0100264 self._owner = None
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700265
266 def run_until_complete(self, future):
267 """Run until the Future is done.
268
269 If the argument is a coroutine, it is wrapped in a Task.
270
Victor Stinneracdb7822014-07-14 18:33:40 +0200271 WARNING: It would be disastrous to call run_until_complete()
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700272 with the same coroutine twice -- it would wrap it in two
273 different Tasks and that can't be good.
274
275 Return the Future's result, or raise its exception.
276 """
Victor Stinnerbb2fc5b2014-06-10 10:23:10 +0200277 self._check_closed()
Victor Stinner98b63912014-06-30 14:51:04 +0200278
279 new_task = not isinstance(future, futures.Future)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700280 future = tasks.async(future, loop=self)
Victor Stinner98b63912014-06-30 14:51:04 +0200281 if new_task:
282 # An exception is raised if the future didn't complete, so there
283 # is no need to log the "destroy pending task" message
284 future._log_destroy_pending = False
285
Victor Stinnerf3e2e092014-12-05 01:44:10 +0100286 future.add_done_callback(_run_until_complete_cb)
Victor Stinnerc8bd53f2014-10-11 14:30:18 +0200287 try:
288 self.run_forever()
289 except:
290 if new_task and future.done() and not future.cancelled():
291 # The coroutine raised a BaseException. Consume the exception
292 # to not log a warning, the caller doesn't have access to the
293 # local task.
294 future.exception()
295 raise
Victor Stinnerf3e2e092014-12-05 01:44:10 +0100296 future.remove_done_callback(_run_until_complete_cb)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700297 if not future.done():
298 raise RuntimeError('Event loop stopped before Future completed.')
299
300 return future.result()
301
302 def stop(self):
303 """Stop running the event loop.
304
Victor Stinner5006b1f2014-07-24 11:34:11 +0200305 Every callback scheduled before stop() is called will run. Callbacks
306 scheduled after stop() is called will not run. However, those callbacks
307 will run if run_forever is called again later.
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700308 """
309 self.call_soon(_raise_stop_error)
310
Antoine Pitrou4ca73552013-10-20 00:54:10 +0200311 def close(self):
Guido van Rossume3f52ef2013-11-01 14:19:04 -0700312 """Close the event loop.
313
314 This clears the queues and shuts down the executor,
315 but does not wait for the executor to finish.
Victor Stinnerf328c7d2014-06-23 01:02:37 +0200316
317 The event loop must not be running.
Guido van Rossume3f52ef2013-11-01 14:19:04 -0700318 """
Victor Stinner956de692014-12-26 21:07:52 +0100319 if self.is_running():
Victor Stinneracdb7822014-07-14 18:33:40 +0200320 raise RuntimeError("Cannot close a running event loop")
Victor Stinnerbb2fc5b2014-06-10 10:23:10 +0200321 if self._closed:
322 return
Victor Stinnere912e652014-07-12 03:11:53 +0200323 if self._debug:
324 logger.debug("Close %r", self)
Victor Stinnerbb2fc5b2014-06-10 10:23:10 +0200325 self._closed = True
Antoine Pitrou4ca73552013-10-20 00:54:10 +0200326 self._ready.clear()
327 self._scheduled.clear()
328 executor = self._default_executor
329 if executor is not None:
330 self._default_executor = None
331 executor.shutdown(wait=False)
332
Victor Stinnerbb2fc5b2014-06-10 10:23:10 +0200333 def is_closed(self):
334 """Returns True if the event loop was closed."""
335 return self._closed
336
Victor Stinner978a9af2015-01-29 17:50:58 +0100337 # On Python 3.3 and older, objects with a destructor part of a reference
338 # cycle are never destroyed. It's not more the case on Python 3.4 thanks
339 # to the PEP 442.
340 if sys.version_info >= (3, 4):
341 def __del__(self):
342 if not self.is_closed():
343 warnings.warn("unclosed event loop %r" % self, ResourceWarning)
344 if not self.is_running():
345 self.close()
346
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700347 def is_running(self):
Victor Stinneracdb7822014-07-14 18:33:40 +0200348 """Returns True if the event loop is running."""
Victor Stinner956de692014-12-26 21:07:52 +0100349 return (self._owner is not None)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700350
351 def time(self):
Victor Stinneracdb7822014-07-14 18:33:40 +0200352 """Return the time according to the event loop's clock.
353
354 This is a float expressed in seconds since an epoch, but the
355 epoch, precision, accuracy and drift are unspecified and may
356 differ per event loop.
357 """
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700358 return time.monotonic()
359
360 def call_later(self, delay, callback, *args):
361 """Arrange for a callback to be called at a given time.
362
363 Return a Handle: an opaque object with a cancel() method that
364 can be used to cancel the call.
365
366 The delay can be an int or float, expressed in seconds. It is
Victor Stinneracdb7822014-07-14 18:33:40 +0200367 always relative to the current time.
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700368
369 Each callback will be called exactly once. If two callbacks
370 are scheduled for exactly the same time, it undefined which
371 will be called first.
372
373 Any positional arguments after the callback will be passed to
374 the callback when it is called.
375 """
Victor Stinner80f53aa2014-06-27 13:52:20 +0200376 timer = self.call_at(self.time() + delay, callback, *args)
377 if timer._source_traceback:
378 del timer._source_traceback[-1]
379 return timer
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700380
381 def call_at(self, when, callback, *args):
Victor Stinneracdb7822014-07-14 18:33:40 +0200382 """Like call_later(), but uses an absolute time.
383
384 Absolute time corresponds to the event loop's time() method.
385 """
Victor Stinner2d99d932014-11-20 15:03:52 +0100386 if (coroutines.iscoroutine(callback)
387 or coroutines.iscoroutinefunction(callback)):
Victor Stinner9af4a242014-02-11 11:34:30 +0100388 raise TypeError("coroutines cannot be used with call_at()")
Victor Stinnere80bf0d2014-12-04 23:07:47 +0100389 self._check_closed()
Victor Stinner93569c22014-03-21 10:00:52 +0100390 if self._debug:
Victor Stinner956de692014-12-26 21:07:52 +0100391 self._check_thread()
Yury Selivanov569efa22014-02-18 18:02:19 -0500392 timer = events.TimerHandle(when, callback, args, self)
Victor Stinner80f53aa2014-06-27 13:52:20 +0200393 if timer._source_traceback:
394 del timer._source_traceback[-1]
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700395 heapq.heappush(self._scheduled, timer)
Yury Selivanov592ada92014-09-25 12:07:56 -0400396 timer._scheduled = True
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700397 return timer
398
399 def call_soon(self, callback, *args):
400 """Arrange for a callback to be called as soon as possible.
401
Victor Stinneracdb7822014-07-14 18:33:40 +0200402 This operates as a FIFO queue: callbacks are called in the
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700403 order in which they are registered. Each callback will be
404 called exactly once.
405
406 Any positional arguments after the callback will be passed to
407 the callback when it is called.
408 """
Victor Stinner956de692014-12-26 21:07:52 +0100409 if self._debug:
410 self._check_thread()
411 handle = self._call_soon(callback, args)
Victor Stinner80f53aa2014-06-27 13:52:20 +0200412 if handle._source_traceback:
413 del handle._source_traceback[-1]
414 return handle
Victor Stinner93569c22014-03-21 10:00:52 +0100415
Victor Stinner956de692014-12-26 21:07:52 +0100416 def _call_soon(self, callback, args):
Victor Stinner2d99d932014-11-20 15:03:52 +0100417 if (coroutines.iscoroutine(callback)
418 or coroutines.iscoroutinefunction(callback)):
Victor Stinner9af4a242014-02-11 11:34:30 +0100419 raise TypeError("coroutines cannot be used with call_soon()")
Victor Stinnere80bf0d2014-12-04 23:07:47 +0100420 self._check_closed()
Yury Selivanov569efa22014-02-18 18:02:19 -0500421 handle = events.Handle(callback, args, self)
Victor Stinner80f53aa2014-06-27 13:52:20 +0200422 if handle._source_traceback:
423 del handle._source_traceback[-1]
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700424 self._ready.append(handle)
425 return handle
426
Victor Stinner956de692014-12-26 21:07:52 +0100427 def _check_thread(self):
428 """Check that the current thread is the thread running the event loop.
Victor Stinner93569c22014-03-21 10:00:52 +0100429
Victor Stinneracdb7822014-07-14 18:33:40 +0200430 Non-thread-safe methods of this class make this assumption and will
Victor Stinner93569c22014-03-21 10:00:52 +0100431 likely behave incorrectly when the assumption is violated.
432
Victor Stinneracdb7822014-07-14 18:33:40 +0200433 Should only be called when (self._debug == True). The caller is
Victor Stinner93569c22014-03-21 10:00:52 +0100434 responsible for checking this condition for performance reasons.
435 """
Victor Stinner956de692014-12-26 21:07:52 +0100436 if self._owner is None:
Victor Stinner751c7c02014-06-23 15:14:13 +0200437 return
Victor Stinner956de692014-12-26 21:07:52 +0100438 thread_id = threading.get_ident()
439 if thread_id != self._owner:
Victor Stinner93569c22014-03-21 10:00:52 +0100440 raise RuntimeError(
Victor Stinneracdb7822014-07-14 18:33:40 +0200441 "Non-thread-safe operation invoked on an event loop other "
Victor Stinner93569c22014-03-21 10:00:52 +0100442 "than the current one")
443
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700444 def call_soon_threadsafe(self, callback, *args):
Victor Stinneracdb7822014-07-14 18:33:40 +0200445 """Like call_soon(), but thread-safe."""
Victor Stinner956de692014-12-26 21:07:52 +0100446 handle = self._call_soon(callback, args)
Victor Stinner80f53aa2014-06-27 13:52:20 +0200447 if handle._source_traceback:
448 del handle._source_traceback[-1]
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700449 self._write_to_self()
450 return handle
451
452 def run_in_executor(self, executor, callback, *args):
Victor Stinner2d99d932014-11-20 15:03:52 +0100453 if (coroutines.iscoroutine(callback)
454 or coroutines.iscoroutinefunction(callback)):
455 raise TypeError("coroutines cannot be used with run_in_executor()")
Victor Stinnere80bf0d2014-12-04 23:07:47 +0100456 self._check_closed()
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700457 if isinstance(callback, events.Handle):
458 assert not args
459 assert not isinstance(callback, events.TimerHandle)
460 if callback._cancelled:
461 f = futures.Future(loop=self)
462 f.set_result(None)
463 return f
464 callback, args = callback._callback, callback._args
465 if executor is None:
466 executor = self._default_executor
467 if executor is None:
468 executor = concurrent.futures.ThreadPoolExecutor(_MAX_WORKERS)
469 self._default_executor = executor
470 return futures.wrap_future(executor.submit(callback, *args), loop=self)
471
472 def set_default_executor(self, executor):
473 self._default_executor = executor
474
Victor Stinnere912e652014-07-12 03:11:53 +0200475 def _getaddrinfo_debug(self, host, port, family, type, proto, flags):
476 msg = ["%s:%r" % (host, port)]
477 if family:
478 msg.append('family=%r' % family)
479 if type:
480 msg.append('type=%r' % type)
481 if proto:
482 msg.append('proto=%r' % proto)
483 if flags:
484 msg.append('flags=%r' % flags)
485 msg = ', '.join(msg)
Victor Stinneracdb7822014-07-14 18:33:40 +0200486 logger.debug('Get address info %s', msg)
Victor Stinnere912e652014-07-12 03:11:53 +0200487
488 t0 = self.time()
489 addrinfo = socket.getaddrinfo(host, port, family, type, proto, flags)
490 dt = self.time() - t0
491
Victor Stinneracdb7822014-07-14 18:33:40 +0200492 msg = ('Getting address info %s took %.3f ms: %r'
Victor Stinnere912e652014-07-12 03:11:53 +0200493 % (msg, dt * 1e3, addrinfo))
494 if dt >= self.slow_callback_duration:
495 logger.info(msg)
496 else:
497 logger.debug(msg)
498 return addrinfo
499
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700500 def getaddrinfo(self, host, port, *,
501 family=0, type=0, proto=0, flags=0):
Victor Stinnere912e652014-07-12 03:11:53 +0200502 if self._debug:
503 return self.run_in_executor(None, self._getaddrinfo_debug,
504 host, port, family, type, proto, flags)
505 else:
506 return self.run_in_executor(None, socket.getaddrinfo,
507 host, port, family, type, proto, flags)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700508
509 def getnameinfo(self, sockaddr, flags=0):
510 return self.run_in_executor(None, socket.getnameinfo, sockaddr, flags)
511
Victor Stinnerf951d282014-06-29 00:46:45 +0200512 @coroutine
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700513 def create_connection(self, protocol_factory, host=None, port=None, *,
514 ssl=None, family=0, proto=0, flags=0, sock=None,
Guido van Rossum21c85a72013-11-01 14:16:54 -0700515 local_addr=None, server_hostname=None):
Victor Stinnerd1432092014-06-19 17:11:49 +0200516 """Connect to a TCP server.
517
518 Create a streaming transport connection to a given Internet host and
519 port: socket family AF_INET or socket.AF_INET6 depending on host (or
520 family if specified), socket type SOCK_STREAM. protocol_factory must be
521 a callable returning a protocol instance.
522
523 This method is a coroutine which will try to establish the connection
524 in the background. When successful, the coroutine returns a
525 (transport, protocol) pair.
526 """
Guido van Rossum21c85a72013-11-01 14:16:54 -0700527 if server_hostname is not None and not ssl:
528 raise ValueError('server_hostname is only meaningful with ssl')
529
530 if server_hostname is None and ssl:
531 # Use host as default for server_hostname. It is an error
532 # if host is empty or not set, e.g. when an
533 # already-connected socket was passed or when only a port
534 # is given. To avoid this error, you can pass
535 # server_hostname='' -- this will bypass the hostname
536 # check. (This also means that if host is a numeric
537 # IP/IPv6 address, we will attempt to verify that exact
538 # address; this will probably fail, but it is possible to
539 # create a certificate for a specific IP address, so we
540 # don't judge it here.)
541 if not host:
542 raise ValueError('You must set server_hostname '
543 'when using ssl without a host')
544 server_hostname = host
Guido van Rossuma8d630a2013-11-01 14:20:55 -0700545
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700546 if host is not None or port is not None:
547 if sock is not None:
548 raise ValueError(
549 'host/port and sock can not be specified at the same time')
550
551 f1 = self.getaddrinfo(
552 host, port, family=family,
553 type=socket.SOCK_STREAM, proto=proto, flags=flags)
554 fs = [f1]
555 if local_addr is not None:
556 f2 = self.getaddrinfo(
557 *local_addr, family=family,
558 type=socket.SOCK_STREAM, proto=proto, flags=flags)
559 fs.append(f2)
560 else:
561 f2 = None
562
563 yield from tasks.wait(fs, loop=self)
564
565 infos = f1.result()
566 if not infos:
567 raise OSError('getaddrinfo() returned empty list')
568 if f2 is not None:
569 laddr_infos = f2.result()
570 if not laddr_infos:
571 raise OSError('getaddrinfo() returned empty list')
572
573 exceptions = []
574 for family, type, proto, cname, address in infos:
575 try:
576 sock = socket.socket(family=family, type=type, proto=proto)
577 sock.setblocking(False)
578 if f2 is not None:
579 for _, _, _, _, laddr in laddr_infos:
580 try:
581 sock.bind(laddr)
582 break
583 except OSError as exc:
584 exc = OSError(
585 exc.errno, 'error while '
586 'attempting to bind on address '
587 '{!r}: {}'.format(
588 laddr, exc.strerror.lower()))
589 exceptions.append(exc)
590 else:
591 sock.close()
592 sock = None
593 continue
Victor Stinnere912e652014-07-12 03:11:53 +0200594 if self._debug:
595 logger.debug("connect %r to %r", sock, address)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700596 yield from self.sock_connect(sock, address)
597 except OSError as exc:
598 if sock is not None:
599 sock.close()
600 exceptions.append(exc)
Victor Stinner223a6242014-06-04 00:11:52 +0200601 except:
602 if sock is not None:
603 sock.close()
604 raise
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700605 else:
606 break
607 else:
608 if len(exceptions) == 1:
609 raise exceptions[0]
610 else:
611 # If they all have the same str(), raise one.
612 model = str(exceptions[0])
613 if all(str(exc) == model for exc in exceptions):
614 raise exceptions[0]
615 # Raise a combined exception so the user can see all
616 # the various error messages.
617 raise OSError('Multiple exceptions: {}'.format(
618 ', '.join(str(exc) for exc in exceptions)))
619
620 elif sock is None:
621 raise ValueError(
622 'host and port was not specified and no sock specified')
623
624 sock.setblocking(False)
625
Yury Selivanovb057c522014-02-18 12:15:06 -0500626 transport, protocol = yield from self._create_connection_transport(
627 sock, protocol_factory, ssl, server_hostname)
Victor Stinnere912e652014-07-12 03:11:53 +0200628 if self._debug:
Victor Stinnerb2614752014-08-25 23:20:52 +0200629 # Get the socket from the transport because SSL transport closes
630 # the old socket and creates a new SSL socket
631 sock = transport.get_extra_info('socket')
Victor Stinneracdb7822014-07-14 18:33:40 +0200632 logger.debug("%r connected to %s:%r: (%r, %r)",
633 sock, host, port, transport, protocol)
Yury Selivanovb057c522014-02-18 12:15:06 -0500634 return transport, protocol
635
Victor Stinnerf951d282014-06-29 00:46:45 +0200636 @coroutine
Yury Selivanovb057c522014-02-18 12:15:06 -0500637 def _create_connection_transport(self, sock, protocol_factory, ssl,
638 server_hostname):
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700639 protocol = protocol_factory()
640 waiter = futures.Future(loop=self)
641 if ssl:
642 sslcontext = None if isinstance(ssl, bool) else ssl
643 transport = self._make_ssl_transport(
644 sock, protocol, sslcontext, waiter,
Guido van Rossum21c85a72013-11-01 14:16:54 -0700645 server_side=False, server_hostname=server_hostname)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700646 else:
647 transport = self._make_socket_transport(sock, protocol, waiter)
648
Victor Stinner29ad0112015-01-15 00:04:21 +0100649 try:
650 yield from waiter
Victor Stinner0c2e4082015-01-22 00:17:41 +0100651 except:
Victor Stinner29ad0112015-01-15 00:04:21 +0100652 transport.close()
653 raise
654
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700655 return transport, protocol
656
Victor Stinnerf951d282014-06-29 00:46:45 +0200657 @coroutine
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700658 def create_datagram_endpoint(self, protocol_factory,
659 local_addr=None, remote_addr=None, *,
660 family=0, proto=0, flags=0):
661 """Create datagram connection."""
662 if not (local_addr or remote_addr):
663 if family == 0:
664 raise ValueError('unexpected address family')
665 addr_pairs_info = (((family, proto), (None, None)),)
666 else:
Victor Stinneracdb7822014-07-14 18:33:40 +0200667 # join address by (family, protocol)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700668 addr_infos = collections.OrderedDict()
669 for idx, addr in ((0, local_addr), (1, remote_addr)):
670 if addr is not None:
671 assert isinstance(addr, tuple) and len(addr) == 2, (
672 '2-tuple is expected')
673
674 infos = yield from self.getaddrinfo(
675 *addr, family=family, type=socket.SOCK_DGRAM,
676 proto=proto, flags=flags)
677 if not infos:
678 raise OSError('getaddrinfo() returned empty list')
679
680 for fam, _, pro, _, address in infos:
681 key = (fam, pro)
682 if key not in addr_infos:
683 addr_infos[key] = [None, None]
684 addr_infos[key][idx] = address
685
686 # each addr has to have info for each (family, proto) pair
687 addr_pairs_info = [
688 (key, addr_pair) for key, addr_pair in addr_infos.items()
689 if not ((local_addr and addr_pair[0] is None) or
690 (remote_addr and addr_pair[1] is None))]
691
692 if not addr_pairs_info:
693 raise ValueError('can not get address information')
694
695 exceptions = []
696
697 for ((family, proto),
698 (local_address, remote_address)) in addr_pairs_info:
699 sock = None
700 r_addr = None
701 try:
702 sock = socket.socket(
703 family=family, type=socket.SOCK_DGRAM, proto=proto)
704 sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
705 sock.setblocking(False)
706
707 if local_addr:
708 sock.bind(local_address)
709 if remote_addr:
710 yield from self.sock_connect(sock, remote_address)
711 r_addr = remote_address
712 except OSError as exc:
713 if sock is not None:
714 sock.close()
715 exceptions.append(exc)
Victor Stinner223a6242014-06-04 00:11:52 +0200716 except:
717 if sock is not None:
718 sock.close()
719 raise
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700720 else:
721 break
722 else:
723 raise exceptions[0]
724
725 protocol = protocol_factory()
Victor Stinnerbfff45d2014-07-08 23:57:31 +0200726 waiter = futures.Future(loop=self)
727 transport = self._make_datagram_transport(sock, protocol, r_addr,
728 waiter)
Victor Stinnere912e652014-07-12 03:11:53 +0200729 if self._debug:
730 if local_addr:
731 logger.info("Datagram endpoint local_addr=%r remote_addr=%r "
732 "created: (%r, %r)",
733 local_addr, remote_addr, transport, protocol)
734 else:
735 logger.debug("Datagram endpoint remote_addr=%r created: "
736 "(%r, %r)",
737 remote_addr, transport, protocol)
Victor Stinner2596dd02015-01-26 11:02:18 +0100738
739 try:
740 yield from waiter
741 except:
742 transport.close()
743 raise
744
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700745 return transport, protocol
746
Victor Stinnerf951d282014-06-29 00:46:45 +0200747 @coroutine
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700748 def create_server(self, protocol_factory, host=None, port=None,
749 *,
750 family=socket.AF_UNSPEC,
751 flags=socket.AI_PASSIVE,
752 sock=None,
753 backlog=100,
754 ssl=None,
755 reuse_address=None):
Victor Stinnerd1432092014-06-19 17:11:49 +0200756 """Create a TCP server bound to host and port.
757
Victor Stinneracdb7822014-07-14 18:33:40 +0200758 Return a Server object which can be used to stop the service.
Victor Stinnerd1432092014-06-19 17:11:49 +0200759
760 This method is a coroutine.
761 """
Guido van Rossum28dff0d2013-11-01 14:22:30 -0700762 if isinstance(ssl, bool):
763 raise TypeError('ssl argument must be an SSLContext or None')
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700764 if host is not None or port is not None:
765 if sock is not None:
766 raise ValueError(
767 'host/port and sock can not be specified at the same time')
768
769 AF_INET6 = getattr(socket, 'AF_INET6', 0)
770 if reuse_address is None:
771 reuse_address = os.name == 'posix' and sys.platform != 'cygwin'
772 sockets = []
773 if host == '':
774 host = None
775
776 infos = yield from self.getaddrinfo(
777 host, port, family=family,
778 type=socket.SOCK_STREAM, proto=0, flags=flags)
779 if not infos:
780 raise OSError('getaddrinfo() returned empty list')
781
782 completed = False
783 try:
784 for res in infos:
785 af, socktype, proto, canonname, sa = res
Guido van Rossum32e46852013-10-19 17:04:25 -0700786 try:
787 sock = socket.socket(af, socktype, proto)
788 except socket.error:
789 # Assume it's a bad family/type/protocol combination.
Victor Stinnerb2614752014-08-25 23:20:52 +0200790 if self._debug:
791 logger.warning('create_server() failed to create '
792 'socket.socket(%r, %r, %r)',
793 af, socktype, proto, exc_info=True)
Guido van Rossum32e46852013-10-19 17:04:25 -0700794 continue
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700795 sockets.append(sock)
796 if reuse_address:
797 sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR,
798 True)
799 # Disable IPv4/IPv6 dual stack support (enabled by
800 # default on Linux) which makes a single socket
801 # listen on both address families.
802 if af == AF_INET6 and hasattr(socket, 'IPPROTO_IPV6'):
803 sock.setsockopt(socket.IPPROTO_IPV6,
804 socket.IPV6_V6ONLY,
805 True)
806 try:
807 sock.bind(sa)
808 except OSError as err:
809 raise OSError(err.errno, 'error while attempting '
810 'to bind on address %r: %s'
811 % (sa, err.strerror.lower()))
812 completed = True
813 finally:
814 if not completed:
815 for sock in sockets:
816 sock.close()
817 else:
818 if sock is None:
Victor Stinneracdb7822014-07-14 18:33:40 +0200819 raise ValueError('Neither host/port nor sock were specified')
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700820 sockets = [sock]
821
822 server = Server(self, sockets)
823 for sock in sockets:
824 sock.listen(backlog)
825 sock.setblocking(False)
826 self._start_serving(protocol_factory, sock, ssl, server)
Victor Stinnere912e652014-07-12 03:11:53 +0200827 if self._debug:
828 logger.info("%r is serving", server)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700829 return server
830
Victor Stinnerf951d282014-06-29 00:46:45 +0200831 @coroutine
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700832 def connect_read_pipe(self, protocol_factory, pipe):
833 protocol = protocol_factory()
834 waiter = futures.Future(loop=self)
835 transport = self._make_read_pipe_transport(pipe, protocol, waiter)
Victor Stinner2596dd02015-01-26 11:02:18 +0100836
837 try:
838 yield from waiter
839 except:
840 transport.close()
841 raise
842
Victor Stinneracdb7822014-07-14 18:33:40 +0200843 if self._debug:
844 logger.debug('Read pipe %r connected: (%r, %r)',
845 pipe.fileno(), transport, protocol)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700846 return transport, protocol
847
Victor Stinnerf951d282014-06-29 00:46:45 +0200848 @coroutine
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700849 def connect_write_pipe(self, protocol_factory, pipe):
850 protocol = protocol_factory()
851 waiter = futures.Future(loop=self)
852 transport = self._make_write_pipe_transport(pipe, protocol, waiter)
Victor Stinner2596dd02015-01-26 11:02:18 +0100853
854 try:
855 yield from waiter
856 except:
857 transport.close()
858 raise
859
Victor Stinneracdb7822014-07-14 18:33:40 +0200860 if self._debug:
861 logger.debug('Write pipe %r connected: (%r, %r)',
862 pipe.fileno(), transport, protocol)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700863 return transport, protocol
864
Victor Stinneracdb7822014-07-14 18:33:40 +0200865 def _log_subprocess(self, msg, stdin, stdout, stderr):
866 info = [msg]
867 if stdin is not None:
868 info.append('stdin=%s' % _format_pipe(stdin))
869 if stdout is not None and stderr == subprocess.STDOUT:
870 info.append('stdout=stderr=%s' % _format_pipe(stdout))
871 else:
872 if stdout is not None:
873 info.append('stdout=%s' % _format_pipe(stdout))
874 if stderr is not None:
875 info.append('stderr=%s' % _format_pipe(stderr))
876 logger.debug(' '.join(info))
877
Victor Stinnerf951d282014-06-29 00:46:45 +0200878 @coroutine
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700879 def subprocess_shell(self, protocol_factory, cmd, *, stdin=subprocess.PIPE,
880 stdout=subprocess.PIPE, stderr=subprocess.PIPE,
881 universal_newlines=False, shell=True, bufsize=0,
882 **kwargs):
Victor Stinner20e07432014-02-11 11:44:56 +0100883 if not isinstance(cmd, (bytes, str)):
Victor Stinnere623a122014-01-29 14:35:15 -0800884 raise ValueError("cmd must be a string")
885 if universal_newlines:
886 raise ValueError("universal_newlines must be False")
887 if not shell:
Victor Stinner323748e2014-01-31 12:28:30 +0100888 raise ValueError("shell must be True")
Victor Stinnere623a122014-01-29 14:35:15 -0800889 if bufsize != 0:
890 raise ValueError("bufsize must be 0")
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700891 protocol = protocol_factory()
Victor Stinneracdb7822014-07-14 18:33:40 +0200892 if self._debug:
893 # don't log parameters: they may contain sensitive information
894 # (password) and may be too long
895 debug_log = 'run shell command %r' % cmd
896 self._log_subprocess(debug_log, stdin, stdout, stderr)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700897 transport = yield from self._make_subprocess_transport(
898 protocol, cmd, True, stdin, stdout, stderr, bufsize, **kwargs)
Victor Stinneracdb7822014-07-14 18:33:40 +0200899 if self._debug:
900 logger.info('%s: %r' % (debug_log, transport))
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700901 return transport, protocol
902
Victor Stinnerf951d282014-06-29 00:46:45 +0200903 @coroutine
Yury Selivanov57797522014-02-18 22:56:15 -0500904 def subprocess_exec(self, protocol_factory, program, *args,
905 stdin=subprocess.PIPE, stdout=subprocess.PIPE,
906 stderr=subprocess.PIPE, universal_newlines=False,
907 shell=False, bufsize=0, **kwargs):
Victor Stinnere623a122014-01-29 14:35:15 -0800908 if universal_newlines:
909 raise ValueError("universal_newlines must be False")
910 if shell:
911 raise ValueError("shell must be False")
912 if bufsize != 0:
913 raise ValueError("bufsize must be 0")
Victor Stinner20e07432014-02-11 11:44:56 +0100914 popen_args = (program,) + args
915 for arg in popen_args:
916 if not isinstance(arg, (str, bytes)):
917 raise TypeError("program arguments must be "
918 "a bytes or text string, not %s"
919 % type(arg).__name__)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700920 protocol = protocol_factory()
Victor Stinneracdb7822014-07-14 18:33:40 +0200921 if self._debug:
922 # don't log parameters: they may contain sensitive information
923 # (password) and may be too long
924 debug_log = 'execute program %r' % program
925 self._log_subprocess(debug_log, stdin, stdout, stderr)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700926 transport = yield from self._make_subprocess_transport(
Yury Selivanov57797522014-02-18 22:56:15 -0500927 protocol, popen_args, False, stdin, stdout, stderr,
928 bufsize, **kwargs)
Victor Stinneracdb7822014-07-14 18:33:40 +0200929 if self._debug:
930 logger.info('%s: %r' % (debug_log, transport))
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700931 return transport, protocol
932
Yury Selivanov569efa22014-02-18 18:02:19 -0500933 def set_exception_handler(self, handler):
934 """Set handler as the new event loop exception handler.
935
936 If handler is None, the default exception handler will
937 be set.
938
939 If handler is a callable object, it should have a
Victor Stinneracdb7822014-07-14 18:33:40 +0200940 signature matching '(loop, context)', where 'loop'
Yury Selivanov569efa22014-02-18 18:02:19 -0500941 will be a reference to the active event loop, 'context'
942 will be a dict object (see `call_exception_handler()`
943 documentation for details about context).
944 """
945 if handler is not None and not callable(handler):
946 raise TypeError('A callable object or None is expected, '
947 'got {!r}'.format(handler))
948 self._exception_handler = handler
949
950 def default_exception_handler(self, context):
951 """Default exception handler.
952
953 This is called when an exception occurs and no exception
954 handler is set, and can be called by a custom exception
955 handler that wants to defer to the default behavior.
956
Victor Stinneracdb7822014-07-14 18:33:40 +0200957 The context parameter has the same meaning as in
Yury Selivanov569efa22014-02-18 18:02:19 -0500958 `call_exception_handler()`.
959 """
960 message = context.get('message')
961 if not message:
962 message = 'Unhandled exception in event loop'
963
964 exception = context.get('exception')
965 if exception is not None:
966 exc_info = (type(exception), exception, exception.__traceback__)
967 else:
968 exc_info = False
969
Victor Stinnerff018e42015-01-28 00:30:40 +0100970 if ('source_traceback' not in context
971 and self._current_handle is not None
Victor Stinner9b524d52015-01-26 11:05:12 +0100972 and self._current_handle._source_traceback):
973 context['handle_traceback'] = self._current_handle._source_traceback
974
Yury Selivanov569efa22014-02-18 18:02:19 -0500975 log_lines = [message]
976 for key in sorted(context):
977 if key in {'message', 'exception'}:
978 continue
Victor Stinner80f53aa2014-06-27 13:52:20 +0200979 value = context[key]
980 if key == 'source_traceback':
981 tb = ''.join(traceback.format_list(value))
982 value = 'Object created at (most recent call last):\n'
983 value += tb.rstrip()
Victor Stinner9b524d52015-01-26 11:05:12 +0100984 elif key == 'handle_traceback':
985 tb = ''.join(traceback.format_list(value))
986 value = 'Handle created at (most recent call last):\n'
987 value += tb.rstrip()
Victor Stinner80f53aa2014-06-27 13:52:20 +0200988 else:
989 value = repr(value)
990 log_lines.append('{}: {}'.format(key, value))
Yury Selivanov569efa22014-02-18 18:02:19 -0500991
992 logger.error('\n'.join(log_lines), exc_info=exc_info)
993
994 def call_exception_handler(self, context):
Victor Stinneracdb7822014-07-14 18:33:40 +0200995 """Call the current event loop's exception handler.
Yury Selivanov569efa22014-02-18 18:02:19 -0500996
Victor Stinneracdb7822014-07-14 18:33:40 +0200997 The context argument is a dict containing the following keys:
998
Yury Selivanov569efa22014-02-18 18:02:19 -0500999 - 'message': Error message;
1000 - 'exception' (optional): Exception object;
1001 - 'future' (optional): Future instance;
1002 - 'handle' (optional): Handle instance;
1003 - 'protocol' (optional): Protocol instance;
1004 - 'transport' (optional): Transport instance;
1005 - 'socket' (optional): Socket instance.
1006
Victor Stinneracdb7822014-07-14 18:33:40 +02001007 New keys maybe introduced in the future.
1008
1009 Note: do not overload this method in an event loop subclass.
1010 For custom exception handling, use the
Yury Selivanov569efa22014-02-18 18:02:19 -05001011 `set_exception_handler()` method.
1012 """
1013 if self._exception_handler is None:
1014 try:
1015 self.default_exception_handler(context)
1016 except Exception:
1017 # Second protection layer for unexpected errors
1018 # in the default implementation, as well as for subclassed
1019 # event loops with overloaded "default_exception_handler".
1020 logger.error('Exception in default exception handler',
1021 exc_info=True)
1022 else:
1023 try:
1024 self._exception_handler(self, context)
1025 except Exception as exc:
1026 # Exception in the user set custom exception handler.
1027 try:
1028 # Let's try default handler.
1029 self.default_exception_handler({
1030 'message': 'Unhandled error in exception handler',
1031 'exception': exc,
1032 'context': context,
1033 })
1034 except Exception:
Victor Stinneracdb7822014-07-14 18:33:40 +02001035 # Guard 'default_exception_handler' in case it is
Yury Selivanov569efa22014-02-18 18:02:19 -05001036 # overloaded.
1037 logger.error('Exception in default exception handler '
1038 'while handling an unexpected error '
1039 'in custom exception handler',
1040 exc_info=True)
1041
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07001042 def _add_callback(self, handle):
Victor Stinneracdb7822014-07-14 18:33:40 +02001043 """Add a Handle to _scheduled (TimerHandle) or _ready."""
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07001044 assert isinstance(handle, events.Handle), 'A Handle is required here'
1045 if handle._cancelled:
1046 return
Yury Selivanov592ada92014-09-25 12:07:56 -04001047 assert not isinstance(handle, events.TimerHandle)
1048 self._ready.append(handle)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07001049
1050 def _add_callback_signalsafe(self, handle):
1051 """Like _add_callback() but called from a signal handler."""
1052 self._add_callback(handle)
1053 self._write_to_self()
1054
Yury Selivanov592ada92014-09-25 12:07:56 -04001055 def _timer_handle_cancelled(self, handle):
1056 """Notification that a TimerHandle has been cancelled."""
1057 if handle._scheduled:
1058 self._timer_cancelled_count += 1
1059
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07001060 def _run_once(self):
1061 """Run one full iteration of the event loop.
1062
1063 This calls all currently ready callbacks, polls for I/O,
1064 schedules the resulting callbacks, and finally schedules
1065 'call_later' callbacks.
1066 """
Yury Selivanov592ada92014-09-25 12:07:56 -04001067
Yury Selivanov592ada92014-09-25 12:07:56 -04001068 sched_count = len(self._scheduled)
1069 if (sched_count > _MIN_SCHEDULED_TIMER_HANDLES and
1070 self._timer_cancelled_count / sched_count >
1071 _MIN_CANCELLED_TIMER_HANDLES_FRACTION):
Victor Stinner68da8fc2014-09-30 18:08:36 +02001072 # Remove delayed calls that were cancelled if their number
1073 # is too high
1074 new_scheduled = []
Yury Selivanov592ada92014-09-25 12:07:56 -04001075 for handle in self._scheduled:
1076 if handle._cancelled:
1077 handle._scheduled = False
Victor Stinner68da8fc2014-09-30 18:08:36 +02001078 else:
1079 new_scheduled.append(handle)
Yury Selivanov592ada92014-09-25 12:07:56 -04001080
Victor Stinner68da8fc2014-09-30 18:08:36 +02001081 heapq.heapify(new_scheduled)
1082 self._scheduled = new_scheduled
Yury Selivanov592ada92014-09-25 12:07:56 -04001083 self._timer_cancelled_count = 0
Yury Selivanov592ada92014-09-25 12:07:56 -04001084 else:
1085 # Remove delayed calls that were cancelled from head of queue.
1086 while self._scheduled and self._scheduled[0]._cancelled:
1087 self._timer_cancelled_count -= 1
1088 handle = heapq.heappop(self._scheduled)
1089 handle._scheduled = False
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07001090
1091 timeout = None
1092 if self._ready:
1093 timeout = 0
1094 elif self._scheduled:
1095 # Compute the desired timeout.
1096 when = self._scheduled[0]._when
Guido van Rossum3d1bc602014-05-10 15:47:15 -07001097 timeout = max(0, when - self.time())
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07001098
Victor Stinner770e48d2014-07-11 11:58:33 +02001099 if self._debug and timeout != 0:
Victor Stinner22463aa2014-01-20 23:56:40 +01001100 t0 = self.time()
1101 event_list = self._selector.select(timeout)
Victor Stinner0e6f52a2014-06-20 17:34:15 +02001102 dt = self.time() - t0
Victor Stinner770e48d2014-07-11 11:58:33 +02001103 if dt >= 1.0:
Victor Stinner22463aa2014-01-20 23:56:40 +01001104 level = logging.INFO
1105 else:
1106 level = logging.DEBUG
Victor Stinner770e48d2014-07-11 11:58:33 +02001107 nevent = len(event_list)
1108 if timeout is None:
1109 logger.log(level, 'poll took %.3f ms: %s events',
1110 dt * 1e3, nevent)
1111 elif nevent:
1112 logger.log(level,
1113 'poll %.3f ms took %.3f ms: %s events',
1114 timeout * 1e3, dt * 1e3, nevent)
1115 elif dt >= 1.0:
1116 logger.log(level,
1117 'poll %.3f ms took %.3f ms: timeout',
1118 timeout * 1e3, dt * 1e3)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07001119 else:
Victor Stinner22463aa2014-01-20 23:56:40 +01001120 event_list = self._selector.select(timeout)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07001121 self._process_events(event_list)
1122
1123 # Handle 'later' callbacks that are ready.
Victor Stinnered1654f2014-02-10 23:42:32 +01001124 end_time = self.time() + self._clock_resolution
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07001125 while self._scheduled:
1126 handle = self._scheduled[0]
Victor Stinnered1654f2014-02-10 23:42:32 +01001127 if handle._when >= end_time:
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07001128 break
1129 handle = heapq.heappop(self._scheduled)
Yury Selivanov592ada92014-09-25 12:07:56 -04001130 handle._scheduled = False
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07001131 self._ready.append(handle)
1132
1133 # This is the only place where callbacks are actually *called*.
1134 # All other places just add them to ready.
1135 # Note: We run all currently scheduled callbacks, but not any
1136 # callbacks scheduled by callbacks run this time around --
1137 # they will be run the next time (after another I/O poll).
Victor Stinneracdb7822014-07-14 18:33:40 +02001138 # Use an idiom that is thread-safe without using locks.
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07001139 ntodo = len(self._ready)
1140 for i in range(ntodo):
1141 handle = self._ready.popleft()
Victor Stinner0e6f52a2014-06-20 17:34:15 +02001142 if handle._cancelled:
1143 continue
1144 if self._debug:
Victor Stinner9b524d52015-01-26 11:05:12 +01001145 try:
1146 self._current_handle = handle
1147 t0 = self.time()
1148 handle._run()
1149 dt = self.time() - t0
1150 if dt >= self.slow_callback_duration:
1151 logger.warning('Executing %s took %.3f seconds',
1152 _format_handle(handle), dt)
1153 finally:
1154 self._current_handle = None
Victor Stinner0e6f52a2014-06-20 17:34:15 +02001155 else:
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07001156 handle._run()
1157 handle = None # Needed to break cycles when an exception occurs.
Victor Stinner0f3e6bc2014-02-19 23:15:02 +01001158
1159 def get_debug(self):
1160 return self._debug
1161
1162 def set_debug(self, enabled):
1163 self._debug = enabled