blob: db132505a8ca2ec3fb08d81504407539d7960ffd [file] [log] [blame]
"""Base implementation of event loop.

The event loop can be broken up into a multiplexer (the part
responsible for notifying us of I/O events) and the event loop proper,
which wraps a multiplexer with functionality for scheduling callbacks,
immediately or at a given time in the future.

Whenever a public API takes a callback, subsequent positional
arguments will be passed to the callback if/when it is called.  This
avoids the proliferation of trivial lambdas implementing closures.
Keyword arguments for the callback are not supported; this is a
conscious design decision, leaving the door open for keyword arguments
to modify the meaning of the API call itself.
"""
15
16
17import collections
18import concurrent.futures
19import heapq
Victor Stinner0e6f52a2014-06-20 17:34:15 +020020import inspect
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070021import logging
Victor Stinnerb75380f2014-06-30 14:39:11 +020022import os
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070023import socket
24import subprocess
25import time
Victor Stinnerb75380f2014-06-30 14:39:11 +020026import traceback
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070027import sys
28
Victor Stinnerf951d282014-06-29 00:46:45 +020029from . import coroutines
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070030from . import events
31from . import futures
32from . import tasks
Victor Stinnerf951d282014-06-29 00:46:45 +020033from .coroutines import coroutine
Guido van Rossumfc29e0f2013-10-17 15:39:45 -070034from .log import logger
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070035
36
37__all__ = ['BaseEventLoop', 'Server']
38
39
# Worker count handed to the thread pool executor that is created on demand
# as the default executor.
_MAX_WORKERS = 5
42
43
def _format_handle(handle):
    """Return a printable description of *handle*.

    When the handle's callback is a bound method of a Task, the repr() of
    that task is returned; otherwise str(handle) is returned.
    """
    callback = handle._callback
    bound_to_task = (inspect.ismethod(callback)
                     and isinstance(callback.__self__, tasks.Task))
    if not bound_to_task:
        return str(handle)
    # format the task
    return repr(callback.__self__)
51
52
def _format_pipe(fd):
    """Return a short label for a subprocess stdio argument.

    subprocess.PIPE maps to '<pipe>', subprocess.STDOUT maps to
    '<stdout>', and anything else is rendered with repr().
    """
    if fd == subprocess.PIPE:
        return '<pipe>'
    if fd == subprocess.STDOUT:
        return '<stdout>'
    return repr(fd)
60
61
class _StopError(BaseException):
    """Raised to stop the event loop.

    Derives from BaseException rather than Exception, so handlers written
    as ``except Exception`` do not intercept it.
    """
64
65
def _check_resolved_address(sock, address):
    """Raise ValueError unless *address* is already a numeric IP address.

    Only AF_INET and AF_INET6 sockets are checked; for any other socket
    family the function returns without doing anything.
    """
    # Ensure that the address is already resolved to avoid the trap of hanging
    # the entire event loop when the address requires doing a DNS lookup.
    family = sock.family
    if family == socket.AF_INET:
        host, port = address
    elif family == socket.AF_INET6:
        # IPv6 addresses may carry extra fields; only host and port are used.
        host, port = address[:2]
    else:
        return

    # SOCK_NONBLOCK / SOCK_CLOEXEC (where the platform defines them) are
    # masked out of the socket type before it is passed to getaddrinfo().
    ignored_bits = 0
    for flag_name in ('SOCK_NONBLOCK', 'SOCK_CLOEXEC'):
        ignored_bits |= getattr(socket, flag_name, 0)

    # Use getaddrinfo(flags=AI_NUMERICHOST) to ensure that the address is
    # already resolved.
    try:
        socket.getaddrinfo(host, port,
                           family=family,
                           type=(sock.type & ~ignored_bits),
                           proto=sock.proto,
                           flags=socket.AI_NUMERICHOST)
    except socket.gaierror as err:
        raise ValueError("address must be resolved (IP address), got %r: %s"
                         % (address, err))
93
def _raise_stop_error(*args):
    """Raise _StopError; any positional arguments are ignored."""
    raise _StopError()
96
97
class Server(events.AbstractServer):
    """Server object tracking listening sockets and attached connections.

    ``sockets`` holds the listening sockets and becomes None once close()
    has been called.  ``_active_count`` is maintained by _attach() and
    _detach() (presumably called by the connection transports -- the
    callers are not visible in this module).
    """

    def __init__(self, loop, sockets):
        self._loop = loop
        self.sockets = sockets
        # Number of currently attached connections.
        self._active_count = 0
        # Futures of pending wait_closed() calls; _wakeup() replaces the
        # list with None to signal that the server is fully closed.
        self._waiters = []

    def __repr__(self):
        return '<%s sockets=%r>' % (self.__class__.__name__, self.sockets)

    def _attach(self):
        """Register one more active connection (server must be open)."""
        assert self.sockets is not None
        self._active_count += 1

    def _detach(self):
        """Unregister a connection; wake waiters if it was the last one
        and close() has already been called."""
        assert self._active_count > 0
        self._active_count -= 1
        if self._active_count == 0 and self.sockets is None:
            self._wakeup()

    def close(self):
        """Stop serving on every listening socket.

        Calling close() again is a no-op.  Waiters are woken immediately
        only when no connection is attached; otherwise the last _detach()
        wakes them.
        """
        sockets = self.sockets
        if sockets is None:
            return
        self.sockets = None
        for sock in sockets:
            self._loop._stop_serving(sock)
        if self._active_count == 0:
            self._wakeup()

    def _wakeup(self):
        """Complete every pending wait_closed() future."""
        waiters = self._waiters
        self._waiters = None
        for waiter in waiters:
            if not waiter.done():
                # The result is never read by wait_closed(); use None
                # instead of making the future reference itself.
                waiter.set_result(None)

    @coroutine
    def wait_closed(self):
        """Wait until the server is closed and all connections detached.

        Returns immediately once _wakeup() has run.  Only ``_waiters`` is
        checked: testing ``sockets is None`` as well would make this
        return right after close() even while connections are still
        attached, which defeats the wake-up performed by _detach().
        """
        if self._waiters is None:
            return
        waiter = futures.Future(loop=self._loop)
        self._waiters.append(waiter)
        yield from waiter
