blob: d0a337bdf5199f8d8ff099136d0322462600fad9 [file] [log] [blame]
"""Base implementation of event loop.

The event loop can be broken up into a multiplexer (the part
responsible for notifying us of I/O events) and the event loop proper,
which wraps a multiplexer with functionality for scheduling callbacks,
immediately or at a given time in the future.

Whenever a public API takes a callback, subsequent positional
arguments will be passed to the callback if/when it is called.  This
avoids the proliferation of trivial lambdas implementing closures.
Keyword arguments for the callback are not supported; this is a
conscious design decision, leaving the door open for keyword arguments
to modify the meaning of the API call itself.
"""
15
16
17import collections
18import concurrent.futures
19import heapq
Victor Stinner0e6f52a2014-06-20 17:34:15 +020020import inspect
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070021import logging
Victor Stinnerb75380f2014-06-30 14:39:11 +020022import os
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070023import socket
24import subprocess
25import time
Victor Stinnerb75380f2014-06-30 14:39:11 +020026import traceback
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070027import sys
28
Victor Stinnerf951d282014-06-29 00:46:45 +020029from . import coroutines
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070030from . import events
31from . import futures
32from . import tasks
Victor Stinnerf951d282014-06-29 00:46:45 +020033from .coroutines import coroutine
Guido van Rossumfc29e0f2013-10-17 15:39:45 -070034from .log import logger
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070035
36
37__all__ = ['BaseEventLoop', 'Server']
38
39
# Worker count passed to ThreadPoolExecutor when the default thread pool
# executor is created.
_MAX_WORKERS = 5
42
43
def _format_handle(handle):
    """Return a printable description of *handle*.

    When the handle's callback is a bound method of a Task, the repr()
    of that task is returned; otherwise str(handle) is returned.
    """
    callback = handle._callback
    bound_to_task = (inspect.ismethod(callback)
                     and isinstance(callback.__self__, tasks.Task))
    if bound_to_task:
        # Describe the task rather than the handle wrapping its method.
        return repr(callback.__self__)
    return str(handle)
51
52
def _format_pipe(fd):
    """Return a short label for a subprocess stream argument.

    subprocess.PIPE maps to '<pipe>', subprocess.STDOUT maps to
    '<stdout>', and anything else is rendered with repr().
    """
    if fd == subprocess.PIPE:
        return '<pipe>'
    if fd == subprocess.STDOUT:
        return '<stdout>'
    return repr(fd)
60
61
class _StopError(BaseException):
    """Signal raised to stop the event loop.

    Derives from BaseException, so ``except Exception`` handlers do not
    catch it.
    """
64
65
def _check_resolved_address(sock, address):
    """Verify that *address* is a numeric IP address for *sock*.

    The check exists to avoid the trap of hanging the entire event loop
    when the address would require a DNS lookup.  Only AF_INET and
    AF_INET6 sockets are checked; any other family returns immediately.

    Raises ValueError when getaddrinfo() rejects the host under
    AI_NUMERICHOST.
    """
    family = sock.family
    if family == socket.AF_INET:
        host, port = address
    elif family == socket.AF_INET6:
        # IPv6 addresses may carry extra fields; only host/port matter.
        host, port = address[:2]
    else:
        return

    # Strip the SOCK_NONBLOCK / SOCK_CLOEXEC bits (where the platform
    # defines them) from the socket type before handing it to
    # getaddrinfo().
    type_mask = 0
    for flag_name in ('SOCK_NONBLOCK', 'SOCK_CLOEXEC'):
        type_mask |= getattr(socket, flag_name, 0)
    plain_type = sock.type & ~type_mask

    # getaddrinfo(flags=AI_NUMERICHOST) fails unless the host is already
    # a numeric address, which is exactly the property being verified.
    try:
        socket.getaddrinfo(host, port,
                           family=family,
                           type=plain_type,
                           proto=sock.proto,
                           flags=socket.AI_NUMERICHOST)
    except socket.gaierror as err:
        raise ValueError("address must be resolved (IP address), got %r: %s"
                         % (address, err))
93
def _raise_stop_error(*args):
    """Raise _StopError, ignoring any arguments passed in."""
    raise _StopError()
96
97
class Server(events.AbstractServer):
    """Server object holding a list of sockets served by an event loop.

    ``sockets`` is set to None once close() has been called.  An active
    count is maintained through _attach()/_detach(); waiters registered
    by wait_closed() are woken when the server is closed and that count
    is zero.
    """

    def __init__(self, loop, sockets):
        self._loop = loop
        self.sockets = sockets
        # Incremented by _attach() and decremented by _detach().
        self._active_count = 0
        # Futures created by wait_closed(); replaced by None in _wakeup().
        self._waiters = []

    def __repr__(self):
        return '<%s sockets=%r>' % (self.__class__.__name__, self.sockets)

    def _attach(self):
        """Increment the active count; the server must not be closed."""
        assert self.sockets is not None
        self._active_count += 1

    def _detach(self):
        """Decrement the active count, waking waiters when appropriate."""
        assert self._active_count > 0
        self._active_count -= 1
        if self._active_count == 0 and self.sockets is None:
            # Already closed and nothing active any more.
            self._wakeup()

    def close(self):
        """Stop serving on every socket; a second call does nothing."""
        listening = self.sockets
        if listening is None:
            return
        self.sockets = None
        for sock in listening:
            self._loop._stop_serving(sock)
        if self._active_count == 0:
            self._wakeup()

    def _wakeup(self):
        """Complete every pending waiter and drop the waiter list."""
        pending = self._waiters
        self._waiters = None
        for waiter in pending:
            if not waiter.done():
                waiter.set_result(waiter)

    @coroutine
    def wait_closed(self):
        """Coroutine that waits on a future completed by _wakeup().

        Returns immediately when ``sockets`` is None or when the waiters
        have already been woken.
        """
        if self.sockets is None or self._waiters is None:
            return
        waiter = futures.Future(loop=self._loop)
        self._waiters.append(waiter)
        yield from waiter
