blob: e40d3ad5f2b4cd23fa71f4f9dbbc205456fbbb5d [file] [log] [blame]
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07001"""Base implementation of event loop.
2
3The event loop can be broken up into a multiplexer (the part
Victor Stinneracdb7822014-07-14 18:33:40 +02004responsible for notifying us of I/O events) and the event loop proper,
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07005which wraps a multiplexer with functionality for scheduling callbacks,
6immediately or at a given time in the future.
7
8Whenever a public API takes a callback, subsequent positional
9arguments will be passed to the callback if/when it is called. This
10avoids the proliferation of trivial lambdas implementing closures.
11Keyword arguments for the callback are not supported; this is a
12conscious design decision, leaving the door open for keyword arguments
13to modify the meaning of the API call itself.
14"""
15
16
17import collections
18import concurrent.futures
19import heapq
Victor Stinner0e6f52a2014-06-20 17:34:15 +020020import inspect
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070021import logging
Victor Stinnerb75380f2014-06-30 14:39:11 +020022import os
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070023import socket
24import subprocess
Victor Stinner956de692014-12-26 21:07:52 +010025import threading
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070026import time
Victor Stinnerb75380f2014-06-30 14:39:11 +020027import traceback
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070028import sys
29
Victor Stinnerf951d282014-06-29 00:46:45 +020030from . import coroutines
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070031from . import events
32from . import futures
33from . import tasks
Victor Stinnerf951d282014-06-29 00:46:45 +020034from .coroutines import coroutine
Guido van Rossumfc29e0f2013-10-17 15:39:45 -070035from .log import logger
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070036
37
Victor Stinner8c1a4a22015-01-06 01:03:58 +010038__all__ = ['BaseEventLoop']
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070039
40
41# Argument for default thread pool executor creation.
42_MAX_WORKERS = 5
43
Yury Selivanov592ada92014-09-25 12:07:56 -040044# Minimum number of _scheduled timer handles before cleanup of
45# cancelled handles is performed.
46_MIN_SCHEDULED_TIMER_HANDLES = 100
47
48# Minimum fraction of _scheduled timer handles that are cancelled
49# before cleanup of cancelled handles is performed.
50_MIN_CANCELLED_TIMER_HANDLES_FRACTION = 0.5
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070051
Victor Stinner0e6f52a2014-06-20 17:34:15 +020052def _format_handle(handle):
53 cb = handle._callback
54 if inspect.ismethod(cb) and isinstance(cb.__self__, tasks.Task):
55 # format the task
56 return repr(cb.__self__)
57 else:
58 return str(handle)
59
60
Victor Stinneracdb7822014-07-14 18:33:40 +020061def _format_pipe(fd):
62 if fd == subprocess.PIPE:
63 return '<pipe>'
64 elif fd == subprocess.STDOUT:
65 return '<stdout>'
66 else:
67 return repr(fd)
68
69
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070070class _StopError(BaseException):
71 """Raised to stop the event loop."""
72
73
Victor Stinner1b0580b2014-02-13 09:24:37 +010074def _check_resolved_address(sock, address):
75 # Ensure that the address is already resolved to avoid the trap of hanging
76 # the entire event loop when the address requires doing a DNS lookup.
77 family = sock.family
Victor Stinnerd1a727a2014-02-20 16:43:09 +010078 if family == socket.AF_INET:
79 host, port = address
80 elif family == socket.AF_INET6:
Victor Stinner934c8852014-02-20 21:59:38 +010081 host, port = address[:2]
Victor Stinnerd1a727a2014-02-20 16:43:09 +010082 else:
Victor Stinner1b0580b2014-02-13 09:24:37 +010083 return
84
Victor Stinner1b0580b2014-02-13 09:24:37 +010085 type_mask = 0
86 if hasattr(socket, 'SOCK_NONBLOCK'):
87 type_mask |= socket.SOCK_NONBLOCK
88 if hasattr(socket, 'SOCK_CLOEXEC'):
89 type_mask |= socket.SOCK_CLOEXEC
Victor Stinneracdb7822014-07-14 18:33:40 +020090 # Use getaddrinfo(flags=AI_NUMERICHOST) to ensure that the address is
Victor Stinner1b0580b2014-02-13 09:24:37 +010091 # already resolved.
92 try:
93 socket.getaddrinfo(host, port,
94 family=family,
95 type=(sock.type & ~type_mask),
96 proto=sock.proto,
97 flags=socket.AI_NUMERICHOST)
98 except socket.gaierror as err:
99 raise ValueError("address must be resolved (IP address), got %r: %s"
100 % (address, err))
101
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700102def _raise_stop_error(*args):
103 raise _StopError
104
105
Victor Stinnerf3e2e092014-12-05 01:44:10 +0100106def _run_until_complete_cb(fut):
107 exc = fut._exception
108 if (isinstance(exc, BaseException)
109 and not isinstance(exc, Exception)):
110 # Issue #22429: run_forever() already finished, no need to
111 # stop it.
112 return
113 _raise_stop_error()
114
115
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700116class Server(events.AbstractServer):
117
118 def __init__(self, loop, sockets):
Victor Stinnerb28dbac2014-07-11 22:52:21 +0200119 self._loop = loop
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700120 self.sockets = sockets
Victor Stinnerb28dbac2014-07-11 22:52:21 +0200121 self._active_count = 0
122 self._waiters = []
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700123
Victor Stinnere912e652014-07-12 03:11:53 +0200124 def __repr__(self):
125 return '<%s sockets=%r>' % (self.__class__.__name__, self.sockets)
126
Victor Stinnerb28dbac2014-07-11 22:52:21 +0200127 def _attach(self):
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700128 assert self.sockets is not None
Victor Stinnerb28dbac2014-07-11 22:52:21 +0200129 self._active_count += 1
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700130
Victor Stinnerb28dbac2014-07-11 22:52:21 +0200131 def _detach(self):
132 assert self._active_count > 0
133 self._active_count -= 1
134 if self._active_count == 0 and self.sockets is None:
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700135 self._wakeup()
136
137 def close(self):
138 sockets = self.sockets
Victor Stinnerb28dbac2014-07-11 22:52:21 +0200139 if sockets is None:
140 return
141 self.sockets = None
142 for sock in sockets:
143 self._loop._stop_serving(sock)
144 if self._active_count == 0:
145 self._wakeup()
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700146
147 def _wakeup(self):
Victor Stinnerb28dbac2014-07-11 22:52:21 +0200148 waiters = self._waiters
149 self._waiters = None
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700150 for waiter in waiters:
151 if not waiter.done():
152 waiter.set_result(waiter)
153
Victor Stinnerf951d282014-06-29 00:46:45 +0200154 @coroutine
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700155 def wait_closed(self):
Victor Stinnerb28dbac2014-07-11 22:52:21 +0200156 if self.sockets is None or self._waiters is None:
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700157 return
Victor Stinnerb28dbac2014-07-11 22:52:21 +0200158 waiter = futures.Future(loop=self._loop)
159 self._waiters.append(waiter)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700160 yield from waiter
161
162
163class BaseEventLoop(events.AbstractEventLoop):
164
165 def __init__(self):
Yury Selivanov592ada92014-09-25 12:07:56 -0400166 self._timer_cancelled_count = 0
Victor Stinnerbb2fc5b2014-06-10 10:23:10 +0200167 self._closed = False
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700168 self._ready = collections.deque()
169 self._scheduled = []
170 self._default_executor = None
171 self._internal_fds = 0
Victor Stinner956de692014-12-26 21:07:52 +0100172 # Identifier of the thread running the event loop, or None if the
173 # event loop is not running
174 self._owner = None
Victor Stinnered1654f2014-02-10 23:42:32 +0100175 self._clock_resolution = time.get_clock_info('monotonic').resolution
Yury Selivanov569efa22014-02-18 18:02:19 -0500176 self._exception_handler = None
Victor Stinner7b7120e2014-06-23 00:12:14 +0200177 self._debug = (not sys.flags.ignore_environment
178 and bool(os.environ.get('PYTHONASYNCIODEBUG')))
Victor Stinner0e6f52a2014-06-20 17:34:15 +0200179 # In debug mode, if the execution of a callback or a step of a task
180 # exceed this duration in seconds, the slow callback/task is logged.
181 self.slow_callback_duration = 0.1
Victor Stinner9b524d52015-01-26 11:05:12 +0100182 self._current_handle = None
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700183
Victor Stinnerbb2fc5b2014-06-10 10:23:10 +0200184 def __repr__(self):
185 return ('<%s running=%s closed=%s debug=%s>'
186 % (self.__class__.__name__, self.is_running(),
187 self.is_closed(), self.get_debug()))
188
Victor Stinner896a25a2014-07-08 11:29:25 +0200189 def create_task(self, coro):
190 """Schedule a coroutine object.
191
Victor Stinneracdb7822014-07-14 18:33:40 +0200192 Return a task object.
193 """
Victor Stinnere80bf0d2014-12-04 23:07:47 +0100194 self._check_closed()
Victor Stinnerc39ba7d2014-07-11 00:21:27 +0200195 task = tasks.Task(coro, loop=self)
196 if task._source_traceback:
197 del task._source_traceback[-1]
198 return task
Victor Stinner896a25a2014-07-08 11:29:25 +0200199
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700200 def _make_socket_transport(self, sock, protocol, waiter=None, *,
201 extra=None, server=None):
202 """Create socket transport."""
203 raise NotImplementedError
204
Victor Stinner15cc6782015-01-09 00:09:10 +0100205 def _make_ssl_transport(self, rawsock, protocol, sslcontext, waiter=None,
206 *, server_side=False, server_hostname=None,
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700207 extra=None, server=None):
208 """Create SSL transport."""
209 raise NotImplementedError
210
211 def _make_datagram_transport(self, sock, protocol,
Victor Stinnerbfff45d2014-07-08 23:57:31 +0200212 address=None, waiter=None, extra=None):
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700213 """Create datagram transport."""
214 raise NotImplementedError
215
216 def _make_read_pipe_transport(self, pipe, protocol, waiter=None,
217 extra=None):
218 """Create read pipe transport."""
219 raise NotImplementedError
220
221 def _make_write_pipe_transport(self, pipe, protocol, waiter=None,
222 extra=None):
223 """Create write pipe transport."""
224 raise NotImplementedError
225
Victor Stinnerf951d282014-06-29 00:46:45 +0200226 @coroutine
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700227 def _make_subprocess_transport(self, protocol, args, shell,
228 stdin, stdout, stderr, bufsize,
229 extra=None, **kwargs):
230 """Create subprocess transport."""
231 raise NotImplementedError
232
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700233 def _write_to_self(self):
Victor Stinneracdb7822014-07-14 18:33:40 +0200234 """Write a byte to self-pipe, to wake up the event loop.
235
236 This may be called from a different thread.
237
238 The subclass is responsible for implementing the self-pipe.
239 """
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700240 raise NotImplementedError
241
242 def _process_events(self, event_list):
243 """Process selector events."""
244 raise NotImplementedError
245
Victor Stinnerbb2fc5b2014-06-10 10:23:10 +0200246 def _check_closed(self):
247 if self._closed:
248 raise RuntimeError('Event loop is closed')
249
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700250 def run_forever(self):
251 """Run until stop() is called."""
Victor Stinnerbb2fc5b2014-06-10 10:23:10 +0200252 self._check_closed()
Victor Stinner956de692014-12-26 21:07:52 +0100253 if self.is_running():
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700254 raise RuntimeError('Event loop is running.')
Victor Stinner956de692014-12-26 21:07:52 +0100255 self._owner = threading.get_ident()
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700256 try:
257 while True:
258 try:
259 self._run_once()
260 except _StopError:
261 break
262 finally:
Victor Stinner956de692014-12-26 21:07:52 +0100263 self._owner = None
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700264
265 def run_until_complete(self, future):
266 """Run until the Future is done.
267
268 If the argument is a coroutine, it is wrapped in a Task.
269
Victor Stinneracdb7822014-07-14 18:33:40 +0200270 WARNING: It would be disastrous to call run_until_complete()
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700271 with the same coroutine twice -- it would wrap it in two
272 different Tasks and that can't be good.
273
274 Return the Future's result, or raise its exception.
275 """
Victor Stinnerbb2fc5b2014-06-10 10:23:10 +0200276 self._check_closed()
Victor Stinner98b63912014-06-30 14:51:04 +0200277
278 new_task = not isinstance(future, futures.Future)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700279 future = tasks.async(future, loop=self)
Victor Stinner98b63912014-06-30 14:51:04 +0200280 if new_task:
281 # An exception is raised if the future didn't complete, so there
282 # is no need to log the "destroy pending task" message
283 future._log_destroy_pending = False
284
Victor Stinnerf3e2e092014-12-05 01:44:10 +0100285 future.add_done_callback(_run_until_complete_cb)
Victor Stinnerc8bd53f2014-10-11 14:30:18 +0200286 try:
287 self.run_forever()
288 except:
289 if new_task and future.done() and not future.cancelled():
290 # The coroutine raised a BaseException. Consume the exception
291 # to not log a warning, the caller doesn't have access to the
292 # local task.
293 future.exception()
294 raise
Victor Stinnerf3e2e092014-12-05 01:44:10 +0100295 future.remove_done_callback(_run_until_complete_cb)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700296 if not future.done():
297 raise RuntimeError('Event loop stopped before Future completed.')
298
299 return future.result()
300
301 def stop(self):
302 """Stop running the event loop.
303
Victor Stinner5006b1f2014-07-24 11:34:11 +0200304 Every callback scheduled before stop() is called will run. Callbacks
305 scheduled after stop() is called will not run. However, those callbacks
306 will run if run_forever is called again later.
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700307 """
308 self.call_soon(_raise_stop_error)
309
Antoine Pitrou4ca73552013-10-20 00:54:10 +0200310 def close(self):
Guido van Rossume3f52ef2013-11-01 14:19:04 -0700311 """Close the event loop.
312
313 This clears the queues and shuts down the executor,
314 but does not wait for the executor to finish.
Victor Stinnerf328c7d2014-06-23 01:02:37 +0200315
316 The event loop must not be running.
Guido van Rossume3f52ef2013-11-01 14:19:04 -0700317 """
Victor Stinner956de692014-12-26 21:07:52 +0100318 if self.is_running():
Victor Stinneracdb7822014-07-14 18:33:40 +0200319 raise RuntimeError("Cannot close a running event loop")
Victor Stinnerbb2fc5b2014-06-10 10:23:10 +0200320 if self._closed:
321 return
Victor Stinnere912e652014-07-12 03:11:53 +0200322 if self._debug:
323 logger.debug("Close %r", self)
Victor Stinnerbb2fc5b2014-06-10 10:23:10 +0200324 self._closed = True
Antoine Pitrou4ca73552013-10-20 00:54:10 +0200325 self._ready.clear()
326 self._scheduled.clear()
327 executor = self._default_executor
328 if executor is not None:
329 self._default_executor = None
330 executor.shutdown(wait=False)
331
Victor Stinnerbb2fc5b2014-06-10 10:23:10 +0200332 def is_closed(self):
333 """Returns True if the event loop was closed."""
334 return self._closed
335
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700336 def is_running(self):
Victor Stinneracdb7822014-07-14 18:33:40 +0200337 """Returns True if the event loop is running."""
Victor Stinner956de692014-12-26 21:07:52 +0100338 return (self._owner is not None)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700339
340 def time(self):
Victor Stinneracdb7822014-07-14 18:33:40 +0200341 """Return the time according to the event loop's clock.
342
343 This is a float expressed in seconds since an epoch, but the
344 epoch, precision, accuracy and drift are unspecified and may
345 differ per event loop.
346 """
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700347 return time.monotonic()
348
349 def call_later(self, delay, callback, *args):
350 """Arrange for a callback to be called at a given time.
351
352 Return a Handle: an opaque object with a cancel() method that
353 can be used to cancel the call.
354
355 The delay can be an int or float, expressed in seconds. It is
Victor Stinneracdb7822014-07-14 18:33:40 +0200356 always relative to the current time.
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700357
358 Each callback will be called exactly once. If two callbacks
359 are scheduled for exactly the same time, it undefined which
360 will be called first.
361
362 Any positional arguments after the callback will be passed to
363 the callback when it is called.
364 """
Victor Stinner80f53aa2014-06-27 13:52:20 +0200365 timer = self.call_at(self.time() + delay, callback, *args)
366 if timer._source_traceback:
367 del timer._source_traceback[-1]
368 return timer
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700369
370 def call_at(self, when, callback, *args):
Victor Stinneracdb7822014-07-14 18:33:40 +0200371 """Like call_later(), but uses an absolute time.
372
373 Absolute time corresponds to the event loop's time() method.
374 """
Victor Stinner2d99d932014-11-20 15:03:52 +0100375 if (coroutines.iscoroutine(callback)
376 or coroutines.iscoroutinefunction(callback)):
Victor Stinner9af4a242014-02-11 11:34:30 +0100377 raise TypeError("coroutines cannot be used with call_at()")
Victor Stinnere80bf0d2014-12-04 23:07:47 +0100378 self._check_closed()
Victor Stinner93569c22014-03-21 10:00:52 +0100379 if self._debug:
Victor Stinner956de692014-12-26 21:07:52 +0100380 self._check_thread()
Yury Selivanov569efa22014-02-18 18:02:19 -0500381 timer = events.TimerHandle(when, callback, args, self)
Victor Stinner80f53aa2014-06-27 13:52:20 +0200382 if timer._source_traceback:
383 del timer._source_traceback[-1]
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700384 heapq.heappush(self._scheduled, timer)
Yury Selivanov592ada92014-09-25 12:07:56 -0400385 timer._scheduled = True
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700386 return timer
387
388 def call_soon(self, callback, *args):
389 """Arrange for a callback to be called as soon as possible.
390
Victor Stinneracdb7822014-07-14 18:33:40 +0200391 This operates as a FIFO queue: callbacks are called in the
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700392 order in which they are registered. Each callback will be
393 called exactly once.
394
395 Any positional arguments after the callback will be passed to
396 the callback when it is called.
397 """
Victor Stinner956de692014-12-26 21:07:52 +0100398 if self._debug:
399 self._check_thread()
400 handle = self._call_soon(callback, args)
Victor Stinner80f53aa2014-06-27 13:52:20 +0200401 if handle._source_traceback:
402 del handle._source_traceback[-1]
403 return handle
Victor Stinner93569c22014-03-21 10:00:52 +0100404
Victor Stinner956de692014-12-26 21:07:52 +0100405 def _call_soon(self, callback, args):
Victor Stinner2d99d932014-11-20 15:03:52 +0100406 if (coroutines.iscoroutine(callback)
407 or coroutines.iscoroutinefunction(callback)):
Victor Stinner9af4a242014-02-11 11:34:30 +0100408 raise TypeError("coroutines cannot be used with call_soon()")
Victor Stinnere80bf0d2014-12-04 23:07:47 +0100409 self._check_closed()
Yury Selivanov569efa22014-02-18 18:02:19 -0500410 handle = events.Handle(callback, args, self)
Victor Stinner80f53aa2014-06-27 13:52:20 +0200411 if handle._source_traceback:
412 del handle._source_traceback[-1]
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700413 self._ready.append(handle)
414 return handle
415
Victor Stinner956de692014-12-26 21:07:52 +0100416 def _check_thread(self):
417 """Check that the current thread is the thread running the event loop.
Victor Stinner93569c22014-03-21 10:00:52 +0100418
Victor Stinneracdb7822014-07-14 18:33:40 +0200419 Non-thread-safe methods of this class make this assumption and will
Victor Stinner93569c22014-03-21 10:00:52 +0100420 likely behave incorrectly when the assumption is violated.
421
Victor Stinneracdb7822014-07-14 18:33:40 +0200422 Should only be called when (self._debug == True). The caller is
Victor Stinner93569c22014-03-21 10:00:52 +0100423 responsible for checking this condition for performance reasons.
424 """
Victor Stinner956de692014-12-26 21:07:52 +0100425 if self._owner is None:
Victor Stinner751c7c02014-06-23 15:14:13 +0200426 return
Victor Stinner956de692014-12-26 21:07:52 +0100427 thread_id = threading.get_ident()
428 if thread_id != self._owner:
Victor Stinner93569c22014-03-21 10:00:52 +0100429 raise RuntimeError(
Victor Stinneracdb7822014-07-14 18:33:40 +0200430 "Non-thread-safe operation invoked on an event loop other "
Victor Stinner93569c22014-03-21 10:00:52 +0100431 "than the current one")
432
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700433 def call_soon_threadsafe(self, callback, *args):
Victor Stinneracdb7822014-07-14 18:33:40 +0200434 """Like call_soon(), but thread-safe."""
Victor Stinner956de692014-12-26 21:07:52 +0100435 handle = self._call_soon(callback, args)
Victor Stinner80f53aa2014-06-27 13:52:20 +0200436 if handle._source_traceback:
437 del handle._source_traceback[-1]
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700438 self._write_to_self()
439 return handle
440
441 def run_in_executor(self, executor, callback, *args):
Victor Stinner2d99d932014-11-20 15:03:52 +0100442 if (coroutines.iscoroutine(callback)
443 or coroutines.iscoroutinefunction(callback)):
444 raise TypeError("coroutines cannot be used with run_in_executor()")
Victor Stinnere80bf0d2014-12-04 23:07:47 +0100445 self._check_closed()
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700446 if isinstance(callback, events.Handle):
447 assert not args
448 assert not isinstance(callback, events.TimerHandle)
449 if callback._cancelled:
450 f = futures.Future(loop=self)
451 f.set_result(None)
452 return f
453 callback, args = callback._callback, callback._args
454 if executor is None:
455 executor = self._default_executor
456 if executor is None:
457 executor = concurrent.futures.ThreadPoolExecutor(_MAX_WORKERS)
458 self._default_executor = executor
459 return futures.wrap_future(executor.submit(callback, *args), loop=self)
460
461 def set_default_executor(self, executor):
462 self._default_executor = executor
463
Victor Stinnere912e652014-07-12 03:11:53 +0200464 def _getaddrinfo_debug(self, host, port, family, type, proto, flags):
465 msg = ["%s:%r" % (host, port)]
466 if family:
467 msg.append('family=%r' % family)
468 if type:
469 msg.append('type=%r' % type)
470 if proto:
471 msg.append('proto=%r' % proto)
472 if flags:
473 msg.append('flags=%r' % flags)
474 msg = ', '.join(msg)
Victor Stinneracdb7822014-07-14 18:33:40 +0200475 logger.debug('Get address info %s', msg)
Victor Stinnere912e652014-07-12 03:11:53 +0200476
477 t0 = self.time()
478 addrinfo = socket.getaddrinfo(host, port, family, type, proto, flags)
479 dt = self.time() - t0
480
Victor Stinneracdb7822014-07-14 18:33:40 +0200481 msg = ('Getting address info %s took %.3f ms: %r'
Victor Stinnere912e652014-07-12 03:11:53 +0200482 % (msg, dt * 1e3, addrinfo))
483 if dt >= self.slow_callback_duration:
484 logger.info(msg)
485 else:
486 logger.debug(msg)
487 return addrinfo
488
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700489 def getaddrinfo(self, host, port, *,
490 family=0, type=0, proto=0, flags=0):
Victor Stinnere912e652014-07-12 03:11:53 +0200491 if self._debug:
492 return self.run_in_executor(None, self._getaddrinfo_debug,
493 host, port, family, type, proto, flags)
494 else:
495 return self.run_in_executor(None, socket.getaddrinfo,
496 host, port, family, type, proto, flags)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700497
498 def getnameinfo(self, sockaddr, flags=0):
499 return self.run_in_executor(None, socket.getnameinfo, sockaddr, flags)
500
Victor Stinnerf951d282014-06-29 00:46:45 +0200501 @coroutine
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700502 def create_connection(self, protocol_factory, host=None, port=None, *,
503 ssl=None, family=0, proto=0, flags=0, sock=None,
Guido van Rossum21c85a72013-11-01 14:16:54 -0700504 local_addr=None, server_hostname=None):
Victor Stinnerd1432092014-06-19 17:11:49 +0200505 """Connect to a TCP server.
506
507 Create a streaming transport connection to a given Internet host and
508 port: socket family AF_INET or socket.AF_INET6 depending on host (or
509 family if specified), socket type SOCK_STREAM. protocol_factory must be
510 a callable returning a protocol instance.
511
512 This method is a coroutine which will try to establish the connection
513 in the background. When successful, the coroutine returns a
514 (transport, protocol) pair.
515 """
Guido van Rossum21c85a72013-11-01 14:16:54 -0700516 if server_hostname is not None and not ssl:
517 raise ValueError('server_hostname is only meaningful with ssl')
518
519 if server_hostname is None and ssl:
520 # Use host as default for server_hostname. It is an error
521 # if host is empty or not set, e.g. when an
522 # already-connected socket was passed or when only a port
523 # is given. To avoid this error, you can pass
524 # server_hostname='' -- this will bypass the hostname
525 # check. (This also means that if host is a numeric
526 # IP/IPv6 address, we will attempt to verify that exact
527 # address; this will probably fail, but it is possible to
528 # create a certificate for a specific IP address, so we
529 # don't judge it here.)
530 if not host:
531 raise ValueError('You must set server_hostname '
532 'when using ssl without a host')
533 server_hostname = host
Guido van Rossuma8d630a2013-11-01 14:20:55 -0700534
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700535 if host is not None or port is not None:
536 if sock is not None:
537 raise ValueError(
538 'host/port and sock can not be specified at the same time')
539
540 f1 = self.getaddrinfo(
541 host, port, family=family,
542 type=socket.SOCK_STREAM, proto=proto, flags=flags)
543 fs = [f1]
544 if local_addr is not None:
545 f2 = self.getaddrinfo(
546 *local_addr, family=family,
547 type=socket.SOCK_STREAM, proto=proto, flags=flags)
548 fs.append(f2)
549 else:
550 f2 = None
551
552 yield from tasks.wait(fs, loop=self)
553
554 infos = f1.result()
555 if not infos:
556 raise OSError('getaddrinfo() returned empty list')
557 if f2 is not None:
558 laddr_infos = f2.result()
559 if not laddr_infos:
560 raise OSError('getaddrinfo() returned empty list')
561
562 exceptions = []
563 for family, type, proto, cname, address in infos:
564 try:
565 sock = socket.socket(family=family, type=type, proto=proto)
566 sock.setblocking(False)
567 if f2 is not None:
568 for _, _, _, _, laddr in laddr_infos:
569 try:
570 sock.bind(laddr)
571 break
572 except OSError as exc:
573 exc = OSError(
574 exc.errno, 'error while '
575 'attempting to bind on address '
576 '{!r}: {}'.format(
577 laddr, exc.strerror.lower()))
578 exceptions.append(exc)
579 else:
580 sock.close()
581 sock = None
582 continue
Victor Stinnere912e652014-07-12 03:11:53 +0200583 if self._debug:
584 logger.debug("connect %r to %r", sock, address)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700585 yield from self.sock_connect(sock, address)
586 except OSError as exc:
587 if sock is not None:
588 sock.close()
589 exceptions.append(exc)
Victor Stinner223a6242014-06-04 00:11:52 +0200590 except:
591 if sock is not None:
592 sock.close()
593 raise
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700594 else:
595 break
596 else:
597 if len(exceptions) == 1:
598 raise exceptions[0]
599 else:
600 # If they all have the same str(), raise one.
601 model = str(exceptions[0])
602 if all(str(exc) == model for exc in exceptions):
603 raise exceptions[0]
604 # Raise a combined exception so the user can see all
605 # the various error messages.
606 raise OSError('Multiple exceptions: {}'.format(
607 ', '.join(str(exc) for exc in exceptions)))
608
609 elif sock is None:
610 raise ValueError(
611 'host and port was not specified and no sock specified')
612
613 sock.setblocking(False)
614
Yury Selivanovb057c522014-02-18 12:15:06 -0500615 transport, protocol = yield from self._create_connection_transport(
616 sock, protocol_factory, ssl, server_hostname)
Victor Stinnere912e652014-07-12 03:11:53 +0200617 if self._debug:
Victor Stinnerb2614752014-08-25 23:20:52 +0200618 # Get the socket from the transport because SSL transport closes
619 # the old socket and creates a new SSL socket
620 sock = transport.get_extra_info('socket')
Victor Stinneracdb7822014-07-14 18:33:40 +0200621 logger.debug("%r connected to %s:%r: (%r, %r)",
622 sock, host, port, transport, protocol)
Yury Selivanovb057c522014-02-18 12:15:06 -0500623 return transport, protocol
624
Victor Stinnerf951d282014-06-29 00:46:45 +0200625 @coroutine
Yury Selivanovb057c522014-02-18 12:15:06 -0500626 def _create_connection_transport(self, sock, protocol_factory, ssl,
627 server_hostname):
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700628 protocol = protocol_factory()
629 waiter = futures.Future(loop=self)
630 if ssl:
631 sslcontext = None if isinstance(ssl, bool) else ssl
632 transport = self._make_ssl_transport(
633 sock, protocol, sslcontext, waiter,
Guido van Rossum21c85a72013-11-01 14:16:54 -0700634 server_side=False, server_hostname=server_hostname)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700635 else:
636 transport = self._make_socket_transport(sock, protocol, waiter)
637
Victor Stinner29ad0112015-01-15 00:04:21 +0100638 try:
639 yield from waiter
Victor Stinner0c2e4082015-01-22 00:17:41 +0100640 except:
Victor Stinner29ad0112015-01-15 00:04:21 +0100641 transport.close()
642 raise
643
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700644 return transport, protocol
645
Victor Stinnerf951d282014-06-29 00:46:45 +0200646 @coroutine
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700647 def create_datagram_endpoint(self, protocol_factory,
648 local_addr=None, remote_addr=None, *,
649 family=0, proto=0, flags=0):
650 """Create datagram connection."""
651 if not (local_addr or remote_addr):
652 if family == 0:
653 raise ValueError('unexpected address family')
654 addr_pairs_info = (((family, proto), (None, None)),)
655 else:
Victor Stinneracdb7822014-07-14 18:33:40 +0200656 # join address by (family, protocol)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700657 addr_infos = collections.OrderedDict()
658 for idx, addr in ((0, local_addr), (1, remote_addr)):
659 if addr is not None:
660 assert isinstance(addr, tuple) and len(addr) == 2, (
661 '2-tuple is expected')
662
663 infos = yield from self.getaddrinfo(
664 *addr, family=family, type=socket.SOCK_DGRAM,
665 proto=proto, flags=flags)
666 if not infos:
667 raise OSError('getaddrinfo() returned empty list')
668
669 for fam, _, pro, _, address in infos:
670 key = (fam, pro)
671 if key not in addr_infos:
672 addr_infos[key] = [None, None]
673 addr_infos[key][idx] = address
674
675 # each addr has to have info for each (family, proto) pair
676 addr_pairs_info = [
677 (key, addr_pair) for key, addr_pair in addr_infos.items()
678 if not ((local_addr and addr_pair[0] is None) or
679 (remote_addr and addr_pair[1] is None))]
680
681 if not addr_pairs_info:
682 raise ValueError('can not get address information')
683
684 exceptions = []
685
686 for ((family, proto),
687 (local_address, remote_address)) in addr_pairs_info:
688 sock = None
689 r_addr = None
690 try:
691 sock = socket.socket(
692 family=family, type=socket.SOCK_DGRAM, proto=proto)
693 sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
694 sock.setblocking(False)
695
696 if local_addr:
697 sock.bind(local_address)
698 if remote_addr:
699 yield from self.sock_connect(sock, remote_address)
700 r_addr = remote_address
701 except OSError as exc:
702 if sock is not None:
703 sock.close()
704 exceptions.append(exc)
Victor Stinner223a6242014-06-04 00:11:52 +0200705 except:
706 if sock is not None:
707 sock.close()
708 raise
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700709 else:
710 break
711 else:
712 raise exceptions[0]
713
714 protocol = protocol_factory()
Victor Stinnerbfff45d2014-07-08 23:57:31 +0200715 waiter = futures.Future(loop=self)
716 transport = self._make_datagram_transport(sock, protocol, r_addr,
717 waiter)
Victor Stinnere912e652014-07-12 03:11:53 +0200718 if self._debug:
719 if local_addr:
720 logger.info("Datagram endpoint local_addr=%r remote_addr=%r "
721 "created: (%r, %r)",
722 local_addr, remote_addr, transport, protocol)
723 else:
724 logger.debug("Datagram endpoint remote_addr=%r created: "
725 "(%r, %r)",
726 remote_addr, transport, protocol)
Victor Stinner2596dd02015-01-26 11:02:18 +0100727
728 try:
729 yield from waiter
730 except:
731 transport.close()
732 raise
733
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700734 return transport, protocol
735
Victor Stinnerf951d282014-06-29 00:46:45 +0200736 @coroutine
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700737 def create_server(self, protocol_factory, host=None, port=None,
738 *,
739 family=socket.AF_UNSPEC,
740 flags=socket.AI_PASSIVE,
741 sock=None,
742 backlog=100,
743 ssl=None,
744 reuse_address=None):
Victor Stinnerd1432092014-06-19 17:11:49 +0200745 """Create a TCP server bound to host and port.
746
Victor Stinneracdb7822014-07-14 18:33:40 +0200747 Return a Server object which can be used to stop the service.
Victor Stinnerd1432092014-06-19 17:11:49 +0200748
749 This method is a coroutine.
750 """
Guido van Rossum28dff0d2013-11-01 14:22:30 -0700751 if isinstance(ssl, bool):
752 raise TypeError('ssl argument must be an SSLContext or None')
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700753 if host is not None or port is not None:
754 if sock is not None:
755 raise ValueError(
756 'host/port and sock can not be specified at the same time')
757
758 AF_INET6 = getattr(socket, 'AF_INET6', 0)
759 if reuse_address is None:
760 reuse_address = os.name == 'posix' and sys.platform != 'cygwin'
761 sockets = []
762 if host == '':
763 host = None
764
765 infos = yield from self.getaddrinfo(
766 host, port, family=family,
767 type=socket.SOCK_STREAM, proto=0, flags=flags)
768 if not infos:
769 raise OSError('getaddrinfo() returned empty list')
770
771 completed = False
772 try:
773 for res in infos:
774 af, socktype, proto, canonname, sa = res
Guido van Rossum32e46852013-10-19 17:04:25 -0700775 try:
776 sock = socket.socket(af, socktype, proto)
777 except socket.error:
778 # Assume it's a bad family/type/protocol combination.
Victor Stinnerb2614752014-08-25 23:20:52 +0200779 if self._debug:
780 logger.warning('create_server() failed to create '
781 'socket.socket(%r, %r, %r)',
782 af, socktype, proto, exc_info=True)
Guido van Rossum32e46852013-10-19 17:04:25 -0700783 continue
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700784 sockets.append(sock)
785 if reuse_address:
786 sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR,
787 True)
788 # Disable IPv4/IPv6 dual stack support (enabled by
789 # default on Linux) which makes a single socket
790 # listen on both address families.
791 if af == AF_INET6 and hasattr(socket, 'IPPROTO_IPV6'):
792 sock.setsockopt(socket.IPPROTO_IPV6,
793 socket.IPV6_V6ONLY,
794 True)
795 try:
796 sock.bind(sa)
797 except OSError as err:
798 raise OSError(err.errno, 'error while attempting '
799 'to bind on address %r: %s'
800 % (sa, err.strerror.lower()))
801 completed = True
802 finally:
803 if not completed:
804 for sock in sockets:
805 sock.close()
806 else:
807 if sock is None:
Victor Stinneracdb7822014-07-14 18:33:40 +0200808 raise ValueError('Neither host/port nor sock were specified')
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700809 sockets = [sock]
810
811 server = Server(self, sockets)
812 for sock in sockets:
813 sock.listen(backlog)
814 sock.setblocking(False)
815 self._start_serving(protocol_factory, sock, ssl, server)
Victor Stinnere912e652014-07-12 03:11:53 +0200816 if self._debug:
817 logger.info("%r is serving", server)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700818 return server
819
Victor Stinnerf951d282014-06-29 00:46:45 +0200820 @coroutine
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700821 def connect_read_pipe(self, protocol_factory, pipe):
822 protocol = protocol_factory()
823 waiter = futures.Future(loop=self)
824 transport = self._make_read_pipe_transport(pipe, protocol, waiter)
Victor Stinner2596dd02015-01-26 11:02:18 +0100825
826 try:
827 yield from waiter
828 except:
829 transport.close()
830 raise
831
Victor Stinneracdb7822014-07-14 18:33:40 +0200832 if self._debug:
833 logger.debug('Read pipe %r connected: (%r, %r)',
834 pipe.fileno(), transport, protocol)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700835 return transport, protocol
836
Victor Stinnerf951d282014-06-29 00:46:45 +0200837 @coroutine
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700838 def connect_write_pipe(self, protocol_factory, pipe):
839 protocol = protocol_factory()
840 waiter = futures.Future(loop=self)
841 transport = self._make_write_pipe_transport(pipe, protocol, waiter)
Victor Stinner2596dd02015-01-26 11:02:18 +0100842
843 try:
844 yield from waiter
845 except:
846 transport.close()
847 raise
848
Victor Stinneracdb7822014-07-14 18:33:40 +0200849 if self._debug:
850 logger.debug('Write pipe %r connected: (%r, %r)',
851 pipe.fileno(), transport, protocol)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700852 return transport, protocol
853
Victor Stinneracdb7822014-07-14 18:33:40 +0200854 def _log_subprocess(self, msg, stdin, stdout, stderr):
855 info = [msg]
856 if stdin is not None:
857 info.append('stdin=%s' % _format_pipe(stdin))
858 if stdout is not None and stderr == subprocess.STDOUT:
859 info.append('stdout=stderr=%s' % _format_pipe(stdout))
860 else:
861 if stdout is not None:
862 info.append('stdout=%s' % _format_pipe(stdout))
863 if stderr is not None:
864 info.append('stderr=%s' % _format_pipe(stderr))
865 logger.debug(' '.join(info))
866
Victor Stinnerf951d282014-06-29 00:46:45 +0200867 @coroutine
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700868 def subprocess_shell(self, protocol_factory, cmd, *, stdin=subprocess.PIPE,
869 stdout=subprocess.PIPE, stderr=subprocess.PIPE,
870 universal_newlines=False, shell=True, bufsize=0,
871 **kwargs):
Victor Stinner20e07432014-02-11 11:44:56 +0100872 if not isinstance(cmd, (bytes, str)):
Victor Stinnere623a122014-01-29 14:35:15 -0800873 raise ValueError("cmd must be a string")
874 if universal_newlines:
875 raise ValueError("universal_newlines must be False")
876 if not shell:
Victor Stinner323748e2014-01-31 12:28:30 +0100877 raise ValueError("shell must be True")
Victor Stinnere623a122014-01-29 14:35:15 -0800878 if bufsize != 0:
879 raise ValueError("bufsize must be 0")
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700880 protocol = protocol_factory()
Victor Stinneracdb7822014-07-14 18:33:40 +0200881 if self._debug:
882 # don't log parameters: they may contain sensitive information
883 # (password) and may be too long
884 debug_log = 'run shell command %r' % cmd
885 self._log_subprocess(debug_log, stdin, stdout, stderr)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700886 transport = yield from self._make_subprocess_transport(
887 protocol, cmd, True, stdin, stdout, stderr, bufsize, **kwargs)
Victor Stinneracdb7822014-07-14 18:33:40 +0200888 if self._debug:
889 logger.info('%s: %r' % (debug_log, transport))
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700890 return transport, protocol
891
Victor Stinnerf951d282014-06-29 00:46:45 +0200892 @coroutine
Yury Selivanov57797522014-02-18 22:56:15 -0500893 def subprocess_exec(self, protocol_factory, program, *args,
894 stdin=subprocess.PIPE, stdout=subprocess.PIPE,
895 stderr=subprocess.PIPE, universal_newlines=False,
896 shell=False, bufsize=0, **kwargs):
Victor Stinnere623a122014-01-29 14:35:15 -0800897 if universal_newlines:
898 raise ValueError("universal_newlines must be False")
899 if shell:
900 raise ValueError("shell must be False")
901 if bufsize != 0:
902 raise ValueError("bufsize must be 0")
Victor Stinner20e07432014-02-11 11:44:56 +0100903 popen_args = (program,) + args
904 for arg in popen_args:
905 if not isinstance(arg, (str, bytes)):
906 raise TypeError("program arguments must be "
907 "a bytes or text string, not %s"
908 % type(arg).__name__)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700909 protocol = protocol_factory()
Victor Stinneracdb7822014-07-14 18:33:40 +0200910 if self._debug:
911 # don't log parameters: they may contain sensitive information
912 # (password) and may be too long
913 debug_log = 'execute program %r' % program
914 self._log_subprocess(debug_log, stdin, stdout, stderr)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700915 transport = yield from self._make_subprocess_transport(
Yury Selivanov57797522014-02-18 22:56:15 -0500916 protocol, popen_args, False, stdin, stdout, stderr,
917 bufsize, **kwargs)
Victor Stinneracdb7822014-07-14 18:33:40 +0200918 if self._debug:
919 logger.info('%s: %r' % (debug_log, transport))
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700920 return transport, protocol
921
Yury Selivanov569efa22014-02-18 18:02:19 -0500922 def set_exception_handler(self, handler):
923 """Set handler as the new event loop exception handler.
924
925 If handler is None, the default exception handler will
926 be set.
927
928 If handler is a callable object, it should have a
Victor Stinneracdb7822014-07-14 18:33:40 +0200929 signature matching '(loop, context)', where 'loop'
Yury Selivanov569efa22014-02-18 18:02:19 -0500930 will be a reference to the active event loop, 'context'
931 will be a dict object (see `call_exception_handler()`
932 documentation for details about context).
933 """
934 if handler is not None and not callable(handler):
935 raise TypeError('A callable object or None is expected, '
936 'got {!r}'.format(handler))
937 self._exception_handler = handler
938
939 def default_exception_handler(self, context):
940 """Default exception handler.
941
942 This is called when an exception occurs and no exception
943 handler is set, and can be called by a custom exception
944 handler that wants to defer to the default behavior.
945
Victor Stinneracdb7822014-07-14 18:33:40 +0200946 The context parameter has the same meaning as in
Yury Selivanov569efa22014-02-18 18:02:19 -0500947 `call_exception_handler()`.
948 """
949 message = context.get('message')
950 if not message:
951 message = 'Unhandled exception in event loop'
952
953 exception = context.get('exception')
954 if exception is not None:
955 exc_info = (type(exception), exception, exception.__traceback__)
956 else:
957 exc_info = False
958
Victor Stinnerff018e42015-01-28 00:30:40 +0100959 if ('source_traceback' not in context
960 and self._current_handle is not None
Victor Stinner9b524d52015-01-26 11:05:12 +0100961 and self._current_handle._source_traceback):
962 context['handle_traceback'] = self._current_handle._source_traceback
963
Yury Selivanov569efa22014-02-18 18:02:19 -0500964 log_lines = [message]
965 for key in sorted(context):
966 if key in {'message', 'exception'}:
967 continue
Victor Stinner80f53aa2014-06-27 13:52:20 +0200968 value = context[key]
969 if key == 'source_traceback':
970 tb = ''.join(traceback.format_list(value))
971 value = 'Object created at (most recent call last):\n'
972 value += tb.rstrip()
Victor Stinner9b524d52015-01-26 11:05:12 +0100973 elif key == 'handle_traceback':
974 tb = ''.join(traceback.format_list(value))
975 value = 'Handle created at (most recent call last):\n'
976 value += tb.rstrip()
Victor Stinner80f53aa2014-06-27 13:52:20 +0200977 else:
978 value = repr(value)
979 log_lines.append('{}: {}'.format(key, value))
Yury Selivanov569efa22014-02-18 18:02:19 -0500980
981 logger.error('\n'.join(log_lines), exc_info=exc_info)
982
983 def call_exception_handler(self, context):
Victor Stinneracdb7822014-07-14 18:33:40 +0200984 """Call the current event loop's exception handler.
Yury Selivanov569efa22014-02-18 18:02:19 -0500985
Victor Stinneracdb7822014-07-14 18:33:40 +0200986 The context argument is a dict containing the following keys:
987
Yury Selivanov569efa22014-02-18 18:02:19 -0500988 - 'message': Error message;
989 - 'exception' (optional): Exception object;
990 - 'future' (optional): Future instance;
991 - 'handle' (optional): Handle instance;
992 - 'protocol' (optional): Protocol instance;
993 - 'transport' (optional): Transport instance;
994 - 'socket' (optional): Socket instance.
995
Victor Stinneracdb7822014-07-14 18:33:40 +0200996 New keys maybe introduced in the future.
997
998 Note: do not overload this method in an event loop subclass.
999 For custom exception handling, use the
Yury Selivanov569efa22014-02-18 18:02:19 -05001000 `set_exception_handler()` method.
1001 """
1002 if self._exception_handler is None:
1003 try:
1004 self.default_exception_handler(context)
1005 except Exception:
1006 # Second protection layer for unexpected errors
1007 # in the default implementation, as well as for subclassed
1008 # event loops with overloaded "default_exception_handler".
1009 logger.error('Exception in default exception handler',
1010 exc_info=True)
1011 else:
1012 try:
1013 self._exception_handler(self, context)
1014 except Exception as exc:
1015 # Exception in the user set custom exception handler.
1016 try:
1017 # Let's try default handler.
1018 self.default_exception_handler({
1019 'message': 'Unhandled error in exception handler',
1020 'exception': exc,
1021 'context': context,
1022 })
1023 except Exception:
Victor Stinneracdb7822014-07-14 18:33:40 +02001024 # Guard 'default_exception_handler' in case it is
Yury Selivanov569efa22014-02-18 18:02:19 -05001025 # overloaded.
1026 logger.error('Exception in default exception handler '
1027 'while handling an unexpected error '
1028 'in custom exception handler',
1029 exc_info=True)
1030
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07001031 def _add_callback(self, handle):
Victor Stinneracdb7822014-07-14 18:33:40 +02001032 """Add a Handle to _scheduled (TimerHandle) or _ready."""
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07001033 assert isinstance(handle, events.Handle), 'A Handle is required here'
1034 if handle._cancelled:
1035 return
Yury Selivanov592ada92014-09-25 12:07:56 -04001036 assert not isinstance(handle, events.TimerHandle)
1037 self._ready.append(handle)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07001038
1039 def _add_callback_signalsafe(self, handle):
1040 """Like _add_callback() but called from a signal handler."""
1041 self._add_callback(handle)
1042 self._write_to_self()
1043
Yury Selivanov592ada92014-09-25 12:07:56 -04001044 def _timer_handle_cancelled(self, handle):
1045 """Notification that a TimerHandle has been cancelled."""
1046 if handle._scheduled:
1047 self._timer_cancelled_count += 1
1048
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07001049 def _run_once(self):
1050 """Run one full iteration of the event loop.
1051
1052 This calls all currently ready callbacks, polls for I/O,
1053 schedules the resulting callbacks, and finally schedules
1054 'call_later' callbacks.
1055 """
Yury Selivanov592ada92014-09-25 12:07:56 -04001056
Yury Selivanov592ada92014-09-25 12:07:56 -04001057 sched_count = len(self._scheduled)
1058 if (sched_count > _MIN_SCHEDULED_TIMER_HANDLES and
1059 self._timer_cancelled_count / sched_count >
1060 _MIN_CANCELLED_TIMER_HANDLES_FRACTION):
Victor Stinner68da8fc2014-09-30 18:08:36 +02001061 # Remove delayed calls that were cancelled if their number
1062 # is too high
1063 new_scheduled = []
Yury Selivanov592ada92014-09-25 12:07:56 -04001064 for handle in self._scheduled:
1065 if handle._cancelled:
1066 handle._scheduled = False
Victor Stinner68da8fc2014-09-30 18:08:36 +02001067 else:
1068 new_scheduled.append(handle)
Yury Selivanov592ada92014-09-25 12:07:56 -04001069
Victor Stinner68da8fc2014-09-30 18:08:36 +02001070 heapq.heapify(new_scheduled)
1071 self._scheduled = new_scheduled
Yury Selivanov592ada92014-09-25 12:07:56 -04001072 self._timer_cancelled_count = 0
Yury Selivanov592ada92014-09-25 12:07:56 -04001073 else:
1074 # Remove delayed calls that were cancelled from head of queue.
1075 while self._scheduled and self._scheduled[0]._cancelled:
1076 self._timer_cancelled_count -= 1
1077 handle = heapq.heappop(self._scheduled)
1078 handle._scheduled = False
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07001079
1080 timeout = None
1081 if self._ready:
1082 timeout = 0
1083 elif self._scheduled:
1084 # Compute the desired timeout.
1085 when = self._scheduled[0]._when
Guido van Rossum3d1bc602014-05-10 15:47:15 -07001086 timeout = max(0, when - self.time())
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07001087
Victor Stinner770e48d2014-07-11 11:58:33 +02001088 if self._debug and timeout != 0:
Victor Stinner22463aa2014-01-20 23:56:40 +01001089 t0 = self.time()
1090 event_list = self._selector.select(timeout)
Victor Stinner0e6f52a2014-06-20 17:34:15 +02001091 dt = self.time() - t0
Victor Stinner770e48d2014-07-11 11:58:33 +02001092 if dt >= 1.0:
Victor Stinner22463aa2014-01-20 23:56:40 +01001093 level = logging.INFO
1094 else:
1095 level = logging.DEBUG
Victor Stinner770e48d2014-07-11 11:58:33 +02001096 nevent = len(event_list)
1097 if timeout is None:
1098 logger.log(level, 'poll took %.3f ms: %s events',
1099 dt * 1e3, nevent)
1100 elif nevent:
1101 logger.log(level,
1102 'poll %.3f ms took %.3f ms: %s events',
1103 timeout * 1e3, dt * 1e3, nevent)
1104 elif dt >= 1.0:
1105 logger.log(level,
1106 'poll %.3f ms took %.3f ms: timeout',
1107 timeout * 1e3, dt * 1e3)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07001108 else:
Victor Stinner22463aa2014-01-20 23:56:40 +01001109 event_list = self._selector.select(timeout)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07001110 self._process_events(event_list)
1111
1112 # Handle 'later' callbacks that are ready.
Victor Stinnered1654f2014-02-10 23:42:32 +01001113 end_time = self.time() + self._clock_resolution
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07001114 while self._scheduled:
1115 handle = self._scheduled[0]
Victor Stinnered1654f2014-02-10 23:42:32 +01001116 if handle._when >= end_time:
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07001117 break
1118 handle = heapq.heappop(self._scheduled)
Yury Selivanov592ada92014-09-25 12:07:56 -04001119 handle._scheduled = False
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07001120 self._ready.append(handle)
1121
1122 # This is the only place where callbacks are actually *called*.
1123 # All other places just add them to ready.
1124 # Note: We run all currently scheduled callbacks, but not any
1125 # callbacks scheduled by callbacks run this time around --
1126 # they will be run the next time (after another I/O poll).
Victor Stinneracdb7822014-07-14 18:33:40 +02001127 # Use an idiom that is thread-safe without using locks.
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07001128 ntodo = len(self._ready)
1129 for i in range(ntodo):
1130 handle = self._ready.popleft()
Victor Stinner0e6f52a2014-06-20 17:34:15 +02001131 if handle._cancelled:
1132 continue
1133 if self._debug:
Victor Stinner9b524d52015-01-26 11:05:12 +01001134 try:
1135 self._current_handle = handle
1136 t0 = self.time()
1137 handle._run()
1138 dt = self.time() - t0
1139 if dt >= self.slow_callback_duration:
1140 logger.warning('Executing %s took %.3f seconds',
1141 _format_handle(handle), dt)
1142 finally:
1143 self._current_handle = None
Victor Stinner0e6f52a2014-06-20 17:34:15 +02001144 else:
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07001145 handle._run()
1146 handle = None # Needed to break cycles when an exception occurs.
Victor Stinner0f3e6bc2014-02-19 23:15:02 +01001147
1148 def get_debug(self):
1149 return self._debug
1150
1151 def set_debug(self, enabled):
1152 self._debug = enabled