143
144
145class BaseEventLoop(events.AbstractEventLoop):
146
def __init__(self):
    """Set up the loop's initial bookkeeping state."""
    # Lifecycle flags.
    self._closed = False
    self._running = False

    # Callback queues: handles ready to run, and timer handles kept as a
    # heap (see call_at()).
    self._ready = collections.deque()
    self._scheduled = []

    # Executor used by run_in_executor() when none is given; created lazily.
    self._default_executor = None
    self._internal_fds = 0
    self._exception_handler = None
    self._clock_resolution = time.get_clock_info('monotonic').resolution

    # Debug mode is enabled through PYTHONASYNCIODEBUG unless the
    # interpreter was told to ignore environment variables.
    debug_requested = bool(os.environ.get('PYTHONASYNCIODEBUG'))
    self._debug = (not sys.flags.ignore_environment) and debug_requested

    # In debug mode, if the execution of a callback or a step of a task
    # exceed this duration in seconds, the slow callback/task is logged.
    self.slow_callback_duration = 0.1
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700161
def __repr__(self):
    """Return '<ClassName running=... closed=... debug=...>'."""
    fields = (self.__class__.__name__,
              self.is_running(),
              self.is_closed(),
              self.get_debug())
    return '<%s running=%s closed=%s debug=%s>' % fields
166
def create_task(self, coro):
    """Schedule a coroutine object.

    Return a task object.
    """
    new_task = tasks.Task(coro, loop=self)
    if not new_task._source_traceback:
        return new_task
    # Drop the last recorded traceback entry (presumably the frame of
    # create_task() itself -- confirm against Task).
    del new_task._source_traceback[-1]
    return new_task
Victor Stinner896a25a2014-07-08 11:29:25 +0200176
def _make_socket_transport(self, sock, protocol, waiter=None, *,
                           extra=None, server=None):
    """Create socket transport.

    Not implemented in this base class.
    """
    raise NotImplementedError()
181
def _make_ssl_transport(self, rawsock, protocol, sslcontext, waiter, *,
                        server_side=False, server_hostname=None,
                        extra=None, server=None):
    """Create SSL transport.

    Not implemented in this base class.
    """
    raise NotImplementedError()
187
def _make_datagram_transport(self, sock, protocol,
                             address=None, waiter=None, extra=None):
    """Create datagram transport.

    Not implemented in this base class.
    """
    raise NotImplementedError()
192
def _make_read_pipe_transport(self, pipe, protocol, waiter=None,
                              extra=None):
    """Create read pipe transport.

    Not implemented in this base class.
    """
    raise NotImplementedError()
197
def _make_write_pipe_transport(self, pipe, protocol, waiter=None,
                               extra=None):
    """Create write pipe transport.

    Not implemented in this base class.
    """
    raise NotImplementedError()
202
@coroutine
def _make_subprocess_transport(self, protocol, args, shell,
                               stdin, stdout, stderr, bufsize,
                               extra=None, **kwargs):
    """Create subprocess transport.

    Not implemented in this base class.
    """
    raise NotImplementedError()
209
def _write_to_self(self):
    """Wake up the event loop by writing a byte to the self-pipe.

    This may be called from a different thread.

    Implementing the self-pipe is the responsibility of the subclass;
    this base class only raises NotImplementedError.
    """
    raise NotImplementedError()
218
def _process_events(self, event_list):
    """Process selector events.

    Not implemented in this base class.
    """
    raise NotImplementedError()
222
def _check_closed(self):
    """Raise RuntimeError if the event loop has been closed."""
    if not self._closed:
        return
    raise RuntimeError('Event loop is closed')
226
def run_forever(self):
    """Run until stop() is called.

    Raises RuntimeError if the loop is closed or already running.
    """
    self._check_closed()
    if self._running:
        raise RuntimeError('Event loop is running.')
    self._running = True
    try:
        try:
            # Iterate until a callback raises _StopError.
            while True:
                self._run_once()
        except _StopError:
            pass
    finally:
        self._running = False
241
def run_until_complete(self, future):
    """Run until the Future is done.

    If the argument is a coroutine, it is wrapped in a Task.

    WARNING: It would be disastrous to call run_until_complete()
    with the same coroutine twice -- it would wrap it in two
    different Tasks and that can't be good.

    Return the Future's result, or raise its exception.  Raises
    RuntimeError if the loop is closed, or if the loop was stopped
    before the Future completed.
    """
    self._check_closed()

    new_task = not isinstance(future, futures.Future)
    # 'async' is a reserved word from Python 3.7 on, so ``tasks.async``
    # cannot be written as a plain attribute access any more.  Look the
    # wrapper up by name, preferring ensure_future() when the tasks module
    # provides it.
    wrap = getattr(tasks, 'ensure_future', None)
    if wrap is None:
        wrap = getattr(tasks, 'async')
    future = wrap(future, loop=self)
    if new_task:
        # An exception is raised if the future didn't complete, so there
        # is no need to log the "destroy pending task" message
        future._log_destroy_pending = False

    future.add_done_callback(_raise_stop_error)
    try:
        self.run_forever()
    finally:
        # Always detach the stop callback, even when run_forever() raises;
        # otherwise a later completion of this future would stop whatever
        # run of the loop happens to be in progress.
        future.remove_done_callback(_raise_stop_error)
    if not future.done():
        raise RuntimeError('Event loop stopped before Future completed.')

    return future.result()