143
144
145class BaseEventLoop(events.AbstractEventLoop):
146
def __init__(self):
    """Initialise the loop state: not closed, not running, empty queues."""
    self._closed = False
    # Handles queued by _call_soon().
    self._ready = collections.deque()
    # Heap of timer handles pushed by call_at().
    self._scheduled = []
    self._default_executor = None
    self._internal_fds = 0
    self._running = False
    self._clock_resolution = time.get_clock_info('monotonic').resolution
    self._exception_handler = None
    # Debug mode is switched on by a non-empty PYTHONASYNCIODEBUG, unless
    # the interpreter was told to ignore the environment.
    env_requests_debug = bool(os.environ.get('PYTHONASYNCIODEBUG'))
    self._debug = (not sys.flags.ignore_environment
                   and env_requests_debug)
    # In debug mode, if the execution of a callback or a step of a task
    # exceeds this duration in seconds, the slow callback/task is logged.
    self.slow_callback_duration = 0.1
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700161
def __repr__(self):
    """Return '<ClassName running=... closed=... debug=...>'."""
    cls_name = self.__class__.__name__
    running = self.is_running()
    closed = self.is_closed()
    debug = self.get_debug()
    return ('<%s running=%s closed=%s debug=%s>'
            % (cls_name, running, closed, debug))
166
def create_task(self, coro):
    """Schedule the coroutine object *coro* on this loop.

    Return the Task object wrapping it.
    """
    new_task = tasks.Task(coro, loop=self)
    if new_task._source_traceback:
        # Drop the last recorded entry -- presumably the frame of this
        # wrapper, so the traceback ends at the caller (confirm in Task).
        del new_task._source_traceback[-1]
    return new_task
Victor Stinner896a25a2014-07-08 11:29:25 +0200176
def _make_socket_transport(self, sock, protocol, waiter=None, *,
                           extra=None, server=None):
    """Create a socket transport.

    Not implemented here: this base version always raises
    NotImplementedError.
    """
    raise NotImplementedError
181
def _make_ssl_transport(self, rawsock, protocol, sslcontext, waiter, *,
                        server_side=False, server_hostname=None,
                        extra=None, server=None):
    """Create an SSL transport.

    Not implemented here: this base version always raises
    NotImplementedError.
    """
    raise NotImplementedError
187
def _make_datagram_transport(self, sock, protocol,
                             address=None, waiter=None, extra=None):
    """Create a datagram transport.

    Not implemented here: this base version always raises
    NotImplementedError.
    """
    raise NotImplementedError
192
def _make_read_pipe_transport(self, pipe, protocol, waiter=None,
                              extra=None):
    """Create a read pipe transport.

    Not implemented here: this base version always raises
    NotImplementedError.
    """
    raise NotImplementedError
197
def _make_write_pipe_transport(self, pipe, protocol, waiter=None,
                               extra=None):
    """Create a write pipe transport.

    Not implemented here: this base version always raises
    NotImplementedError.
    """
    raise NotImplementedError
202
@coroutine
def _make_subprocess_transport(self, protocol, args, shell,
                               stdin, stdout, stderr, bufsize,
                               extra=None, **kwargs):
    """Create a subprocess transport (coroutine).

    Not implemented here: this base version raises NotImplementedError.
    """
    raise NotImplementedError
209
def _write_to_self(self):
    """Wake up the event loop by writing a byte to the self-pipe.

    May be called from a different thread.  Implementing the self-pipe
    is the responsibility of the subclass; this base version raises
    NotImplementedError.
    """
    raise NotImplementedError
218
def _process_events(self, event_list):
    """Process selector events.

    Not implemented here: this base version always raises
    NotImplementedError.
    """
    raise NotImplementedError
222
def _check_closed(self):
    """Raise RuntimeError if the event loop has been closed."""
    if not self._closed:
        return
    raise RuntimeError('Event loop is closed')
226
def run_forever(self):
    """Run the loop until stop() is called.

    Raises RuntimeError when the loop is closed or is already running.
    """
    self._check_closed()
    if self._running:
        raise RuntimeError('Event loop is running.')
    self._running = True
    try:
        stopped = False
        while not stopped:
            try:
                self._run_once()
            except _StopError:
                # Raised by _raise_stop_error(); ends the loop quietly.
                stopped = True
    finally:
        # Reset the flag even when an iteration raised something else.
        self._running = False
241
def run_until_complete(self, future):
    """Run until the Future is done.

    If the argument is a coroutine, it is wrapped in a Task.

    WARNING: It would be disastrous to call run_until_complete()
    with the same coroutine twice -- it would wrap it in two
    different Tasks and that can't be good.

    Return the Future's result, or raise its exception.

    Raises RuntimeError if the loop is closed, is already running, or
    stopped before the Future completed.
    """
    self._check_closed()

    new_task = not isinstance(future, futures.Future)
    # ``async`` is a reserved word from Python 3.7 on, so ``tasks.async``
    # no longer parses there.  Looking the same helper up by name keeps
    # the behaviour identical while letting this module be imported by
    # any interpreter.
    future = getattr(tasks, 'async')(future, loop=self)
    if new_task:
        # An exception is raised if the future didn't complete, so there
        # is no need to log the "destroy pending task" message
        future._log_destroy_pending = False

    future.add_done_callback(_raise_stop_error)
    try:
        self.run_forever()
    finally:
        # Detach the stop callback even when run_forever() raises.
        # Otherwise a later completion of this future would raise
        # _StopError inside whatever run happens to be active then.
        future.remove_done_callback(_raise_stop_error)
    if not future.done():
        raise RuntimeError('Event loop stopped before Future completed.')

    return future.result()
