blob: b6b712393bce56031686dcafe01665006f363b72 [file] [log] [blame]
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07001"""Base implementation of event loop.
2
3The event loop can be broken up into a multiplexer (the part
Victor Stinneracdb7822014-07-14 18:33:40 +02004responsible for notifying us of I/O events) and the event loop proper,
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07005which wraps a multiplexer with functionality for scheduling callbacks,
6immediately or at a given time in the future.
7
8Whenever a public API takes a callback, subsequent positional
9arguments will be passed to the callback if/when it is called. This
10avoids the proliferation of trivial lambdas implementing closures.
11Keyword arguments for the callback are not supported; this is a
12conscious design decision, leaving the door open for keyword arguments
13to modify the meaning of the API call itself.
14"""
15
16
17import collections
18import concurrent.futures
19import heapq
Victor Stinner0e6f52a2014-06-20 17:34:15 +020020import inspect
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070021import logging
Victor Stinnerb75380f2014-06-30 14:39:11 +020022import os
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070023import socket
24import subprocess
25import time
Victor Stinnerb75380f2014-06-30 14:39:11 +020026import traceback
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070027import sys
28
Victor Stinnerf951d282014-06-29 00:46:45 +020029from . import coroutines
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070030from . import events
31from . import futures
32from . import tasks
Victor Stinnerf951d282014-06-29 00:46:45 +020033from .coroutines import coroutine
Guido van Rossumfc29e0f2013-10-17 15:39:45 -070034from .log import logger
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070035
36
37__all__ = ['BaseEventLoop', 'Server']
38
39
40# Argument for default thread pool executor creation.
41_MAX_WORKERS = 5
42
Yury Selivanov592ada92014-09-25 12:07:56 -040043# Minimum number of _scheduled timer handles before cleanup of
44# cancelled handles is performed.
45_MIN_SCHEDULED_TIMER_HANDLES = 100
46
47# Minimum fraction of _scheduled timer handles that are cancelled
48# before cleanup of cancelled handles is performed.
49_MIN_CANCELLED_TIMER_HANDLES_FRACTION = 0.5
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070050
Victor Stinner0e6f52a2014-06-20 17:34:15 +020051def _format_handle(handle):
52 cb = handle._callback
53 if inspect.ismethod(cb) and isinstance(cb.__self__, tasks.Task):
54 # format the task
55 return repr(cb.__self__)
56 else:
57 return str(handle)
58
59
Victor Stinneracdb7822014-07-14 18:33:40 +020060def _format_pipe(fd):
61 if fd == subprocess.PIPE:
62 return '<pipe>'
63 elif fd == subprocess.STDOUT:
64 return '<stdout>'
65 else:
66 return repr(fd)
67
68
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070069class _StopError(BaseException):
70 """Raised to stop the event loop."""
71
72
Victor Stinner1b0580b2014-02-13 09:24:37 +010073def _check_resolved_address(sock, address):
74 # Ensure that the address is already resolved to avoid the trap of hanging
75 # the entire event loop when the address requires doing a DNS lookup.
76 family = sock.family
Victor Stinnerd1a727a2014-02-20 16:43:09 +010077 if family == socket.AF_INET:
78 host, port = address
79 elif family == socket.AF_INET6:
Victor Stinner934c8852014-02-20 21:59:38 +010080 host, port = address[:2]
Victor Stinnerd1a727a2014-02-20 16:43:09 +010081 else:
Victor Stinner1b0580b2014-02-13 09:24:37 +010082 return
83
Victor Stinner1b0580b2014-02-13 09:24:37 +010084 type_mask = 0
85 if hasattr(socket, 'SOCK_NONBLOCK'):
86 type_mask |= socket.SOCK_NONBLOCK
87 if hasattr(socket, 'SOCK_CLOEXEC'):
88 type_mask |= socket.SOCK_CLOEXEC
Victor Stinneracdb7822014-07-14 18:33:40 +020089 # Use getaddrinfo(flags=AI_NUMERICHOST) to ensure that the address is
Victor Stinner1b0580b2014-02-13 09:24:37 +010090 # already resolved.
91 try:
92 socket.getaddrinfo(host, port,
93 family=family,
94 type=(sock.type & ~type_mask),
95 proto=sock.proto,
96 flags=socket.AI_NUMERICHOST)
97 except socket.gaierror as err:
98 raise ValueError("address must be resolved (IP address), got %r: %s"
99 % (address, err))
100
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700101def _raise_stop_error(*args):
102 raise _StopError
103
104
105class Server(events.AbstractServer):
106
107 def __init__(self, loop, sockets):
Victor Stinnerb28dbac2014-07-11 22:52:21 +0200108 self._loop = loop
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700109 self.sockets = sockets
Victor Stinnerb28dbac2014-07-11 22:52:21 +0200110 self._active_count = 0
111 self._waiters = []
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700112
Victor Stinnere912e652014-07-12 03:11:53 +0200113 def __repr__(self):
114 return '<%s sockets=%r>' % (self.__class__.__name__, self.sockets)
115
Victor Stinnerb28dbac2014-07-11 22:52:21 +0200116 def _attach(self):
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700117 assert self.sockets is not None
Victor Stinnerb28dbac2014-07-11 22:52:21 +0200118 self._active_count += 1
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700119
Victor Stinnerb28dbac2014-07-11 22:52:21 +0200120 def _detach(self):
121 assert self._active_count > 0
122 self._active_count -= 1
123 if self._active_count == 0 and self.sockets is None:
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700124 self._wakeup()
125
126 def close(self):
127 sockets = self.sockets
Victor Stinnerb28dbac2014-07-11 22:52:21 +0200128 if sockets is None:
129 return
130 self.sockets = None
131 for sock in sockets:
132 self._loop._stop_serving(sock)
133 if self._active_count == 0:
134 self._wakeup()
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700135
136 def _wakeup(self):
Victor Stinnerb28dbac2014-07-11 22:52:21 +0200137 waiters = self._waiters
138 self._waiters = None
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700139 for waiter in waiters:
140 if not waiter.done():
141 waiter.set_result(waiter)
142
Victor Stinnerf951d282014-06-29 00:46:45 +0200143 @coroutine
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700144 def wait_closed(self):
Victor Stinnerb28dbac2014-07-11 22:52:21 +0200145 if self.sockets is None or self._waiters is None:
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700146 return
Victor Stinnerb28dbac2014-07-11 22:52:21 +0200147 waiter = futures.Future(loop=self._loop)
148 self._waiters.append(waiter)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700149 yield from waiter
150
151
152class BaseEventLoop(events.AbstractEventLoop):
153
154 def __init__(self):
Yury Selivanov592ada92014-09-25 12:07:56 -0400155 self._timer_cancelled_count = 0
Victor Stinnerbb2fc5b2014-06-10 10:23:10 +0200156 self._closed = False
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700157 self._ready = collections.deque()
158 self._scheduled = []
159 self._default_executor = None
160 self._internal_fds = 0
161 self._running = False
Victor Stinnered1654f2014-02-10 23:42:32 +0100162 self._clock_resolution = time.get_clock_info('monotonic').resolution
Yury Selivanov569efa22014-02-18 18:02:19 -0500163 self._exception_handler = None
Victor Stinner7b7120e2014-06-23 00:12:14 +0200164 self._debug = (not sys.flags.ignore_environment
165 and bool(os.environ.get('PYTHONASYNCIODEBUG')))
Victor Stinner0e6f52a2014-06-20 17:34:15 +0200166 # In debug mode, if the execution of a callback or a step of a task
167 # exceed this duration in seconds, the slow callback/task is logged.
168 self.slow_callback_duration = 0.1
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700169
Victor Stinnerbb2fc5b2014-06-10 10:23:10 +0200170 def __repr__(self):
171 return ('<%s running=%s closed=%s debug=%s>'
172 % (self.__class__.__name__, self.is_running(),
173 self.is_closed(), self.get_debug()))
174
Victor Stinner896a25a2014-07-08 11:29:25 +0200175 def create_task(self, coro):
176 """Schedule a coroutine object.
177
Victor Stinneracdb7822014-07-14 18:33:40 +0200178 Return a task object.
179 """
Victor Stinnerc39ba7d2014-07-11 00:21:27 +0200180 task = tasks.Task(coro, loop=self)
181 if task._source_traceback:
182 del task._source_traceback[-1]
183 return task
Victor Stinner896a25a2014-07-08 11:29:25 +0200184
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700185 def _make_socket_transport(self, sock, protocol, waiter=None, *,
186 extra=None, server=None):
187 """Create socket transport."""
188 raise NotImplementedError
189
190 def _make_ssl_transport(self, rawsock, protocol, sslcontext, waiter, *,
191 server_side=False, server_hostname=None,
192 extra=None, server=None):
193 """Create SSL transport."""
194 raise NotImplementedError
195
196 def _make_datagram_transport(self, sock, protocol,
Victor Stinnerbfff45d2014-07-08 23:57:31 +0200197 address=None, waiter=None, extra=None):
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700198 """Create datagram transport."""
199 raise NotImplementedError
200
201 def _make_read_pipe_transport(self, pipe, protocol, waiter=None,
202 extra=None):
203 """Create read pipe transport."""
204 raise NotImplementedError
205
206 def _make_write_pipe_transport(self, pipe, protocol, waiter=None,
207 extra=None):
208 """Create write pipe transport."""
209 raise NotImplementedError
210
Victor Stinnerf951d282014-06-29 00:46:45 +0200211 @coroutine
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700212 def _make_subprocess_transport(self, protocol, args, shell,
213 stdin, stdout, stderr, bufsize,
214 extra=None, **kwargs):
215 """Create subprocess transport."""
216 raise NotImplementedError
217
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700218 def _write_to_self(self):
Victor Stinneracdb7822014-07-14 18:33:40 +0200219 """Write a byte to self-pipe, to wake up the event loop.
220
221 This may be called from a different thread.
222
223 The subclass is responsible for implementing the self-pipe.
224 """
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700225 raise NotImplementedError
226
227 def _process_events(self, event_list):
228 """Process selector events."""
229 raise NotImplementedError
230
Victor Stinnerbb2fc5b2014-06-10 10:23:10 +0200231 def _check_closed(self):
232 if self._closed:
233 raise RuntimeError('Event loop is closed')
234
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700235 def run_forever(self):
236 """Run until stop() is called."""
Victor Stinnerbb2fc5b2014-06-10 10:23:10 +0200237 self._check_closed()
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700238 if self._running:
239 raise RuntimeError('Event loop is running.')
240 self._running = True
241 try:
242 while True:
243 try:
244 self._run_once()
245 except _StopError:
246 break
247 finally:
248 self._running = False
249
250 def run_until_complete(self, future):
251 """Run until the Future is done.
252
253 If the argument is a coroutine, it is wrapped in a Task.
254
Victor Stinneracdb7822014-07-14 18:33:40 +0200255 WARNING: It would be disastrous to call run_until_complete()
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700256 with the same coroutine twice -- it would wrap it in two
257 different Tasks and that can't be good.
258
259 Return the Future's result, or raise its exception.
260 """
Victor Stinnerbb2fc5b2014-06-10 10:23:10 +0200261 self._check_closed()
Victor Stinner98b63912014-06-30 14:51:04 +0200262
263 new_task = not isinstance(future, futures.Future)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700264 future = tasks.async(future, loop=self)
Victor Stinner98b63912014-06-30 14:51:04 +0200265 if new_task:
266 # An exception is raised if the future didn't complete, so there
267 # is no need to log the "destroy pending task" message
268 future._log_destroy_pending = False
269
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700270 future.add_done_callback(_raise_stop_error)
Victor Stinnerc8bd53f2014-10-11 14:30:18 +0200271 try:
272 self.run_forever()
273 except:
274 if new_task and future.done() and not future.cancelled():
275 # The coroutine raised a BaseException. Consume the exception
276 # to not log a warning, the caller doesn't have access to the
277 # local task.
278 future.exception()
279 raise
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700280 future.remove_done_callback(_raise_stop_error)
281 if not future.done():
282 raise RuntimeError('Event loop stopped before Future completed.')
283
284 return future.result()
285
286 def stop(self):
287 """Stop running the event loop.
288
Victor Stinner5006b1f2014-07-24 11:34:11 +0200289 Every callback scheduled before stop() is called will run. Callbacks
290 scheduled after stop() is called will not run. However, those callbacks
291 will run if run_forever is called again later.
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700292 """
293 self.call_soon(_raise_stop_error)
294
Antoine Pitrou4ca73552013-10-20 00:54:10 +0200295 def close(self):
Guido van Rossume3f52ef2013-11-01 14:19:04 -0700296 """Close the event loop.
297
298 This clears the queues and shuts down the executor,
299 but does not wait for the executor to finish.
Victor Stinnerf328c7d2014-06-23 01:02:37 +0200300
301 The event loop must not be running.
Guido van Rossume3f52ef2013-11-01 14:19:04 -0700302 """
Victor Stinnerf328c7d2014-06-23 01:02:37 +0200303 if self._running:
Victor Stinneracdb7822014-07-14 18:33:40 +0200304 raise RuntimeError("Cannot close a running event loop")
Victor Stinnerbb2fc5b2014-06-10 10:23:10 +0200305 if self._closed:
306 return
Victor Stinnere912e652014-07-12 03:11:53 +0200307 if self._debug:
308 logger.debug("Close %r", self)
Victor Stinnerbb2fc5b2014-06-10 10:23:10 +0200309 self._closed = True
Antoine Pitrou4ca73552013-10-20 00:54:10 +0200310 self._ready.clear()
311 self._scheduled.clear()
312 executor = self._default_executor
313 if executor is not None:
314 self._default_executor = None
315 executor.shutdown(wait=False)
316
Victor Stinnerbb2fc5b2014-06-10 10:23:10 +0200317 def is_closed(self):
318 """Returns True if the event loop was closed."""
319 return self._closed
320
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700321 def is_running(self):
Victor Stinneracdb7822014-07-14 18:33:40 +0200322 """Returns True if the event loop is running."""
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700323 return self._running
324
325 def time(self):
Victor Stinneracdb7822014-07-14 18:33:40 +0200326 """Return the time according to the event loop's clock.
327
328 This is a float expressed in seconds since an epoch, but the
329 epoch, precision, accuracy and drift are unspecified and may
330 differ per event loop.
331 """
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700332 return time.monotonic()
333
334 def call_later(self, delay, callback, *args):
335 """Arrange for a callback to be called at a given time.
336
337 Return a Handle: an opaque object with a cancel() method that
338 can be used to cancel the call.
339
340 The delay can be an int or float, expressed in seconds. It is
Victor Stinneracdb7822014-07-14 18:33:40 +0200341 always relative to the current time.
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700342
343 Each callback will be called exactly once. If two callbacks
344 are scheduled for exactly the same time, it undefined which
345 will be called first.
346
347 Any positional arguments after the callback will be passed to
348 the callback when it is called.
349 """
Victor Stinner80f53aa2014-06-27 13:52:20 +0200350 timer = self.call_at(self.time() + delay, callback, *args)
351 if timer._source_traceback:
352 del timer._source_traceback[-1]
353 return timer
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700354
355 def call_at(self, when, callback, *args):
Victor Stinneracdb7822014-07-14 18:33:40 +0200356 """Like call_later(), but uses an absolute time.
357
358 Absolute time corresponds to the event loop's time() method.
359 """
Victor Stinnerf951d282014-06-29 00:46:45 +0200360 if coroutines.iscoroutinefunction(callback):
Victor Stinner9af4a242014-02-11 11:34:30 +0100361 raise TypeError("coroutines cannot be used with call_at()")
Victor Stinner93569c22014-03-21 10:00:52 +0100362 if self._debug:
363 self._assert_is_current_event_loop()
Yury Selivanov569efa22014-02-18 18:02:19 -0500364 timer = events.TimerHandle(when, callback, args, self)
Victor Stinner80f53aa2014-06-27 13:52:20 +0200365 if timer._source_traceback:
366 del timer._source_traceback[-1]
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700367 heapq.heappush(self._scheduled, timer)
Yury Selivanov592ada92014-09-25 12:07:56 -0400368 timer._scheduled = True
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700369 return timer
370
371 def call_soon(self, callback, *args):
372 """Arrange for a callback to be called as soon as possible.
373
Victor Stinneracdb7822014-07-14 18:33:40 +0200374 This operates as a FIFO queue: callbacks are called in the
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700375 order in which they are registered. Each callback will be
376 called exactly once.
377
378 Any positional arguments after the callback will be passed to
379 the callback when it is called.
380 """
Victor Stinner80f53aa2014-06-27 13:52:20 +0200381 handle = self._call_soon(callback, args, check_loop=True)
382 if handle._source_traceback:
383 del handle._source_traceback[-1]
384 return handle
Victor Stinner93569c22014-03-21 10:00:52 +0100385
386 def _call_soon(self, callback, args, check_loop):
Victor Stinnerf951d282014-06-29 00:46:45 +0200387 if coroutines.iscoroutinefunction(callback):
Victor Stinner9af4a242014-02-11 11:34:30 +0100388 raise TypeError("coroutines cannot be used with call_soon()")
Victor Stinner93569c22014-03-21 10:00:52 +0100389 if self._debug and check_loop:
390 self._assert_is_current_event_loop()
Yury Selivanov569efa22014-02-18 18:02:19 -0500391 handle = events.Handle(callback, args, self)
Victor Stinner80f53aa2014-06-27 13:52:20 +0200392 if handle._source_traceback:
393 del handle._source_traceback[-1]
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700394 self._ready.append(handle)
395 return handle
396
Victor Stinner93569c22014-03-21 10:00:52 +0100397 def _assert_is_current_event_loop(self):
398 """Asserts that this event loop is the current event loop.
399
Victor Stinneracdb7822014-07-14 18:33:40 +0200400 Non-thread-safe methods of this class make this assumption and will
Victor Stinner93569c22014-03-21 10:00:52 +0100401 likely behave incorrectly when the assumption is violated.
402
Victor Stinneracdb7822014-07-14 18:33:40 +0200403 Should only be called when (self._debug == True). The caller is
Victor Stinner93569c22014-03-21 10:00:52 +0100404 responsible for checking this condition for performance reasons.
405 """
Victor Stinner751c7c02014-06-23 15:14:13 +0200406 try:
407 current = events.get_event_loop()
408 except AssertionError:
409 return
410 if current is not self:
Victor Stinner93569c22014-03-21 10:00:52 +0100411 raise RuntimeError(
Victor Stinneracdb7822014-07-14 18:33:40 +0200412 "Non-thread-safe operation invoked on an event loop other "
Victor Stinner93569c22014-03-21 10:00:52 +0100413 "than the current one")
414
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700415 def call_soon_threadsafe(self, callback, *args):
Victor Stinneracdb7822014-07-14 18:33:40 +0200416 """Like call_soon(), but thread-safe."""
Victor Stinner93569c22014-03-21 10:00:52 +0100417 handle = self._call_soon(callback, args, check_loop=False)
Victor Stinner80f53aa2014-06-27 13:52:20 +0200418 if handle._source_traceback:
419 del handle._source_traceback[-1]
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700420 self._write_to_self()
421 return handle
422
423 def run_in_executor(self, executor, callback, *args):
Victor Stinnerf951d282014-06-29 00:46:45 +0200424 if coroutines.iscoroutinefunction(callback):
Victor Stinneracdb7822014-07-14 18:33:40 +0200425 raise TypeError("Coroutines cannot be used with run_in_executor()")
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700426 if isinstance(callback, events.Handle):
427 assert not args
428 assert not isinstance(callback, events.TimerHandle)
429 if callback._cancelled:
430 f = futures.Future(loop=self)
431 f.set_result(None)
432 return f
433 callback, args = callback._callback, callback._args
434 if executor is None:
435 executor = self._default_executor
436 if executor is None:
437 executor = concurrent.futures.ThreadPoolExecutor(_MAX_WORKERS)
438 self._default_executor = executor
439 return futures.wrap_future(executor.submit(callback, *args), loop=self)
440
441 def set_default_executor(self, executor):
442 self._default_executor = executor
443
Victor Stinnere912e652014-07-12 03:11:53 +0200444 def _getaddrinfo_debug(self, host, port, family, type, proto, flags):
445 msg = ["%s:%r" % (host, port)]
446 if family:
447 msg.append('family=%r' % family)
448 if type:
449 msg.append('type=%r' % type)
450 if proto:
451 msg.append('proto=%r' % proto)
452 if flags:
453 msg.append('flags=%r' % flags)
454 msg = ', '.join(msg)
Victor Stinneracdb7822014-07-14 18:33:40 +0200455 logger.debug('Get address info %s', msg)
Victor Stinnere912e652014-07-12 03:11:53 +0200456
457 t0 = self.time()
458 addrinfo = socket.getaddrinfo(host, port, family, type, proto, flags)
459 dt = self.time() - t0
460
Victor Stinneracdb7822014-07-14 18:33:40 +0200461 msg = ('Getting address info %s took %.3f ms: %r'
Victor Stinnere912e652014-07-12 03:11:53 +0200462 % (msg, dt * 1e3, addrinfo))
463 if dt >= self.slow_callback_duration:
464 logger.info(msg)
465 else:
466 logger.debug(msg)
467 return addrinfo
468
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700469 def getaddrinfo(self, host, port, *,
470 family=0, type=0, proto=0, flags=0):
Victor Stinnere912e652014-07-12 03:11:53 +0200471 if self._debug:
472 return self.run_in_executor(None, self._getaddrinfo_debug,
473 host, port, family, type, proto, flags)
474 else:
475 return self.run_in_executor(None, socket.getaddrinfo,
476 host, port, family, type, proto, flags)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700477
478 def getnameinfo(self, sockaddr, flags=0):
479 return self.run_in_executor(None, socket.getnameinfo, sockaddr, flags)
480
Victor Stinnerf951d282014-06-29 00:46:45 +0200481 @coroutine
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700482 def create_connection(self, protocol_factory, host=None, port=None, *,
483 ssl=None, family=0, proto=0, flags=0, sock=None,
Guido van Rossum21c85a72013-11-01 14:16:54 -0700484 local_addr=None, server_hostname=None):
Victor Stinnerd1432092014-06-19 17:11:49 +0200485 """Connect to a TCP server.
486
487 Create a streaming transport connection to a given Internet host and
488 port: socket family AF_INET or socket.AF_INET6 depending on host (or
489 family if specified), socket type SOCK_STREAM. protocol_factory must be
490 a callable returning a protocol instance.
491
492 This method is a coroutine which will try to establish the connection
493 in the background. When successful, the coroutine returns a
494 (transport, protocol) pair.
495 """
Guido van Rossum21c85a72013-11-01 14:16:54 -0700496 if server_hostname is not None and not ssl:
497 raise ValueError('server_hostname is only meaningful with ssl')
498
499 if server_hostname is None and ssl:
500 # Use host as default for server_hostname. It is an error
501 # if host is empty or not set, e.g. when an
502 # already-connected socket was passed or when only a port
503 # is given. To avoid this error, you can pass
504 # server_hostname='' -- this will bypass the hostname
505 # check. (This also means that if host is a numeric
506 # IP/IPv6 address, we will attempt to verify that exact
507 # address; this will probably fail, but it is possible to
508 # create a certificate for a specific IP address, so we
509 # don't judge it here.)
510 if not host:
511 raise ValueError('You must set server_hostname '
512 'when using ssl without a host')
513 server_hostname = host
Guido van Rossuma8d630a2013-11-01 14:20:55 -0700514
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700515 if host is not None or port is not None:
516 if sock is not None:
517 raise ValueError(
518 'host/port and sock can not be specified at the same time')
519
520 f1 = self.getaddrinfo(
521 host, port, family=family,
522 type=socket.SOCK_STREAM, proto=proto, flags=flags)
523 fs = [f1]
524 if local_addr is not None:
525 f2 = self.getaddrinfo(
526 *local_addr, family=family,
527 type=socket.SOCK_STREAM, proto=proto, flags=flags)
528 fs.append(f2)
529 else:
530 f2 = None
531
532 yield from tasks.wait(fs, loop=self)
533
534 infos = f1.result()
535 if not infos:
536 raise OSError('getaddrinfo() returned empty list')
537 if f2 is not None:
538 laddr_infos = f2.result()
539 if not laddr_infos:
540 raise OSError('getaddrinfo() returned empty list')
541
542 exceptions = []
543 for family, type, proto, cname, address in infos:
544 try:
545 sock = socket.socket(family=family, type=type, proto=proto)
546 sock.setblocking(False)
547 if f2 is not None:
548 for _, _, _, _, laddr in laddr_infos:
549 try:
550 sock.bind(laddr)
551 break
552 except OSError as exc:
553 exc = OSError(
554 exc.errno, 'error while '
555 'attempting to bind on address '
556 '{!r}: {}'.format(
557 laddr, exc.strerror.lower()))
558 exceptions.append(exc)
559 else:
560 sock.close()
561 sock = None
562 continue
Victor Stinnere912e652014-07-12 03:11:53 +0200563 if self._debug:
564 logger.debug("connect %r to %r", sock, address)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700565 yield from self.sock_connect(sock, address)
566 except OSError as exc:
567 if sock is not None:
568 sock.close()
569 exceptions.append(exc)
Victor Stinner223a6242014-06-04 00:11:52 +0200570 except:
571 if sock is not None:
572 sock.close()
573 raise
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700574 else:
575 break
576 else:
577 if len(exceptions) == 1:
578 raise exceptions[0]
579 else:
580 # If they all have the same str(), raise one.
581 model = str(exceptions[0])
582 if all(str(exc) == model for exc in exceptions):
583 raise exceptions[0]
584 # Raise a combined exception so the user can see all
585 # the various error messages.
586 raise OSError('Multiple exceptions: {}'.format(
587 ', '.join(str(exc) for exc in exceptions)))
588
589 elif sock is None:
590 raise ValueError(
591 'host and port was not specified and no sock specified')
592
593 sock.setblocking(False)
594
Yury Selivanovb057c522014-02-18 12:15:06 -0500595 transport, protocol = yield from self._create_connection_transport(
596 sock, protocol_factory, ssl, server_hostname)
Victor Stinnere912e652014-07-12 03:11:53 +0200597 if self._debug:
Victor Stinnerb2614752014-08-25 23:20:52 +0200598 # Get the socket from the transport because SSL transport closes
599 # the old socket and creates a new SSL socket
600 sock = transport.get_extra_info('socket')
Victor Stinneracdb7822014-07-14 18:33:40 +0200601 logger.debug("%r connected to %s:%r: (%r, %r)",
602 sock, host, port, transport, protocol)
Yury Selivanovb057c522014-02-18 12:15:06 -0500603 return transport, protocol
604
Victor Stinnerf951d282014-06-29 00:46:45 +0200605 @coroutine
Yury Selivanovb057c522014-02-18 12:15:06 -0500606 def _create_connection_transport(self, sock, protocol_factory, ssl,
607 server_hostname):
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700608 protocol = protocol_factory()
609 waiter = futures.Future(loop=self)
610 if ssl:
611 sslcontext = None if isinstance(ssl, bool) else ssl
612 transport = self._make_ssl_transport(
613 sock, protocol, sslcontext, waiter,
Guido van Rossum21c85a72013-11-01 14:16:54 -0700614 server_side=False, server_hostname=server_hostname)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700615 else:
616 transport = self._make_socket_transport(sock, protocol, waiter)
617
618 yield from waiter
619 return transport, protocol
620
Victor Stinnerf951d282014-06-29 00:46:45 +0200621 @coroutine
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700622 def create_datagram_endpoint(self, protocol_factory,
623 local_addr=None, remote_addr=None, *,
624 family=0, proto=0, flags=0):
625 """Create datagram connection."""
626 if not (local_addr or remote_addr):
627 if family == 0:
628 raise ValueError('unexpected address family')
629 addr_pairs_info = (((family, proto), (None, None)),)
630 else:
Victor Stinneracdb7822014-07-14 18:33:40 +0200631 # join address by (family, protocol)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700632 addr_infos = collections.OrderedDict()
633 for idx, addr in ((0, local_addr), (1, remote_addr)):
634 if addr is not None:
635 assert isinstance(addr, tuple) and len(addr) == 2, (
636 '2-tuple is expected')
637
638 infos = yield from self.getaddrinfo(
639 *addr, family=family, type=socket.SOCK_DGRAM,
640 proto=proto, flags=flags)
641 if not infos:
642 raise OSError('getaddrinfo() returned empty list')
643
644 for fam, _, pro, _, address in infos:
645 key = (fam, pro)
646 if key not in addr_infos:
647 addr_infos[key] = [None, None]
648 addr_infos[key][idx] = address
649
650 # each addr has to have info for each (family, proto) pair
651 addr_pairs_info = [
652 (key, addr_pair) for key, addr_pair in addr_infos.items()
653 if not ((local_addr and addr_pair[0] is None) or
654 (remote_addr and addr_pair[1] is None))]
655
656 if not addr_pairs_info:
657 raise ValueError('can not get address information')
658
659 exceptions = []
660
661 for ((family, proto),
662 (local_address, remote_address)) in addr_pairs_info:
663 sock = None
664 r_addr = None
665 try:
666 sock = socket.socket(
667 family=family, type=socket.SOCK_DGRAM, proto=proto)
668 sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
669 sock.setblocking(False)
670
671 if local_addr:
672 sock.bind(local_address)
673 if remote_addr:
674 yield from self.sock_connect(sock, remote_address)
675 r_addr = remote_address
676 except OSError as exc:
677 if sock is not None:
678 sock.close()
679 exceptions.append(exc)
Victor Stinner223a6242014-06-04 00:11:52 +0200680 except:
681 if sock is not None:
682 sock.close()
683 raise
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700684 else:
685 break
686 else:
687 raise exceptions[0]
688
689 protocol = protocol_factory()
Victor Stinnerbfff45d2014-07-08 23:57:31 +0200690 waiter = futures.Future(loop=self)
691 transport = self._make_datagram_transport(sock, protocol, r_addr,
692 waiter)
Victor Stinnere912e652014-07-12 03:11:53 +0200693 if self._debug:
694 if local_addr:
695 logger.info("Datagram endpoint local_addr=%r remote_addr=%r "
696 "created: (%r, %r)",
697 local_addr, remote_addr, transport, protocol)
698 else:
699 logger.debug("Datagram endpoint remote_addr=%r created: "
700 "(%r, %r)",
701 remote_addr, transport, protocol)
Victor Stinnerbfff45d2014-07-08 23:57:31 +0200702 yield from waiter
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700703 return transport, protocol
704
Victor Stinnerf951d282014-06-29 00:46:45 +0200705 @coroutine
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700706 def create_server(self, protocol_factory, host=None, port=None,
707 *,
708 family=socket.AF_UNSPEC,
709 flags=socket.AI_PASSIVE,
710 sock=None,
711 backlog=100,
712 ssl=None,
713 reuse_address=None):
Victor Stinnerd1432092014-06-19 17:11:49 +0200714 """Create a TCP server bound to host and port.
715
Victor Stinneracdb7822014-07-14 18:33:40 +0200716 Return a Server object which can be used to stop the service.
Victor Stinnerd1432092014-06-19 17:11:49 +0200717
718 This method is a coroutine.
719 """
Guido van Rossum28dff0d2013-11-01 14:22:30 -0700720 if isinstance(ssl, bool):
721 raise TypeError('ssl argument must be an SSLContext or None')
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700722 if host is not None or port is not None:
723 if sock is not None:
724 raise ValueError(
725 'host/port and sock can not be specified at the same time')
726
727 AF_INET6 = getattr(socket, 'AF_INET6', 0)
728 if reuse_address is None:
729 reuse_address = os.name == 'posix' and sys.platform != 'cygwin'
730 sockets = []
731 if host == '':
732 host = None
733
734 infos = yield from self.getaddrinfo(
735 host, port, family=family,
736 type=socket.SOCK_STREAM, proto=0, flags=flags)
737 if not infos:
738 raise OSError('getaddrinfo() returned empty list')
739
740 completed = False
741 try:
742 for res in infos:
743 af, socktype, proto, canonname, sa = res
Guido van Rossum32e46852013-10-19 17:04:25 -0700744 try:
745 sock = socket.socket(af, socktype, proto)
746 except socket.error:
747 # Assume it's a bad family/type/protocol combination.
Victor Stinnerb2614752014-08-25 23:20:52 +0200748 if self._debug:
749 logger.warning('create_server() failed to create '
750 'socket.socket(%r, %r, %r)',
751 af, socktype, proto, exc_info=True)
Guido van Rossum32e46852013-10-19 17:04:25 -0700752 continue
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700753 sockets.append(sock)
754 if reuse_address:
755 sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR,
756 True)
757 # Disable IPv4/IPv6 dual stack support (enabled by
758 # default on Linux) which makes a single socket
759 # listen on both address families.
760 if af == AF_INET6 and hasattr(socket, 'IPPROTO_IPV6'):
761 sock.setsockopt(socket.IPPROTO_IPV6,
762 socket.IPV6_V6ONLY,
763 True)
764 try:
765 sock.bind(sa)
766 except OSError as err:
767 raise OSError(err.errno, 'error while attempting '
768 'to bind on address %r: %s'
769 % (sa, err.strerror.lower()))
770 completed = True
771 finally:
772 if not completed:
773 for sock in sockets:
774 sock.close()
775 else:
776 if sock is None:
Victor Stinneracdb7822014-07-14 18:33:40 +0200777 raise ValueError('Neither host/port nor sock were specified')
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700778 sockets = [sock]
779
780 server = Server(self, sockets)
781 for sock in sockets:
782 sock.listen(backlog)
783 sock.setblocking(False)
784 self._start_serving(protocol_factory, sock, ssl, server)
Victor Stinnere912e652014-07-12 03:11:53 +0200785 if self._debug:
786 logger.info("%r is serving", server)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700787 return server
788
Victor Stinnerf951d282014-06-29 00:46:45 +0200789 @coroutine
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700790 def connect_read_pipe(self, protocol_factory, pipe):
791 protocol = protocol_factory()
792 waiter = futures.Future(loop=self)
793 transport = self._make_read_pipe_transport(pipe, protocol, waiter)
794 yield from waiter
Victor Stinneracdb7822014-07-14 18:33:40 +0200795 if self._debug:
796 logger.debug('Read pipe %r connected: (%r, %r)',
797 pipe.fileno(), transport, protocol)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700798 return transport, protocol
799
Victor Stinnerf951d282014-06-29 00:46:45 +0200800 @coroutine
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700801 def connect_write_pipe(self, protocol_factory, pipe):
802 protocol = protocol_factory()
803 waiter = futures.Future(loop=self)
804 transport = self._make_write_pipe_transport(pipe, protocol, waiter)
805 yield from waiter
Victor Stinneracdb7822014-07-14 18:33:40 +0200806 if self._debug:
807 logger.debug('Write pipe %r connected: (%r, %r)',
808 pipe.fileno(), transport, protocol)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700809 return transport, protocol
810
Victor Stinneracdb7822014-07-14 18:33:40 +0200811 def _log_subprocess(self, msg, stdin, stdout, stderr):
812 info = [msg]
813 if stdin is not None:
814 info.append('stdin=%s' % _format_pipe(stdin))
815 if stdout is not None and stderr == subprocess.STDOUT:
816 info.append('stdout=stderr=%s' % _format_pipe(stdout))
817 else:
818 if stdout is not None:
819 info.append('stdout=%s' % _format_pipe(stdout))
820 if stderr is not None:
821 info.append('stderr=%s' % _format_pipe(stderr))
822 logger.debug(' '.join(info))
823
Victor Stinnerf951d282014-06-29 00:46:45 +0200824 @coroutine
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700825 def subprocess_shell(self, protocol_factory, cmd, *, stdin=subprocess.PIPE,
826 stdout=subprocess.PIPE, stderr=subprocess.PIPE,
827 universal_newlines=False, shell=True, bufsize=0,
828 **kwargs):
Victor Stinner20e07432014-02-11 11:44:56 +0100829 if not isinstance(cmd, (bytes, str)):
Victor Stinnere623a122014-01-29 14:35:15 -0800830 raise ValueError("cmd must be a string")
831 if universal_newlines:
832 raise ValueError("universal_newlines must be False")
833 if not shell:
Victor Stinner323748e2014-01-31 12:28:30 +0100834 raise ValueError("shell must be True")
Victor Stinnere623a122014-01-29 14:35:15 -0800835 if bufsize != 0:
836 raise ValueError("bufsize must be 0")
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700837 protocol = protocol_factory()
Victor Stinneracdb7822014-07-14 18:33:40 +0200838 if self._debug:
839 # don't log parameters: they may contain sensitive information
840 # (password) and may be too long
841 debug_log = 'run shell command %r' % cmd
842 self._log_subprocess(debug_log, stdin, stdout, stderr)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700843 transport = yield from self._make_subprocess_transport(
844 protocol, cmd, True, stdin, stdout, stderr, bufsize, **kwargs)
Victor Stinneracdb7822014-07-14 18:33:40 +0200845 if self._debug:
846 logger.info('%s: %r' % (debug_log, transport))
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700847 return transport, protocol
848
Victor Stinnerf951d282014-06-29 00:46:45 +0200849 @coroutine
Yury Selivanov57797522014-02-18 22:56:15 -0500850 def subprocess_exec(self, protocol_factory, program, *args,
851 stdin=subprocess.PIPE, stdout=subprocess.PIPE,
852 stderr=subprocess.PIPE, universal_newlines=False,
853 shell=False, bufsize=0, **kwargs):
Victor Stinnere623a122014-01-29 14:35:15 -0800854 if universal_newlines:
855 raise ValueError("universal_newlines must be False")
856 if shell:
857 raise ValueError("shell must be False")
858 if bufsize != 0:
859 raise ValueError("bufsize must be 0")
Victor Stinner20e07432014-02-11 11:44:56 +0100860 popen_args = (program,) + args
861 for arg in popen_args:
862 if not isinstance(arg, (str, bytes)):
863 raise TypeError("program arguments must be "
864 "a bytes or text string, not %s"
865 % type(arg).__name__)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700866 protocol = protocol_factory()
Victor Stinneracdb7822014-07-14 18:33:40 +0200867 if self._debug:
868 # don't log parameters: they may contain sensitive information
869 # (password) and may be too long
870 debug_log = 'execute program %r' % program
871 self._log_subprocess(debug_log, stdin, stdout, stderr)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700872 transport = yield from self._make_subprocess_transport(
Yury Selivanov57797522014-02-18 22:56:15 -0500873 protocol, popen_args, False, stdin, stdout, stderr,
874 bufsize, **kwargs)
Victor Stinneracdb7822014-07-14 18:33:40 +0200875 if self._debug:
876 logger.info('%s: %r' % (debug_log, transport))
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700877 return transport, protocol
878
Yury Selivanov569efa22014-02-18 18:02:19 -0500879 def set_exception_handler(self, handler):
880 """Set handler as the new event loop exception handler.
881
882 If handler is None, the default exception handler will
883 be set.
884
885 If handler is a callable object, it should have a
Victor Stinneracdb7822014-07-14 18:33:40 +0200886 signature matching '(loop, context)', where 'loop'
Yury Selivanov569efa22014-02-18 18:02:19 -0500887 will be a reference to the active event loop, 'context'
888 will be a dict object (see `call_exception_handler()`
889 documentation for details about context).
890 """
891 if handler is not None and not callable(handler):
892 raise TypeError('A callable object or None is expected, '
893 'got {!r}'.format(handler))
894 self._exception_handler = handler
895
896 def default_exception_handler(self, context):
897 """Default exception handler.
898
899 This is called when an exception occurs and no exception
900 handler is set, and can be called by a custom exception
901 handler that wants to defer to the default behavior.
902
Victor Stinneracdb7822014-07-14 18:33:40 +0200903 The context parameter has the same meaning as in
Yury Selivanov569efa22014-02-18 18:02:19 -0500904 `call_exception_handler()`.
905 """
906 message = context.get('message')
907 if not message:
908 message = 'Unhandled exception in event loop'
909
910 exception = context.get('exception')
911 if exception is not None:
912 exc_info = (type(exception), exception, exception.__traceback__)
913 else:
914 exc_info = False
915
916 log_lines = [message]
917 for key in sorted(context):
918 if key in {'message', 'exception'}:
919 continue
Victor Stinner80f53aa2014-06-27 13:52:20 +0200920 value = context[key]
921 if key == 'source_traceback':
922 tb = ''.join(traceback.format_list(value))
923 value = 'Object created at (most recent call last):\n'
924 value += tb.rstrip()
925 else:
926 value = repr(value)
927 log_lines.append('{}: {}'.format(key, value))
Yury Selivanov569efa22014-02-18 18:02:19 -0500928
929 logger.error('\n'.join(log_lines), exc_info=exc_info)
930
931 def call_exception_handler(self, context):
Victor Stinneracdb7822014-07-14 18:33:40 +0200932 """Call the current event loop's exception handler.
Yury Selivanov569efa22014-02-18 18:02:19 -0500933
Victor Stinneracdb7822014-07-14 18:33:40 +0200934 The context argument is a dict containing the following keys:
935
Yury Selivanov569efa22014-02-18 18:02:19 -0500936 - 'message': Error message;
937 - 'exception' (optional): Exception object;
938 - 'future' (optional): Future instance;
939 - 'handle' (optional): Handle instance;
940 - 'protocol' (optional): Protocol instance;
941 - 'transport' (optional): Transport instance;
942 - 'socket' (optional): Socket instance.
943
Victor Stinneracdb7822014-07-14 18:33:40 +0200944 New keys maybe introduced in the future.
945
946 Note: do not overload this method in an event loop subclass.
947 For custom exception handling, use the
Yury Selivanov569efa22014-02-18 18:02:19 -0500948 `set_exception_handler()` method.
949 """
950 if self._exception_handler is None:
951 try:
952 self.default_exception_handler(context)
953 except Exception:
954 # Second protection layer for unexpected errors
955 # in the default implementation, as well as for subclassed
956 # event loops with overloaded "default_exception_handler".
957 logger.error('Exception in default exception handler',
958 exc_info=True)
959 else:
960 try:
961 self._exception_handler(self, context)
962 except Exception as exc:
963 # Exception in the user set custom exception handler.
964 try:
965 # Let's try default handler.
966 self.default_exception_handler({
967 'message': 'Unhandled error in exception handler',
968 'exception': exc,
969 'context': context,
970 })
971 except Exception:
Victor Stinneracdb7822014-07-14 18:33:40 +0200972 # Guard 'default_exception_handler' in case it is
Yury Selivanov569efa22014-02-18 18:02:19 -0500973 # overloaded.
974 logger.error('Exception in default exception handler '
975 'while handling an unexpected error '
976 'in custom exception handler',
977 exc_info=True)
978
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700979 def _add_callback(self, handle):
Victor Stinneracdb7822014-07-14 18:33:40 +0200980 """Add a Handle to _scheduled (TimerHandle) or _ready."""
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700981 assert isinstance(handle, events.Handle), 'A Handle is required here'
982 if handle._cancelled:
983 return
Yury Selivanov592ada92014-09-25 12:07:56 -0400984 assert not isinstance(handle, events.TimerHandle)
985 self._ready.append(handle)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700986
987 def _add_callback_signalsafe(self, handle):
988 """Like _add_callback() but called from a signal handler."""
989 self._add_callback(handle)
990 self._write_to_self()
991
Yury Selivanov592ada92014-09-25 12:07:56 -0400992 def _timer_handle_cancelled(self, handle):
993 """Notification that a TimerHandle has been cancelled."""
994 if handle._scheduled:
995 self._timer_cancelled_count += 1
996
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700997 def _run_once(self):
998 """Run one full iteration of the event loop.
999
1000 This calls all currently ready callbacks, polls for I/O,
1001 schedules the resulting callbacks, and finally schedules
1002 'call_later' callbacks.
1003 """
Yury Selivanov592ada92014-09-25 12:07:56 -04001004
Yury Selivanov592ada92014-09-25 12:07:56 -04001005 sched_count = len(self._scheduled)
1006 if (sched_count > _MIN_SCHEDULED_TIMER_HANDLES and
1007 self._timer_cancelled_count / sched_count >
1008 _MIN_CANCELLED_TIMER_HANDLES_FRACTION):
Victor Stinner68da8fc2014-09-30 18:08:36 +02001009 # Remove delayed calls that were cancelled if their number
1010 # is too high
1011 new_scheduled = []
Yury Selivanov592ada92014-09-25 12:07:56 -04001012 for handle in self._scheduled:
1013 if handle._cancelled:
1014 handle._scheduled = False
Victor Stinner68da8fc2014-09-30 18:08:36 +02001015 else:
1016 new_scheduled.append(handle)
Yury Selivanov592ada92014-09-25 12:07:56 -04001017
Victor Stinner68da8fc2014-09-30 18:08:36 +02001018 heapq.heapify(new_scheduled)
1019 self._scheduled = new_scheduled
Yury Selivanov592ada92014-09-25 12:07:56 -04001020 self._timer_cancelled_count = 0
Yury Selivanov592ada92014-09-25 12:07:56 -04001021 else:
1022 # Remove delayed calls that were cancelled from head of queue.
1023 while self._scheduled and self._scheduled[0]._cancelled:
1024 self._timer_cancelled_count -= 1
1025 handle = heapq.heappop(self._scheduled)
1026 handle._scheduled = False
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07001027
1028 timeout = None
1029 if self._ready:
1030 timeout = 0
1031 elif self._scheduled:
1032 # Compute the desired timeout.
1033 when = self._scheduled[0]._when
Guido van Rossum3d1bc602014-05-10 15:47:15 -07001034 timeout = max(0, when - self.time())
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07001035
Victor Stinner770e48d2014-07-11 11:58:33 +02001036 if self._debug and timeout != 0:
Victor Stinner22463aa2014-01-20 23:56:40 +01001037 t0 = self.time()
1038 event_list = self._selector.select(timeout)
Victor Stinner0e6f52a2014-06-20 17:34:15 +02001039 dt = self.time() - t0
Victor Stinner770e48d2014-07-11 11:58:33 +02001040 if dt >= 1.0:
Victor Stinner22463aa2014-01-20 23:56:40 +01001041 level = logging.INFO
1042 else:
1043 level = logging.DEBUG
Victor Stinner770e48d2014-07-11 11:58:33 +02001044 nevent = len(event_list)
1045 if timeout is None:
1046 logger.log(level, 'poll took %.3f ms: %s events',
1047 dt * 1e3, nevent)
1048 elif nevent:
1049 logger.log(level,
1050 'poll %.3f ms took %.3f ms: %s events',
1051 timeout * 1e3, dt * 1e3, nevent)
1052 elif dt >= 1.0:
1053 logger.log(level,
1054 'poll %.3f ms took %.3f ms: timeout',
1055 timeout * 1e3, dt * 1e3)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07001056 else:
Victor Stinner22463aa2014-01-20 23:56:40 +01001057 event_list = self._selector.select(timeout)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07001058 self._process_events(event_list)
1059
1060 # Handle 'later' callbacks that are ready.
Victor Stinnered1654f2014-02-10 23:42:32 +01001061 end_time = self.time() + self._clock_resolution
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07001062 while self._scheduled:
1063 handle = self._scheduled[0]
Victor Stinnered1654f2014-02-10 23:42:32 +01001064 if handle._when >= end_time:
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07001065 break
1066 handle = heapq.heappop(self._scheduled)
Yury Selivanov592ada92014-09-25 12:07:56 -04001067 handle._scheduled = False
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07001068 self._ready.append(handle)
1069
1070 # This is the only place where callbacks are actually *called*.
1071 # All other places just add them to ready.
1072 # Note: We run all currently scheduled callbacks, but not any
1073 # callbacks scheduled by callbacks run this time around --
1074 # they will be run the next time (after another I/O poll).
Victor Stinneracdb7822014-07-14 18:33:40 +02001075 # Use an idiom that is thread-safe without using locks.
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07001076 ntodo = len(self._ready)
1077 for i in range(ntodo):
1078 handle = self._ready.popleft()
Victor Stinner0e6f52a2014-06-20 17:34:15 +02001079 if handle._cancelled:
1080 continue
1081 if self._debug:
1082 t0 = self.time()
1083 handle._run()
1084 dt = self.time() - t0
1085 if dt >= self.slow_callback_duration:
1086 logger.warning('Executing %s took %.3f seconds',
1087 _format_handle(handle), dt)
1088 else:
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07001089 handle._run()
1090 handle = None # Needed to break cycles when an exception occurs.
Victor Stinner0f3e6bc2014-02-19 23:15:02 +01001091
1092 def get_debug(self):
1093 return self._debug
1094
1095 def set_debug(self, enabled):
1096 self._debug = enabled