269
def stop(self):
    """Stop running the event loop.

    The stop request is queued with call_soon(), so every callback
    scheduled before stop() is called will run.  Callbacks scheduled
    after stop() is called will not run.  However, those callbacks will
    run if run_forever is called again later.
    """
    stopper = _raise_stop_error
    self.call_soon(stopper)
278
def close(self):
    """Close the event loop.

    This clears the queues and shuts down the executor,
    but does not wait for the executor to finish.

    The event loop must not be running; RuntimeError is raised if it is.
    Closing an already closed loop does nothing.
    """
    if self._running:
        raise RuntimeError("Cannot close a running event loop")
    if self._closed:
        return
    if self._debug:
        logger.debug("Close %r", self)
    self._closed = True
    for queue in (self._ready, self._scheduled):
        queue.clear()
    executor = self._default_executor
    if executor is None:
        return
    self._default_executor = None
    executor.shutdown(wait=False)
300
def is_closed(self):
    """Return True if the event loop was closed."""
    closed = self._closed
    return closed
304
def is_running(self):
    """Return True if the event loop is running."""
    running = self._running
    return running
308
def time(self):
    """Return the time according to the event loop's clock.

    The value is a float number of seconds read from time.monotonic().
    The epoch, precision, accuracy and drift are unspecified and may
    differ per event loop.
    """
    now = time.monotonic()
    return now
317
def call_later(self, delay, callback, *args):
    """Arrange for a callback to be called at a given time.

    Return a Handle: an opaque object with a cancel() method that
    can be used to cancel the call.

    The delay can be an int or float, expressed in seconds.  It is
    always relative to the current time.

    Each callback will be called exactly once.  If two callbacks
    are scheduled for exactly the same time, it is undefined which
    will be called first.

    Any positional arguments after the callback will be passed to
    the callback when it is called.
    """
    when = self.time() + delay
    timer = self.call_at(when, callback, *args)
    if not timer._source_traceback:
        return timer
    # Drop the last recorded traceback entry.
    del timer._source_traceback[-1]
    return timer
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700338
def call_at(self, when, callback, *args):
    """Like call_later(), but uses an absolute time.

    Absolute time corresponds to the event loop's time() method.
    Raises TypeError if *callback* is a coroutine function.
    """
    if coroutines.iscoroutinefunction(callback):
        raise TypeError("coroutines cannot be used with call_at()")
    if self._debug:
        self._assert_is_current_event_loop()
    handle = events.TimerHandle(when, callback, args, self)
    if handle._source_traceback:
        # Drop the last recorded traceback entry.
        del handle._source_traceback[-1]
    # self._scheduled is kept as a heap of timer handles.
    heapq.heappush(self._scheduled, handle)
    return handle
353
def call_soon(self, callback, *args):
    """Arrange for a callback to be called as soon as possible.

    This operates as a FIFO queue: callbacks are called in the
    order in which they are registered.  Each callback will be
    called exactly once.

    Any positional arguments after the callback will be passed to
    the callback when it is called.
    """
    new_handle = self._call_soon(callback, args, check_loop=True)
    if not new_handle._source_traceback:
        return new_handle
    # Drop the last recorded traceback entry.
    del new_handle._source_traceback[-1]
    return new_handle
Victor Stinner93569c22014-03-21 10:00:52 +0100368
def _call_soon(self, callback, args, check_loop):
    """Create a Handle for *callback* and append it to the ready queue.

    When *check_loop* is true and the loop is in debug mode, verify first
    that this loop is the current event loop.  Raises TypeError if
    *callback* is a coroutine function.
    """
    if coroutines.iscoroutinefunction(callback):
        raise TypeError("coroutines cannot be used with call_soon()")
    if check_loop and self._debug:
        self._assert_is_current_event_loop()
    new_handle = events.Handle(callback, args, self)
    if new_handle._source_traceback:
        # Drop the last recorded traceback entry.
        del new_handle._source_traceback[-1]
    self._ready.append(new_handle)
    return new_handle
379
def _assert_is_current_event_loop(self):
    """Assert that this event loop is the current event loop.

    Non-thread-safe methods of this class make this assumption and will
    likely behave incorrectly when the assumption is violated.

    Should only be called when (self._debug == True).  The caller is
    responsible for checking this condition for performance reasons.

    Raises RuntimeError when events.get_event_loop() returns a different
    loop.  If get_event_loop() raises AssertionError, the check is
    silently skipped.
    """
    try:
        current = events.get_event_loop()
    except AssertionError:
        return
    if current is self:
        return
    raise RuntimeError(
        "Non-thread-safe operation invoked on an event loop other "
        "than the current one")
397
def call_soon_threadsafe(self, callback, *args):
    """Like call_soon(), but thread-safe.

    The current-loop debug check is skipped, and the loop is woken up
    through _write_to_self() once the handle has been queued.
    """
    new_handle = self._call_soon(callback, args, check_loop=False)
    if new_handle._source_traceback:
        # Drop the last recorded traceback entry.
        del new_handle._source_traceback[-1]
    self._write_to_self()
    return new_handle