269
def stop(self):
    """Stop running the event loop.

    The stop request is queued with call_soon(), so every callback
    scheduled before stop() is called will run.  Callbacks scheduled
    after stop() is called will not run; they will run if run_forever()
    is called again later.
    """
    stopper = _raise_stop_error
    self.call_soon(stopper)
278
def close(self):
    """Close the event loop.

    Clears the ready and scheduled queues and shuts down the default
    executor without waiting for it to finish.  Closing an already
    closed loop does nothing.

    The event loop must not be running; RuntimeError is raised if it is.
    """
    if self._running:
        raise RuntimeError("Cannot close a running event loop")
    if self._closed:
        return
    if self._debug:
        logger.debug("Close %r", self)
    self._closed = True
    self._ready.clear()
    self._scheduled.clear()
    executor = self._default_executor
    if executor is None:
        return
    self._default_executor = None
    # wait=False: do not block on work still running in the executor.
    executor.shutdown(wait=False)
300
def is_closed(self):
    """Return True if the event loop was closed."""
    closed = self._closed
    return closed
304
def is_running(self):
    """Return True if the event loop is running."""
    running = self._running
    return running
308
def time(self):
    """Return the current time according to the event loop's clock.

    The value is a float in seconds since an epoch; the epoch,
    precision, accuracy and drift are unspecified and may differ per
    event loop.  This implementation reads time.monotonic().
    """
    now = time.monotonic()
    return now
317
def call_later(self, delay, callback, *args):
    """Arrange for a callback to be called after *delay* seconds.

    Return a Handle: an opaque object with a cancel() method that can
    be used to cancel the call.

    The delay can be an int or float, expressed in seconds.  It is
    always relative to the current time.

    Each callback will be called exactly once.  If two callbacks are
    scheduled for exactly the same time, it is undefined which will be
    called first.

    Any positional arguments after the callback will be passed to the
    callback when it is called.
    """
    deadline = self.time() + delay
    timer = self.call_at(deadline, callback, *args)
    if timer._source_traceback:
        # Drop the last recorded entry of the handle's source traceback.
        del timer._source_traceback[-1]
    return timer
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700338
def call_at(self, when, callback, *args):
    """Like call_later(), but uses an absolute time.

    Absolute time corresponds to the event loop's time() method.

    Raises TypeError when *callback* is a coroutine function.
    """
    if coroutines.iscoroutinefunction(callback):
        raise TypeError("coroutines cannot be used with call_at()")
    if self._debug:
        self._assert_is_current_event_loop()
    timer = events.TimerHandle(when, callback, args, self)
    if timer._source_traceback:
        # Drop the last recorded entry of the handle's source traceback.
        del timer._source_traceback[-1]
    # _scheduled is kept as a heap of timer handles.
    heapq.heappush(self._scheduled, timer)
    return timer
353
def call_soon(self, callback, *args):
    """Arrange for a callback to be called as soon as possible.

    This operates as a FIFO queue: callbacks are called in the order in
    which they are registered.  Each callback will be called exactly
    once.

    Any positional arguments after the callback will be passed to the
    callback when it is called.
    """
    handle = self._call_soon(callback, args, check_loop=True)
    if handle._source_traceback:
        # Drop the last recorded entry of the handle's source traceback.
        del handle._source_traceback[-1]
    return handle
Victor Stinner93569c22014-03-21 10:00:52 +0100368
def _call_soon(self, callback, args, check_loop):
    """Create a Handle for *callback* and append it to the ready queue.

    When *check_loop* is true and debug mode is on, first verify that
    this loop is the current event loop.

    Raises TypeError when *callback* is a coroutine function.
    """
    if coroutines.iscoroutinefunction(callback):
        raise TypeError("coroutines cannot be used with call_soon()")
    if check_loop and self._debug:
        self._assert_is_current_event_loop()
    handle = events.Handle(callback, args, self)
    if handle._source_traceback:
        # Drop the last recorded entry of the handle's source traceback.
        del handle._source_traceback[-1]
    self._ready.append(handle)
    return handle
379
def _assert_is_current_event_loop(self):
    """Assert that this event loop is the current event loop.

    Non-thread-safe methods of this class make this assumption and will
    likely behave incorrectly when the assumption is violated.

    Should only be called when (self._debug == True).  The caller is
    responsible for checking this condition for performance reasons.

    Raises RuntimeError when a different loop is current.
    """
    try:
        current = events.get_event_loop()
    except AssertionError:
        # get_event_loop() could not supply a loop; nothing to compare.
        return
    if current is self:
        return
    raise RuntimeError(
        "Non-thread-safe operation invoked on an event loop other "
        "than the current one")
397
def call_soon_threadsafe(self, callback, *args):
    """Like call_soon(), but thread-safe.

    The handle is queued without the current-loop check, then the loop
    is woken up through _write_to_self().
    """
    handle = self._call_soon(callback, args, check_loop=False)
    if handle._source_traceback:
        # Drop the last recorded entry of the handle's source traceback.
        del handle._source_traceback[-1]
    self._write_to_self()
    return handle