405
def run_in_executor(self, executor, callback, *args):
    """Submit *callback* to an executor and return a wrapping Future.

    *callback* may also be an events.Handle (not a TimerHandle), in which
    case no extra *args* may be given; a cancelled handle yields a Future
    that is already completed with None.  When *executor* is None the
    loop's default executor is used, creating a thread pool of
    _MAX_WORKERS workers on first use.

    Raises TypeError if *callback* is a coroutine function.
    """
    if coroutines.iscoroutinefunction(callback):
        raise TypeError("Coroutines cannot be used with run_in_executor()")

    if isinstance(callback, events.Handle):
        handle = callback
        assert not args
        assert not isinstance(handle, events.TimerHandle)
        if handle._cancelled:
            finished = futures.Future(loop=self)
            finished.set_result(None)
            return finished
        # Unwrap the handle into the plain callable and its arguments.
        callback = handle._callback
        args = handle._args

    if executor is None:
        executor = self._default_executor
        if executor is None:
            executor = concurrent.futures.ThreadPoolExecutor(_MAX_WORKERS)
            self._default_executor = executor

    pending = executor.submit(callback, *args)
    return futures.wrap_future(pending, loop=self)
423
def set_default_executor(self, executor):
    """Set the executor used by run_in_executor() when none is given."""
    self._default_executor = executor
426
def _getaddrinfo_debug(self, host, port, family, type, proto, flags):
    """Call socket.getaddrinfo() and log the request and its duration.

    The result is logged at INFO level when the lookup took at least
    slow_callback_duration seconds, at DEBUG level otherwise.  Returns
    whatever socket.getaddrinfo() returned.
    """
    parts = ["%s:%r" % (host, port)]
    # Only mention the options that were actually set.
    optional = (
        ('family=%r', family),
        ('type=%r', type),
        ('proto=%r', proto),
        ('flags=%r', flags),
    )
    for template, value in optional:
        if value:
            parts.append(template % value)
    description = ', '.join(parts)
    logger.debug('Get address info %s', description)

    started = self.time()
    addrinfo = socket.getaddrinfo(host, port, family, type, proto, flags)
    elapsed = self.time() - started

    report = ('Getting address info %s took %.3f ms: %r'
              % (description, elapsed * 1e3, addrinfo))
    log = logger.info if elapsed >= self.slow_callback_duration else logger.debug
    log(report)
    return addrinfo
451
def getaddrinfo(self, host, port, *,
                family=0, type=0, proto=0, flags=0):
    """Run socket.getaddrinfo() in the default executor.

    In debug mode the logging wrapper _getaddrinfo_debug() is used
    instead.  Returns whatever run_in_executor() returns.
    """
    resolver = self._getaddrinfo_debug if self._debug else socket.getaddrinfo
    return self.run_in_executor(None, resolver,
                                host, port, family, type, proto, flags)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700460
def getnameinfo(self, sockaddr, flags=0):
    """Run socket.getnameinfo(sockaddr, flags) in the default executor."""
    lookup = socket.getnameinfo
    return self.run_in_executor(None, lookup, sockaddr, flags)
463
@coroutine
def create_connection(self, protocol_factory, host=None, port=None, *,
                      ssl=None, family=0, proto=0, flags=0, sock=None,
                      local_addr=None, server_hostname=None):
    """Connect to a TCP server.

    Create a streaming transport connection to a given Internet host and
    port: socket family AF_INET or socket.AF_INET6 depending on host (or
    family if specified), socket type SOCK_STREAM.  protocol_factory must
    be a callable returning a protocol instance.

    Either host/port or an existing *sock* must be given, not both
    (ValueError otherwise).  When *local_addr* is given, the new socket
    is bound to it before connecting.

    This method is a coroutine which will try to establish the connection
    in the background.  When successful, the coroutine returns a
    (transport, protocol) pair.
    """
    if server_hostname is not None and not ssl:
        raise ValueError('server_hostname is only meaningful with ssl')

    if server_hostname is None and ssl:
        # Use host as default for server_hostname.  It is an error
        # if host is empty or not set, e.g. when an
        # already-connected socket was passed or when only a port
        # is given.  To avoid this error, you can pass
        # server_hostname='' -- this will bypass the hostname
        # check.  (This also means that if host is a numeric
        # IP/IPv6 address, we will attempt to verify that exact
        # address; this will probably fail, but it is possible to
        # create a certificate for a specific IP address, so we
        # don't judge it here.)
        if not host:
            raise ValueError('You must set server_hostname '
                             'when using ssl without a host')
        server_hostname = host

    if host is not None or port is not None:
        if sock is not None:
            raise ValueError(
                'host/port and sock can not be specified at the same time')

        # Resolve the remote (and optionally the local) address; both
        # lookups are started before waiting on either.
        remote_lookup = self.getaddrinfo(
            host, port, family=family,
            type=socket.SOCK_STREAM, proto=proto, flags=flags)
        lookups = [remote_lookup]
        local_lookup = None
        if local_addr is not None:
            local_lookup = self.getaddrinfo(
                *local_addr, family=family,
                type=socket.SOCK_STREAM, proto=proto, flags=flags)
            lookups.append(local_lookup)

        yield from tasks.wait(lookups, loop=self)

        remote_infos = remote_lookup.result()
        if not remote_infos:
            raise OSError('getaddrinfo() returned empty list')
        if local_lookup is not None:
            local_infos = local_lookup.result()
            if not local_infos:
                raise OSError('getaddrinfo() returned empty list')

        # Try each resolved remote address in turn until one connects.
        errors = []
        for af, socktype, protonum, _cname, address in remote_infos:
            try:
                sock = socket.socket(family=af, type=socktype,
                                     proto=protonum)
                sock.setblocking(False)
                if local_lookup is not None:
                    for _, _, _, _, laddr in local_infos:
                        try:
                            sock.bind(laddr)
                            break
                        except OSError as exc:
                            errors.append(OSError(
                                exc.errno, 'error while '
                                'attempting to bind on address '
                                '{!r}: {}'.format(
                                    laddr, exc.strerror.lower())))
                    else:
                        # No local address could be bound: give up on
                        # this remote address.
                        sock.close()
                        sock = None
                        continue
                if self._debug:
                    logger.debug("connect %r to %r", sock, address)
                yield from self.sock_connect(sock, address)
            except OSError as exc:
                if sock is not None:
                    sock.close()
                errors.append(exc)
            except:
                if sock is not None:
                    sock.close()
                raise
            else:
                break
        else:
            # Every candidate failed.
            first = errors[0]
            if len(errors) == 1:
                raise first
            # If they all have the same str(), raise one.
            first_text = str(first)
            if all(str(exc) == first_text for exc in errors):
                raise first
            # Raise a combined exception so the user can see all
            # the various error messages.
            raise OSError('Multiple exceptions: {}'.format(
                ', '.join(str(exc) for exc in errors)))

    elif sock is None:
        raise ValueError(
            'host and port was not specified and no sock specified')

    sock.setblocking(False)

    transport, protocol = yield from self._create_connection_transport(
        sock, protocol_factory, ssl, server_hostname)
    if self._debug:
        # Get the socket from the transport because SSL transport closes
        # the old socket and creates a new SSL socket
        sock = transport.get_extra_info('socket')
        logger.debug("%r connected to %s:%r: (%r, %r)",
                     sock, host, port, transport, protocol)
    return transport, protocol
587
@coroutine
def _create_connection_transport(self, sock, protocol_factory, ssl,
                                 server_hostname):
    """Wrap a connected socket in a (possibly SSL) transport.

    A protocol is created with protocol_factory(); the coroutine then
    waits on the waiter future handed to the transport and returns the
    (transport, protocol) pair.
    """
    protocol = protocol_factory()
    waiter = futures.Future(loop=self)
    if not ssl:
        transport = self._make_socket_transport(sock, protocol, waiter)
    else:
        # A bool means "use SSL without a caller-supplied context";
        # anything else is passed through as the context.
        sslcontext = None if isinstance(ssl, bool) else ssl
        transport = self._make_ssl_transport(
            sock, protocol, sslcontext, waiter,
            server_side=False, server_hostname=server_hostname)

    yield from waiter
    return transport, protocol
603
@coroutine
def create_datagram_endpoint(self, protocol_factory,
                             local_addr=None, remote_addr=None, *,
                             family=0, proto=0, flags=0):
    """Create datagram connection.

    *local_addr* and *remote_addr* are optional (host, port) 2-tuples.
    When neither is given, *family* must be set (ValueError otherwise).
    Returns a (transport, protocol) pair.
    """
    if not (local_addr or remote_addr):
        if family == 0:
            raise ValueError('unexpected address family')
        addr_pairs_info = (((family, proto), (None, None)),)
    else:
        # join address by (family, protocol)
        addr_infos = collections.OrderedDict()
        for idx, addr in enumerate((local_addr, remote_addr)):
            if addr is None:
                continue
            assert isinstance(addr, tuple) and len(addr) == 2, (
                '2-tuple is expected')

            infos = yield from self.getaddrinfo(
                *addr, family=family, type=socket.SOCK_DGRAM,
                proto=proto, flags=flags)
            if not infos:
                raise OSError('getaddrinfo() returned empty list')

            # Slot 0 holds the local address, slot 1 the remote one.
            for fam, _, pro, _, address in infos:
                addr_infos.setdefault((fam, pro), [None, None])[idx] = address

        # each addr has to have info for each (family, proto) pair
        addr_pairs_info = [
            (key, addr_pair) for key, addr_pair in addr_infos.items()
            if (not local_addr or addr_pair[0] is not None) and
               (not remote_addr or addr_pair[1] is not None)]

        if not addr_pairs_info:
            raise ValueError('can not get address information')

    errors = []

    # Try each (family, proto) candidate until a socket can be set up.
    for (fam, pro), (local_address, remote_address) in addr_pairs_info:
        sock = None
        r_addr = None
        try:
            sock = socket.socket(
                family=fam, type=socket.SOCK_DGRAM, proto=pro)
            sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
            sock.setblocking(False)

            if local_addr:
                sock.bind(local_address)
            if remote_addr:
                yield from self.sock_connect(sock, remote_address)
                r_addr = remote_address
        except OSError as exc:
            if sock is not None:
                sock.close()
            errors.append(exc)
        except:
            if sock is not None:
                sock.close()
            raise
        else:
            break
    else:
        # Every candidate failed: report the first error.
        raise errors[0]

    protocol = protocol_factory()
    waiter = futures.Future(loop=self)
    transport = self._make_datagram_transport(sock, protocol, r_addr,
                                              waiter)
    if self._debug:
        if local_addr:
            logger.info("Datagram endpoint local_addr=%r remote_addr=%r "
                        "created: (%r, %r)",
                        local_addr, remote_addr, transport, protocol)
        else:
            logger.debug("Datagram endpoint remote_addr=%r created: "
                         "(%r, %r)",
                         remote_addr, transport, protocol)
    yield from waiter
    return transport, protocol