405
def run_in_executor(self, executor, callback, *args):
    """Submit *callback* to *executor* and return a wrapping future.

    When *executor* is None the default executor is used; it is created
    as a ThreadPoolExecutor with _MAX_WORKERS workers on first use.

    *callback* may also be a Handle (not a TimerHandle) given without
    extra arguments: a cancelled handle yields a future already
    completed with None, otherwise the handle's callback and arguments
    are submitted.

    Raises TypeError when *callback* is a coroutine function.
    """
    if coroutines.iscoroutinefunction(callback):
        raise TypeError("Coroutines cannot be used with run_in_executor()")
    if isinstance(callback, events.Handle):
        assert not args
        assert not isinstance(callback, events.TimerHandle)
        if callback._cancelled:
            finished = futures.Future(loop=self)
            finished.set_result(None)
            return finished
        # Unwrap the handle into its plain callable and arguments.
        callback, args = callback._callback, callback._args
    if executor is None:
        executor = self._default_executor
        if executor is None:
            executor = concurrent.futures.ThreadPoolExecutor(_MAX_WORKERS)
            self._default_executor = executor
    submitted = executor.submit(callback, *args)
    return futures.wrap_future(submitted, loop=self)
423
def set_default_executor(self, executor):
    """Set the executor run_in_executor() uses when given None."""
    self._default_executor = executor
426
def _getaddrinfo_debug(self, host, port, family, type, proto, flags):
    """Call socket.getaddrinfo() and log the request and its duration.

    The result is logged at INFO level when the lookup took at least
    slow_callback_duration seconds, at DEBUG level otherwise.  Return
    the list produced by socket.getaddrinfo().
    """
    parts = ["%s:%r" % (host, port)]
    # Only parameters with a truthy value are mentioned.
    optional = (
        (family, 'family=%r'),
        (type, 'type=%r'),
        (proto, 'proto=%r'),
        (flags, 'flags=%r'),
    )
    for value, template in optional:
        if value:
            parts.append(template % value)
    description = ', '.join(parts)
    logger.debug('Get address info %s', description)

    started = self.time()
    addrinfo = socket.getaddrinfo(host, port, family, type, proto, flags)
    elapsed = self.time() - started

    report = ('Getting address info %s took %.3f ms: %r'
              % (description, elapsed * 1e3, addrinfo))
    if elapsed >= self.slow_callback_duration:
        logger.info(report)
    else:
        logger.debug(report)
    return addrinfo
451
def getaddrinfo(self, host, port, *,
                family=0, type=0, proto=0, flags=0):
    """Run an address lookup in the default executor.

    In debug mode the logging wrapper _getaddrinfo_debug() is used;
    otherwise socket.getaddrinfo() is submitted directly.  Return
    whatever run_in_executor() returns.
    """
    resolver = self._getaddrinfo_debug if self._debug else socket.getaddrinfo
    return self.run_in_executor(None, resolver,
                                host, port, family, type, proto, flags)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700460
def getnameinfo(self, sockaddr, flags=0):
    """Run socket.getnameinfo() in the default executor.

    Return whatever run_in_executor() returns.
    """
    lookup = socket.getnameinfo
    return self.run_in_executor(None, lookup, sockaddr, flags)
463
@coroutine
def create_connection(self, protocol_factory, host=None, port=None, *,
                      ssl=None, family=0, proto=0, flags=0, sock=None,
                      local_addr=None, server_hostname=None):
    """Connect to a TCP server.

    Create a streaming transport connection to a given Internet host and
    port: socket family AF_INET or socket.AF_INET6 depending on host (or
    family if specified), socket type SOCK_STREAM.  protocol_factory must
    be a callable returning a protocol instance.

    Either host/port or an existing *sock* must be given, never both.
    When *local_addr* is given the new socket is bound to it before
    connecting.

    This method is a coroutine which will try to establish the connection
    in the background.  When successful, the coroutine returns a
    (transport, protocol) pair.
    """
    if server_hostname is not None and not ssl:
        raise ValueError('server_hostname is only meaningful with ssl')

    if server_hostname is None and ssl:
        # Use host as default for server_hostname.  It is an error
        # if host is empty or not set, e.g. when an
        # already-connected socket was passed or when only a port
        # is given.  To avoid this error, you can pass
        # server_hostname='' -- this will bypass the hostname
        # check.  (This also means that if host is a numeric
        # IP/IPv6 address, we will attempt to verify that exact
        # address; this will probably fail, but it is possible to
        # create a certificate for a specific IP address, so we
        # don't judge it here.)
        if not host:
            raise ValueError('You must set server_hostname '
                             'when using ssl without a host')
        server_hostname = host

    if host is not None or port is not None:
        if sock is not None:
            raise ValueError(
                'host/port and sock can not be specified at the same time')

        # Resolve the remote address and, if requested, the local one;
        # both lookups are started before waiting on either.
        remote_lookup = self.getaddrinfo(
            host, port, family=family,
            type=socket.SOCK_STREAM, proto=proto, flags=flags)
        lookups = [remote_lookup]
        local_lookup = None
        if local_addr is not None:
            local_lookup = self.getaddrinfo(
                *local_addr, family=family,
                type=socket.SOCK_STREAM, proto=proto, flags=flags)
            lookups.append(local_lookup)

        yield from tasks.wait(lookups, loop=self)

        infos = remote_lookup.result()
        if not infos:
            raise OSError('getaddrinfo() returned empty list')
        if local_lookup is not None:
            laddr_infos = local_lookup.result()
            if not laddr_infos:
                raise OSError('getaddrinfo() returned empty list')

        # Try each resolved remote address until one connects,
        # collecting the failures along the way.
        exceptions = []
        for af, socktype, protonum, _cname, address in infos:
            try:
                sock = socket.socket(family=af, type=socktype,
                                     proto=protonum)
                sock.setblocking(False)
                if local_lookup is not None:
                    for _, _, _, _, laddr in laddr_infos:
                        try:
                            sock.bind(laddr)
                            break
                        except OSError as exc:
                            bind_error = OSError(
                                exc.errno, 'error while '
                                'attempting to bind on address '
                                '{!r}: {}'.format(
                                    laddr, exc.strerror.lower()))
                            exceptions.append(bind_error)
                    else:
                        # No local address could be bound: give up on
                        # this remote address and try the next one.
                        sock.close()
                        sock = None
                        continue
                if self._debug:
                    logger.debug("connect %r to %r", sock, address)
                yield from self.sock_connect(sock, address)
            except OSError as exc:
                if sock is not None:
                    sock.close()
                exceptions.append(exc)
            except:
                # Anything else is not recorded: release the socket and
                # let the exception propagate.
                if sock is not None:
                    sock.close()
                raise
            else:
                break
        else:
            # Every candidate address failed.
            if len(exceptions) == 1:
                raise exceptions[0]
            # If they all have the same str(), raise one.
            model = str(exceptions[0])
            if all(str(exc) == model for exc in exceptions):
                raise exceptions[0]
            # Raise a combined exception so the user can see all
            # the various error messages.
            raise OSError('Multiple exceptions: {}'.format(
                ', '.join(str(exc) for exc in exceptions)))

    elif sock is None:
        raise ValueError(
            'host and port was not specified and no sock specified')

    sock.setblocking(False)

    transport, protocol = yield from self._create_connection_transport(
        sock, protocol_factory, ssl, server_hostname)
    if self._debug:
        logger.debug("%r connected to %s:%r: (%r, %r)",
                     sock, host, port, transport, protocol)
    return transport, protocol