687
@coroutine
def create_server(self, protocol_factory, host=None, port=None,
                  *,
                  family=socket.AF_UNSPEC,
                  flags=socket.AI_PASSIVE,
                  sock=None,
                  backlog=100,
                  ssl=None,
                  reuse_address=None):
    """Create a TCP server bound to host and port.

    Either host/port or an existing *sock* must be given, not both
    (ValueError otherwise).  *ssl* must be an SSLContext or None; a bool
    raises TypeError.

    Return a Server object which can be used to stop the service.

    This method is a coroutine.
    """
    if isinstance(ssl, bool):
        raise TypeError('ssl argument must be an SSLContext or None')

    if host is None and port is None:
        if sock is None:
            raise ValueError('Neither host/port nor sock were specified')
        sockets = [sock]
    else:
        if sock is not None:
            raise ValueError(
                'host/port and sock can not be specified at the same time')

        AF_INET6 = getattr(socket, 'AF_INET6', 0)
        if reuse_address is None:
            reuse_address = os.name == 'posix' and sys.platform != 'cygwin'
        sockets = []
        if host == '':
            host = None

        infos = yield from self.getaddrinfo(
            host, port, family=family,
            type=socket.SOCK_STREAM, proto=0, flags=flags)
        if not infos:
            raise OSError('getaddrinfo() returned empty list')

        # If anything fails before every address is bound, close all the
        # sockets created so far.
        all_bound = False
        try:
            for af, socktype, proto, canonname, sa in infos:
                try:
                    sock = socket.socket(af, socktype, proto)
                except socket.error:
                    # Assume it's a bad family/type/protocol combination.
                    if self._debug:
                        logger.warning('create_server() failed to create '
                                       'socket.socket(%r, %r, %r)',
                                       af, socktype, proto, exc_info=True)
                    continue
                sockets.append(sock)
                if reuse_address:
                    sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR,
                                    True)
                # Disable IPv4/IPv6 dual stack support (enabled by
                # default on Linux) which makes a single socket
                # listen on both address families.
                if af == AF_INET6 and hasattr(socket, 'IPPROTO_IPV6'):
                    sock.setsockopt(socket.IPPROTO_IPV6,
                                    socket.IPV6_V6ONLY,
                                    True)
                try:
                    sock.bind(sa)
                except OSError as err:
                    raise OSError(err.errno, 'error while attempting '
                                  'to bind on address %r: %s'
                                  % (sa, err.strerror.lower()))
            all_bound = True
        finally:
            if not all_bound:
                for sock in sockets:
                    sock.close()

    server = Server(self, sockets)
    for listener in sockets:
        listener.listen(backlog)
        listener.setblocking(False)
        self._start_serving(protocol_factory, listener, ssl, server)
    if self._debug:
        logger.info("%r is serving", server)
    return server
771
@coroutine
def connect_read_pipe(self, protocol_factory, pipe):
    """Create a read pipe transport for pipe and wait until it is ready.

    protocol_factory is called without arguments to build the protocol.
    The protocol, the pipe and a fresh waiter future are handed to
    self._make_read_pipe_transport(); the coroutine then waits on that
    future.

    This method is a coroutine.  Return a (transport, protocol) pair.
    Any exception raised while waiting is propagated to the caller.
    """
    protocol = protocol_factory()
    waiter = futures.Future(loop=self)
    transport = self._make_read_pipe_transport(pipe, protocol, waiter)
    try:
        yield from waiter
    except BaseException:
        # The setup failed or was cancelled: close the transport instead
        # of leaking it, then let the caller see the original error.
        transport.close()
        raise
    if self._debug:
        logger.debug('Read pipe %r connected: (%r, %r)',
                     pipe.fileno(), transport, protocol)
    return transport, protocol
782
@coroutine
def connect_write_pipe(self, protocol_factory, pipe):
    """Create a write pipe transport for pipe and wait until it is ready.

    protocol_factory is called without arguments to build the protocol.
    The protocol, the pipe and a fresh waiter future are handed to
    self._make_write_pipe_transport(); the coroutine then waits on that
    future.

    This method is a coroutine.  Return a (transport, protocol) pair.
    Any exception raised while waiting is propagated to the caller.
    """
    protocol = protocol_factory()
    waiter = futures.Future(loop=self)
    transport = self._make_write_pipe_transport(pipe, protocol, waiter)
    try:
        yield from waiter
    except BaseException:
        # The setup failed or was cancelled: close the transport instead
        # of leaking it, then let the caller see the original error.
        transport.close()
        raise
    if self._debug:
        logger.debug('Write pipe %r connected: (%r, %r)',
                     pipe.fileno(), transport, protocol)
    return transport, protocol