584
@coroutine
def _create_connection_transport(self, sock, protocol_factory, ssl,
                                 server_hostname):
    """Build the transport for a client socket and wait on its waiter.

    An SSL transport is created when *ssl* is truthy (a bool value means
    no explicit SSL context is passed on), a plain socket transport
    otherwise.  Return a (transport, protocol) pair.
    """
    protocol = protocol_factory()
    waiter = futures.Future(loop=self)
    if not ssl:
        transport = self._make_socket_transport(sock, protocol, waiter)
    else:
        if isinstance(ssl, bool):
            sslcontext = None
        else:
            sslcontext = ssl
        transport = self._make_ssl_transport(
            sock, protocol, sslcontext, waiter,
            server_side=False, server_hostname=server_hostname)

    yield from waiter
    return transport, protocol
600
@coroutine
def create_datagram_endpoint(self, protocol_factory,
                             local_addr=None, remote_addr=None, *,
                             family=0, proto=0, flags=0):
    """Create a datagram connection.

    *local_addr* and *remote_addr*, when given, are (host, port)
    2-tuples.  With neither address, *family* must be given explicitly.
    The socket is bound to the local address and connected to the
    remote one when they are supplied.

    This method is a coroutine returning a (transport, protocol) pair.
    """
    if not (local_addr or remote_addr):
        if family == 0:
            raise ValueError('unexpected address family')
        addr_pairs_info = (((family, proto), (None, None)),)
    else:
        # Join addresses by (family, protocol): each key maps to a
        # [local, remote] pair filled in from the lookups below.
        addr_infos = collections.OrderedDict()
        for idx, addr in ((0, local_addr), (1, remote_addr)):
            if addr is None:
                continue
            assert isinstance(addr, tuple) and len(addr) == 2, (
                '2-tuple is expected')

            infos = yield from self.getaddrinfo(
                *addr, family=family, type=socket.SOCK_DGRAM,
                proto=proto, flags=flags)
            if not infos:
                raise OSError('getaddrinfo() returned empty list')

            for fam, _, pro, _, address in infos:
                pair = addr_infos.setdefault((fam, pro), [None, None])
                pair[idx] = address

        # Each requested address has to have info for the
        # (family, proto) pair, otherwise the pair is dropped.
        addr_pairs_info = [
            (key, addr_pair) for key, addr_pair in addr_infos.items()
            if not ((local_addr and addr_pair[0] is None) or
                    (remote_addr and addr_pair[1] is None))]

        if not addr_pairs_info:
            raise ValueError('can not get address information')

    exceptions = []

    for ((af, protonum),
         (local_address, remote_address)) in addr_pairs_info:
        sock = None
        r_addr = None
        try:
            sock = socket.socket(
                family=af, type=socket.SOCK_DGRAM, proto=protonum)
            sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
            sock.setblocking(False)

            if local_addr:
                sock.bind(local_address)
            if remote_addr:
                yield from self.sock_connect(sock, remote_address)
                r_addr = remote_address
        except OSError as exc:
            if sock is not None:
                sock.close()
            exceptions.append(exc)
        except:
            # Anything else is not recorded: release the socket and let
            # the exception propagate.
            if sock is not None:
                sock.close()
            raise
        else:
            break
    else:
        # No candidate worked; report the first failure.
        raise exceptions[0]

    protocol = protocol_factory()
    waiter = futures.Future(loop=self)
    transport = self._make_datagram_transport(sock, protocol, r_addr,
                                              waiter)
    if self._debug:
        if local_addr:
            logger.info("Datagram endpoint local_addr=%r remote_addr=%r "
                        "created: (%r, %r)",
                        local_addr, remote_addr, transport, protocol)
        else:
            logger.debug("Datagram endpoint remote_addr=%r created: "
                         "(%r, %r)",
                         remote_addr, transport, protocol)
    yield from waiter
    return transport, protocol
684
@coroutine
def create_server(self, protocol_factory, host=None, port=None,
                  *,
                  family=socket.AF_UNSPEC,
                  flags=socket.AI_PASSIVE,
                  sock=None,
                  backlog=100,
                  ssl=None,
                  reuse_address=None):
    """Create a TCP server bound to host and port.

    Either host/port or an existing *sock* must be given, never both.
    *ssl* must be an SSLContext or None; a bool raises TypeError.

    Return a Server object which can be used to stop the service.

    This method is a coroutine.
    """
    if isinstance(ssl, bool):
        raise TypeError('ssl argument must be an SSLContext or None')
    if host is not None or port is not None:
        if sock is not None:
            raise ValueError(
                'host/port and sock can not be specified at the same time')

        AF_INET6 = getattr(socket, 'AF_INET6', 0)
        if reuse_address is None:
            reuse_address = os.name == 'posix' and sys.platform != 'cygwin'
        sockets = []
        if host == '':
            host = None

        infos = yield from self.getaddrinfo(
            host, port, family=family,
            type=socket.SOCK_STREAM, proto=0, flags=flags)
        if not infos:
            raise OSError('getaddrinfo() returned empty list')

        # One listening socket per resolved address; if anything goes
        # wrong, every socket created so far is closed again.
        completed = False
        try:
            for af, socktype, proto, canonname, sa in infos:
                try:
                    sock = socket.socket(af, socktype, proto)
                except socket.error:
                    # Assume it's a bad family/type/protocol combination.
                    continue
                sockets.append(sock)
                if reuse_address:
                    sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR,
                                    True)
                # Disable IPv4/IPv6 dual stack support (enabled by
                # default on Linux) which makes a single socket
                # listen on both address families.
                if af == AF_INET6 and hasattr(socket, 'IPPROTO_IPV6'):
                    sock.setsockopt(socket.IPPROTO_IPV6,
                                    socket.IPV6_V6ONLY,
                                    True)
                try:
                    sock.bind(sa)
                except OSError as err:
                    raise OSError(err.errno, 'error while attempting '
                                  'to bind on address %r: %s'
                                  % (sa, err.strerror.lower()))
            completed = True
        finally:
            if not completed:
                for sock in sockets:
                    sock.close()
    else:
        if sock is None:
            raise ValueError('Neither host/port nor sock were specified')
        sockets = [sock]

    server = Server(self, sockets)
    for sock in sockets:
        sock.listen(backlog)
        sock.setblocking(False)
        self._start_serving(protocol_factory, sock, ssl, server)
    if self._debug:
        logger.info("%r is serving", server)
    return server
764
@coroutine
def connect_read_pipe(self, protocol_factory, pipe):
    """Create a read pipe transport for *pipe* and wait until it is ready.

    protocol_factory is called without arguments to build the protocol.
    The transport is built by self._make_read_pipe_transport(), which is
    given a fresh Future that this coroutine then waits on.

    Return a (transport, protocol) pair.

    If waiting on that future fails, the transport is closed before the
    exception is propagated to the caller.
    """
    protocol = protocol_factory()
    waiter = futures.Future(loop=self)
    transport = self._make_read_pipe_transport(pipe, protocol, waiter)
    try:
        yield from waiter
    except BaseException:
        # The transport already exists at this point; without an explicit
        # close() nothing would release it when the waiter fails.
        transport.close()
        raise
    if self._debug:
        logger.debug('Read pipe %r connected: (%r, %r)',
                     pipe.fileno(), transport, protocol)
    return transport, protocol
775
@coroutine
def connect_write_pipe(self, protocol_factory, pipe):
    """Create a write pipe transport for *pipe* and wait until it is ready.

    protocol_factory is called without arguments to build the protocol.
    The transport is built by self._make_write_pipe_transport(), which is
    given a fresh Future that this coroutine then waits on.

    Return a (transport, protocol) pair.

    If waiting on that future fails, the transport is closed before the
    exception is propagated to the caller.
    """
    protocol = protocol_factory()
    waiter = futures.Future(loop=self)
    transport = self._make_write_pipe_transport(pipe, protocol, waiter)
    try:
        yield from waiter
    except BaseException:
        # The transport already exists at this point; without an explicit
        # close() nothing would release it when the waiter fails.
        transport.close()
        raise
    if self._debug:
        logger.debug('Write pipe %r connected: (%r, %r)',
                     pipe.fileno(), transport, protocol)
    return transport, protocol
786
def _log_subprocess(self, msg, stdin, stdout, stderr):
    """Emit one debug log line describing a subprocess about to be started.

    The line starts with msg, followed by one 'name=value' entry for each
    standard stream that is not None.  When stdout is set and stderr is
    subprocess.STDOUT, both are reported together as 'stdout=stderr=...'.
    """
    parts = [msg]
    if stdin is not None:
        parts.append('stdin=%s' % _format_pipe(stdin))

    # stderr redirected into stdout: describe both with a single entry.
    merged = stdout is not None and stderr == subprocess.STDOUT
    if merged:
        parts.append('stdout=stderr=%s' % _format_pipe(stdout))
    if not merged and stdout is not None:
        parts.append('stdout=%s' % _format_pipe(stdout))
    if not merged and stderr is not None:
        parts.append('stderr=%s' % _format_pipe(stderr))

    logger.debug(' '.join(parts))