793
Victor Stinneracdb7822014-07-14 18:33:40 +0200794 def _log_subprocess(self, msg, stdin, stdout, stderr):
795 info = [msg]
796 if stdin is not None:
797 info.append('stdin=%s' % _format_pipe(stdin))
798 if stdout is not None and stderr == subprocess.STDOUT:
799 info.append('stdout=stderr=%s' % _format_pipe(stdout))
800 else:
801 if stdout is not None:
802 info.append('stdout=%s' % _format_pipe(stdout))
803 if stderr is not None:
804 info.append('stderr=%s' % _format_pipe(stderr))
805 logger.debug(' '.join(info))
806
@coroutine
def subprocess_shell(self, protocol_factory, cmd, *, stdin=subprocess.PIPE,
                     stdout=subprocess.PIPE, stderr=subprocess.PIPE,
                     universal_newlines=False, shell=True, bufsize=0,
                     **kwargs):
    """Run the shell command cmd and return a (transport, protocol) pair.

    cmd must be a bytes or str object.  universal_newlines must be
    False, shell must be True and bufsize must be 0; any other value
    raises ValueError.  Remaining keyword arguments are forwarded to
    self._make_subprocess_transport().

    This method is a coroutine.
    """
    if not isinstance(cmd, (bytes, str)):
        raise ValueError("cmd must be a string")
    if universal_newlines:
        raise ValueError("universal_newlines must be False")
    if not shell:
        raise ValueError("shell must be True")
    if bufsize != 0:
        raise ValueError("bufsize must be 0")
    protocol = protocol_factory()
    # Always bind debug_log: debug mode may be switched on while the
    # transport is being created, and the check after the yield must not
    # hit an unbound local in that case.
    debug_log = None
    if self._debug:
        # don't log parameters: they may contain sensitive information
        # (password) and may be too long
        debug_log = 'run shell command %r' % cmd
        self._log_subprocess(debug_log, stdin, stdout, stderr)
    transport = yield from self._make_subprocess_transport(
        protocol, cmd, True, stdin, stdout, stderr, bufsize, **kwargs)
    if self._debug and debug_log is not None:
        logger.info('%s: %r', debug_log, transport)
    return transport, protocol
831
@coroutine
def subprocess_exec(self, protocol_factory, program, *args,
                    stdin=subprocess.PIPE, stdout=subprocess.PIPE,
                    stderr=subprocess.PIPE, universal_newlines=False,
                    shell=False, bufsize=0, **kwargs):
    """Execute program with args and return a (transport, protocol) pair.

    program and every item of args must be a str or bytes object,
    otherwise TypeError is raised.  universal_newlines must be False,
    shell must be False and bufsize must be 0; any other value raises
    ValueError.  Remaining keyword arguments are forwarded to
    self._make_subprocess_transport().

    This method is a coroutine.
    """
    if universal_newlines:
        raise ValueError("universal_newlines must be False")
    if shell:
        raise ValueError("shell must be False")
    if bufsize != 0:
        raise ValueError("bufsize must be 0")
    popen_args = (program,) + args
    for arg in popen_args:
        if not isinstance(arg, (str, bytes)):
            raise TypeError("program arguments must be "
                            "a bytes or text string, not %s"
                            % type(arg).__name__)
    protocol = protocol_factory()
    # Always bind debug_log: debug mode may be switched on while the
    # transport is being created, and the check after the yield must not
    # hit an unbound local in that case.
    debug_log = None
    if self._debug:
        # don't log parameters: they may contain sensitive information
        # (password) and may be too long
        debug_log = 'execute program %r' % program
        self._log_subprocess(debug_log, stdin, stdout, stderr)
    transport = yield from self._make_subprocess_transport(
        protocol, popen_args, False, stdin, stdout, stderr,
        bufsize, **kwargs)
    if self._debug and debug_log is not None:
        logger.info('%s: %r', debug_log, transport)
    return transport, protocol
861
def set_exception_handler(self, handler):
    """Install handler as the event loop exception handler.

    Passing None stores None, which makes call_exception_handler()
    use the default exception handler.

    Otherwise handler must be callable and is invoked as
    handler(loop, context): 'loop' is the event loop and 'context'
    is a dict (see `call_exception_handler()` for its keys).

    Raise TypeError if handler is neither None nor callable.
    """
    acceptable = handler is None or callable(handler)
    if not acceptable:
        raise TypeError('A callable object or None is expected, '
                        'got {!r}'.format(handler))
    self._exception_handler = handler
878
def default_exception_handler(self, context):
    """Log the error described by context through the module logger.

    This runs when an exception occurs and no custom handler is set;
    a custom handler may also call it to fall back to the default
    behavior.

    context has the same meaning as in `call_exception_handler()`.
    The 'message' entry (or a generic text when it is missing or
    empty) forms the first log line; every other entry except
    'exception' is appended as a 'key: value' line in sorted key
    order.  The 'exception' entry, if any, is passed as exc_info.
    """
    message = context.get('message') or 'Unhandled exception in event loop'

    exception = context.get('exception')
    if exception is None:
        exc_info = False
    else:
        exc_info = (type(exception), exception, exception.__traceback__)

    lines = [message]
    for key in sorted(context):
        if key in ('message', 'exception'):
            # Already reported through the first line and exc_info.
            continue
        if key == 'source_traceback':
            formatted = ''.join(traceback.format_list(context[key]))
            text = 'Object created at (most recent call last):\n'
            text += formatted.rstrip()
        else:
            text = repr(context[key])
        lines.append('{}: {}'.format(key, text))

    logger.error('\n'.join(lines), exc_info=exc_info)