799
@coroutine
def subprocess_shell(self, protocol_factory, cmd, *, stdin=subprocess.PIPE,
                     stdout=subprocess.PIPE, stderr=subprocess.PIPE,
                     universal_newlines=False, shell=True, bufsize=0,
                     **kwargs):
    """Run the shell command *cmd* in a subprocess.

    protocol_factory is called without arguments to build the protocol.
    cmd, stdin, stdout, stderr, bufsize and any extra keyword arguments
    are forwarded to self._make_subprocess_transport().

    Return a (transport, protocol) pair.

    Raise ValueError if cmd is not a bytes or str object, or if
    universal_newlines, shell or bufsize differ from the only supported
    values (False, True and 0 respectively).
    """
    if not isinstance(cmd, (bytes, str)):
        raise ValueError("cmd must be a string")
    if universal_newlines:
        raise ValueError("universal_newlines must be False")
    if not shell:
        raise ValueError("shell must be True")
    if bufsize != 0:
        raise ValueError("bufsize must be 0")
    protocol = protocol_factory()
    # Always bind debug_log: debug mode may be switched on while this
    # coroutine is suspended below, and the second check must not hit an
    # unbound local in that case.
    debug_log = None
    if self._debug:
        # NOTE(review): the whole command line is logged here, including
        # any arguments embedded in it -- it may contain sensitive
        # information (password) and may be long.
        debug_log = 'run shell command %r' % cmd
        self._log_subprocess(debug_log, stdin, stdout, stderr)
    transport = yield from self._make_subprocess_transport(
        protocol, cmd, True, stdin, stdout, stderr, bufsize, **kwargs)
    if self._debug and debug_log is not None:
        logger.info('%s: %r', debug_log, transport)
    return transport, protocol
824
@coroutine
def subprocess_exec(self, protocol_factory, program, *args,
                    stdin=subprocess.PIPE, stdout=subprocess.PIPE,
                    stderr=subprocess.PIPE, universal_newlines=False,
                    shell=False, bufsize=0, **kwargs):
    """Run *program* with the given arguments in a subprocess.

    protocol_factory is called without arguments to build the protocol.
    The tuple (program,) + args, together with stdin, stdout, stderr,
    bufsize and any extra keyword arguments, is forwarded to
    self._make_subprocess_transport().

    Return a (transport, protocol) pair.

    Raise ValueError if universal_newlines, shell or bufsize differ from
    the only supported values (False, False and 0 respectively), and
    TypeError if program or any argument is not a str or bytes object.
    """
    if universal_newlines:
        raise ValueError("universal_newlines must be False")
    if shell:
        raise ValueError("shell must be False")
    if bufsize != 0:
        raise ValueError("bufsize must be 0")
    popen_args = (program,) + args
    for arg in popen_args:
        if not isinstance(arg, (str, bytes)):
            raise TypeError("program arguments must be "
                            "a bytes or text string, not %s"
                            % type(arg).__name__)
    protocol = protocol_factory()
    # Always bind debug_log: debug mode may be switched on while this
    # coroutine is suspended below, and the second check must not hit an
    # unbound local in that case.
    debug_log = None
    if self._debug:
        # don't log parameters: they may contain sensitive information
        # (password) and may be too long
        debug_log = 'execute program %r' % program
        self._log_subprocess(debug_log, stdin, stdout, stderr)
    transport = yield from self._make_subprocess_transport(
        protocol, popen_args, False, stdin, stdout, stderr,
        bufsize, **kwargs)
    if self._debug and debug_log is not None:
        logger.info('%s: %r', debug_log, transport)
    return transport, protocol
854
def set_exception_handler(self, handler):
    """Install handler as the event loop's exception handler.

    Passing None selects the default exception handler.

    Otherwise handler must be callable with a signature matching
    '(loop, context)': 'loop' is a reference to the active event loop
    and 'context' is a dict object (see `call_exception_handler()`
    documentation for details about context).

    Raise TypeError if handler is neither None nor callable.
    """
    if handler is None or callable(handler):
        self._exception_handler = handler
        return
    raise TypeError('A callable object or None is expected, '
                    'got {!r}'.format(handler))
871
def default_exception_handler(self, context):
    """Log the error described by context.

    This is what runs when an exception occurs and no exception handler
    has been set; a custom exception handler may also call it to fall
    back to the default behavior.

    context has the same meaning as in `call_exception_handler()`.  The
    logged text is the message (or a generic one when it is missing or
    empty) followed by one 'key: value' line per remaining context entry,
    in sorted key order.  The exception, if any, is passed to the logger
    as exc_info.
    """
    message = context.get('message') or 'Unhandled exception in event loop'

    exception = context.get('exception')
    exc_info = False
    if exception is not None:
        exc_info = (type(exception), exception, exception.__traceback__)

    lines = [message]
    for key in sorted(context):
        # Both of these are already reported above.
        if key in ('message', 'exception'):
            continue
        value = context[key]
        if key == 'source_traceback':
            # Render the captured stack as text instead of its repr.
            stack = ''.join(traceback.format_list(value))
            text = 'Object created at (most recent call last):\n'
            text += stack.rstrip()
        else:
            text = repr(value)
        lines.append('{}: {}'.format(key, text))

    logger.error('\n'.join(lines), exc_info=exc_info)