913
def call_exception_handler(self, context):
    """Dispatch context to the event loop's current exception handler.

    context is a dict that contains the following keys:

    - 'message': Error message;
    - 'exception' (optional): Exception object;
    - 'future' (optional): Future instance;
    - 'handle' (optional): Handle instance;
    - 'protocol' (optional): Protocol instance;
    - 'transport' (optional): Transport instance;
    - 'socket' (optional): Socket instance.

    New keys may be introduced in the future.

    Note: do not overload this method in an event loop subclass.
    For custom exception handling, use the
    `set_exception_handler()` method.
    """
    handler = self._exception_handler
    if handler is None:
        try:
            self.default_exception_handler(context)
        except Exception:
            # Safety net: the default implementation (possibly
            # overloaded by a subclass) failed unexpectedly.
            logger.error('Exception in default exception handler',
                         exc_info=True)
        return

    try:
        handler(self, context)
    except Exception as exc:
        # The user supplied handler failed; report that failure
        # through the default handler instead.
        fallback_context = {
            'message': 'Unhandled error in exception handler',
            'exception': exc,
            'context': context,
        }
        try:
            self.default_exception_handler(fallback_context)
        except Exception:
            # 'default_exception_handler' may be overloaded and
            # may fail as well; log it as a last resort.
            logger.error('Exception in default exception handler '
                         'while handling an unexpected error '
                         'in custom exception handler',
                         exc_info=True)
961
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700962 def _add_callback(self, handle):
Victor Stinneracdb7822014-07-14 18:33:40 +0200963 """Add a Handle to _scheduled (TimerHandle) or _ready."""
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700964 assert isinstance(handle, events.Handle), 'A Handle is required here'
965 if handle._cancelled:
966 return
967 if isinstance(handle, events.TimerHandle):
968 heapq.heappush(self._scheduled, handle)
969 else:
970 self._ready.append(handle)
971
972 def _add_callback_signalsafe(self, handle):
973 """Like _add_callback() but called from a signal handler."""
974 self._add_callback(handle)
975 self._write_to_self()
976
977 def _run_once(self):
978 """Run one full iteration of the event loop.
979
980 This calls all currently ready callbacks, polls for I/O,
981 schedules the resulting callbacks, and finally schedules
982 'call_later' callbacks.
983 """
984 # Remove delayed calls that were cancelled from head of queue.
985 while self._scheduled and self._scheduled[0]._cancelled:
986 heapq.heappop(self._scheduled)
987
988 timeout = None
989 if self._ready:
990 timeout = 0
991 elif self._scheduled:
992 # Compute the desired timeout.
993 when = self._scheduled[0]._when
Guido van Rossum3d1bc602014-05-10 15:47:15 -0700994 timeout = max(0, when - self.time())
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700995
Victor Stinner770e48d2014-07-11 11:58:33 +0200996 if self._debug and timeout != 0:
Victor Stinner22463aa2014-01-20 23:56:40 +0100997 t0 = self.time()
998 event_list = self._selector.select(timeout)
Victor Stinner0e6f52a2014-06-20 17:34:15 +0200999 dt = self.time() - t0
Victor Stinner770e48d2014-07-11 11:58:33 +02001000 if dt >= 1.0:
Victor Stinner22463aa2014-01-20 23:56:40 +01001001 level = logging.INFO
1002 else:
1003 level = logging.DEBUG
Victor Stinner770e48d2014-07-11 11:58:33 +02001004 nevent = len(event_list)
1005 if timeout is None:
1006 logger.log(level, 'poll took %.3f ms: %s events',
1007 dt * 1e3, nevent)
1008 elif nevent:
1009 logger.log(level,
1010 'poll %.3f ms took %.3f ms: %s events',
1011 timeout * 1e3, dt * 1e3, nevent)
1012 elif dt >= 1.0:
1013 logger.log(level,
1014 'poll %.3f ms took %.3f ms: timeout',
1015 timeout * 1e3, dt * 1e3)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07001016 else:
Victor Stinner22463aa2014-01-20 23:56:40 +01001017 event_list = self._selector.select(timeout)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07001018 self._process_events(event_list)
1019
1020 # Handle 'later' callbacks that are ready.
Victor Stinnered1654f2014-02-10 23:42:32 +01001021 end_time = self.time() + self._clock_resolution
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07001022 while self._scheduled:
1023 handle = self._scheduled[0]
Victor Stinnered1654f2014-02-10 23:42:32 +01001024 if handle._when >= end_time:
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07001025 break
1026 handle = heapq.heappop(self._scheduled)
1027 self._ready.append(handle)
1028
1029 # This is the only place where callbacks are actually *called*.
1030 # All other places just add them to ready.
1031 # Note: We run all currently scheduled callbacks, but not any
1032 # callbacks scheduled by callbacks run this time around --
1033 # they will be run the next time (after another I/O poll).
Victor Stinneracdb7822014-07-14 18:33:40 +02001034 # Use an idiom that is thread-safe without using locks.
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07001035 ntodo = len(self._ready)
1036 for i in range(ntodo):
1037 handle = self._ready.popleft()
Victor Stinner0e6f52a2014-06-20 17:34:15 +02001038 if handle._cancelled:
1039 continue
1040 if self._debug:
1041 t0 = self.time()
1042 handle._run()
1043 dt = self.time() - t0
1044 if dt >= self.slow_callback_duration:
1045 logger.warning('Executing %s took %.3f seconds',
1046 _format_handle(handle), dt)
1047 else:
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07001048 handle._run()
1049 handle = None # Needed to break cycles when an exception occurs.
Victor Stinner0f3e6bc2014-02-19 23:15:02 +01001050
def get_debug(self):
    """Return the debug flag stored in self._debug."""
    debug = self._debug
    return debug
1053
def set_debug(self, enabled):
    """Store enabled, unchanged, as the debug flag in self._debug."""
    setattr(self, '_debug', enabled)