906
def call_exception_handler(self, context):
    """Dispatch context to the event loop's current exception handler.

    context is a dict containing the following keys:

    - 'message': Error message;
    - 'exception' (optional): Exception object;
    - 'future' (optional): Future instance;
    - 'handle' (optional): Handle instance;
    - 'protocol' (optional): Protocol instance;
    - 'transport' (optional): Transport instance;
    - 'socket' (optional): Socket instance.

    New keys may be introduced in the future.

    Note: do not overload this method in an event loop subclass.
    For custom exception handling, use the
    `set_exception_handler()` method.
    """
    handler = self._exception_handler

    if handler is None:
        try:
            self.default_exception_handler(context)
        except Exception:
            # Second protection layer for unexpected errors
            # in the default implementation, as well as for subclassed
            # event loops with overloaded "default_exception_handler".
            logger.error('Exception in default exception handler',
                         exc_info=True)
        return

    try:
        handler(self, context)
    except Exception as exc:
        # The user supplied handler failed: report that failure through
        # the default handler, keeping the original context attached.
        fallback_context = {
            'message': 'Unhandled error in exception handler',
            'exception': exc,
            'context': context,
        }
        try:
            self.default_exception_handler(fallback_context)
        except Exception:
            # Guard 'default_exception_handler' in case it is
            # overloaded.
            logger.error('Exception in default exception handler '
                         'while handling an unexpected error '
                         'in custom exception handler',
                         exc_info=True)
954
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700955 def _add_callback(self, handle):
Victor Stinneracdb7822014-07-14 18:33:40 +0200956 """Add a Handle to _scheduled (TimerHandle) or _ready."""
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700957 assert isinstance(handle, events.Handle), 'A Handle is required here'
958 if handle._cancelled:
959 return
960 if isinstance(handle, events.TimerHandle):
961 heapq.heappush(self._scheduled, handle)
962 else:
963 self._ready.append(handle)
964
965 def _add_callback_signalsafe(self, handle):
966 """Like _add_callback() but called from a signal handler."""
967 self._add_callback(handle)
968 self._write_to_self()
969
970 def _run_once(self):
971 """Run one full iteration of the event loop.
972
973 This calls all currently ready callbacks, polls for I/O,
974 schedules the resulting callbacks, and finally schedules
975 'call_later' callbacks.
976 """
977 # Remove delayed calls that were cancelled from head of queue.
978 while self._scheduled and self._scheduled[0]._cancelled:
979 heapq.heappop(self._scheduled)
980
981 timeout = None
982 if self._ready:
983 timeout = 0
984 elif self._scheduled:
985 # Compute the desired timeout.
986 when = self._scheduled[0]._when
Guido van Rossum3d1bc602014-05-10 15:47:15 -0700987 timeout = max(0, when - self.time())
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700988
Victor Stinner770e48d2014-07-11 11:58:33 +0200989 if self._debug and timeout != 0:
Victor Stinner22463aa2014-01-20 23:56:40 +0100990 t0 = self.time()
991 event_list = self._selector.select(timeout)
Victor Stinner0e6f52a2014-06-20 17:34:15 +0200992 dt = self.time() - t0
Victor Stinner770e48d2014-07-11 11:58:33 +0200993 if dt >= 1.0:
Victor Stinner22463aa2014-01-20 23:56:40 +0100994 level = logging.INFO
995 else:
996 level = logging.DEBUG
Victor Stinner770e48d2014-07-11 11:58:33 +0200997 nevent = len(event_list)
998 if timeout is None:
999 logger.log(level, 'poll took %.3f ms: %s events',
1000 dt * 1e3, nevent)
1001 elif nevent:
1002 logger.log(level,
1003 'poll %.3f ms took %.3f ms: %s events',
1004 timeout * 1e3, dt * 1e3, nevent)
1005 elif dt >= 1.0:
1006 logger.log(level,
1007 'poll %.3f ms took %.3f ms: timeout',
1008 timeout * 1e3, dt * 1e3)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07001009 else:
Victor Stinner22463aa2014-01-20 23:56:40 +01001010 event_list = self._selector.select(timeout)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07001011 self._process_events(event_list)
1012
1013 # Handle 'later' callbacks that are ready.
Victor Stinnered1654f2014-02-10 23:42:32 +01001014 end_time = self.time() + self._clock_resolution
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07001015 while self._scheduled:
1016 handle = self._scheduled[0]
Victor Stinnered1654f2014-02-10 23:42:32 +01001017 if handle._when >= end_time:
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07001018 break
1019 handle = heapq.heappop(self._scheduled)
1020 self._ready.append(handle)
1021
1022 # This is the only place where callbacks are actually *called*.
1023 # All other places just add them to ready.
1024 # Note: We run all currently scheduled callbacks, but not any
1025 # callbacks scheduled by callbacks run this time around --
1026 # they will be run the next time (after another I/O poll).
Victor Stinneracdb7822014-07-14 18:33:40 +02001027 # Use an idiom that is thread-safe without using locks.
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07001028 ntodo = len(self._ready)
1029 for i in range(ntodo):
1030 handle = self._ready.popleft()
Victor Stinner0e6f52a2014-06-20 17:34:15 +02001031 if handle._cancelled:
1032 continue
1033 if self._debug:
1034 t0 = self.time()
1035 handle._run()
1036 dt = self.time() - t0
1037 if dt >= self.slow_callback_duration:
1038 logger.warning('Executing %s took %.3f seconds',
1039 _format_handle(handle), dt)
1040 else:
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07001041 handle._run()
1042 handle = None # Needed to break cycles when an exception occurs.
Victor Stinner0f3e6bc2014-02-19 23:15:02 +01001043
def get_debug(self):
    """Return the current value of the loop's debug flag."""
    return self._debug
1046
def set_debug(self, enabled):
    """Store enabled as the loop's debug flag.

    The value is stored as given, without conversion to bool.
    """
    self._debug = enabled