# blob: 40dd66827b66cb067f8b5d7ed87d0d801e94eb10
"""Base implementation of event loop.

The event loop can be broken up into a multiplexer (the part
responsible for notifying us of I/O events) and the event loop proper,
which wraps a multiplexer with functionality for scheduling callbacks,
immediately or at a given time in the future.

Whenever a public API takes a callback, subsequent positional
arguments will be passed to the callback if/when it is called.  This
avoids the proliferation of trivial lambdas implementing closures.
Keyword arguments for the callback are not supported; this is a
conscious design decision, leaving the door open for keyword arguments
to modify the meaning of the API call itself.
"""
15
16
17import collections
18import concurrent.futures
19import heapq
Victor Stinner0e6f52a2014-06-20 17:34:15 +020020import inspect
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070021import logging
Victor Stinnerb75380f2014-06-30 14:39:11 +020022import os
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070023import socket
24import subprocess
25import time
Victor Stinnerb75380f2014-06-30 14:39:11 +020026import traceback
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070027import sys
28
Victor Stinnerf951d282014-06-29 00:46:45 +020029from . import coroutines
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070030from . import events
31from . import futures
32from . import tasks
Victor Stinnerf951d282014-06-29 00:46:45 +020033from .coroutines import coroutine
Guido van Rossumfc29e0f2013-10-17 15:39:45 -070034from .log import logger
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070035
36
# Names exported by "from <this module> import *".
__all__ = ['BaseEventLoop', 'Server']
38
39
# Argument for default thread pool executor creation.
_MAX_WORKERS = 5

# Minimum number of _scheduled timer handles before cleanup of
# cancelled handles is performed.
_MIN_SCHEDULED_TIMER_HANDLES = 100

# Minimum fraction of _scheduled timer handles that are cancelled
# before cleanup of cancelled handles is performed.
_MIN_CANCELLED_TIMER_HANDLES_FRACTION = 0.5
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070050
def _format_handle(handle):
    """Return a string describing *handle*.

    When the handle's callback is a bound method of a Task, the repr of
    that task is returned; otherwise str(handle) is returned.
    """
    cb = handle._callback
    if inspect.ismethod(cb) and isinstance(cb.__self__, tasks.Task):
        # format the task
        return repr(cb.__self__)
    else:
        return str(handle)
58
59
def _format_pipe(fd):
    """Return a readable name for a subprocess stream argument.

    subprocess.PIPE and subprocess.STDOUT are rendered as '<pipe>' and
    '<stdout>'; any other value is rendered with repr().
    """
    if fd == subprocess.PIPE:
        return '<pipe>'
    elif fd == subprocess.STDOUT:
        return '<stdout>'
    else:
        return repr(fd)
67
68
class _StopError(BaseException):
    """Raised to stop the event loop."""
71
72
def _check_resolved_address(sock, address):
    """Raise ValueError if *address* is not an already-resolved IP address.

    Only AF_INET and AF_INET6 sockets are checked; for any other socket
    family the function returns without doing anything.
    """
    # Ensure that the address is already resolved to avoid the trap of hanging
    # the entire event loop when the address requires doing a DNS lookup.
    family = sock.family
    if family == socket.AF_INET:
        host, port = address
    elif family == socket.AF_INET6:
        # IPv6 addresses may carry extra items; only host and port are used.
        host, port = address[:2]
    else:
        return

    # Strip the SOCK_NONBLOCK / SOCK_CLOEXEC bits (where the platform
    # defines them) from the socket type before passing it to getaddrinfo().
    type_mask = 0
    if hasattr(socket, 'SOCK_NONBLOCK'):
        type_mask |= socket.SOCK_NONBLOCK
    if hasattr(socket, 'SOCK_CLOEXEC'):
        type_mask |= socket.SOCK_CLOEXEC
    # Use getaddrinfo(flags=AI_NUMERICHOST) to ensure that the address is
    # already resolved.
    try:
        socket.getaddrinfo(host, port,
                           family=family,
                           type=(sock.type & ~type_mask),
                           proto=sock.proto,
                           flags=socket.AI_NUMERICHOST)
    except socket.gaierror as err:
        raise ValueError("address must be resolved (IP address), got %r: %s"
                         % (address, err))
100
def _raise_stop_error(*args):
    """Raise _StopError, ignoring any arguments.

    Accepting arbitrary positional arguments lets this be used both as a
    plain scheduled callback and as a future done-callback.
    """
    raise _StopError
103
104
class Server(events.AbstractServer):
    """Server object holding listening sockets and a count of active users.

    Attributes:
        sockets: the listening sockets, or None once close() was called.
    """

    def __init__(self, loop, sockets):
        self._loop = loop
        self.sockets = sockets
        # Number of _attach() calls not yet matched by _detach().
        self._active_count = 0
        # Futures created by wait_closed(); set to None by _wakeup().
        self._waiters = []

    def __repr__(self):
        return '<%s sockets=%r>' % (self.__class__.__name__, self.sockets)

    def _attach(self):
        """Register one more active user of the server."""
        assert self.sockets is not None
        self._active_count += 1

    def _detach(self):
        """Unregister one active user; wake waiters once closed and idle."""
        assert self._active_count > 0
        self._active_count -= 1
        if self._active_count == 0 and self.sockets is None:
            self._wakeup()

    def close(self):
        """Stop serving on every socket; calling it again is a no-op."""
        sockets = self.sockets
        if sockets is None:
            return
        self.sockets = None
        for sock in sockets:
            self._loop._stop_serving(sock)
        if self._active_count == 0:
            self._wakeup()

    def _wakeup(self):
        """Complete every pending wait_closed() future."""
        waiters = self._waiters
        self._waiters = None
        for waiter in waiters:
            if not waiter.done():
                waiter.set_result(waiter)

    @coroutine
    def wait_closed(self):
        """Wait until _wakeup() runs.

        Returns immediately when close() was already called (sockets is
        None) or when the waiters were already woken up.
        """
        if self.sockets is None or self._waiters is None:
            return
        waiter = futures.Future(loop=self._loop)
        self._waiters.append(waiter)
        yield from waiter
150
151
152class BaseEventLoop(events.AbstractEventLoop):
153
def __init__(self):
    """Initialize the scheduling state of a new, not-running loop."""
    self._timer_cancelled_count = 0
    self._closed = False
    # Handles appended by _call_soon().
    self._ready = collections.deque()
    # Heap of timer handles pushed by call_at().
    self._scheduled = []
    # Created lazily by run_in_executor() when no executor is given.
    self._default_executor = None
    self._internal_fds = 0
    self._running = False
    self._clock_resolution = time.get_clock_info('monotonic').resolution
    self._exception_handler = None
    # Debug mode is enabled by a non-empty PYTHONASYNCIODEBUG variable,
    # unless the interpreter was told to ignore the environment.
    self._debug = (not sys.flags.ignore_environment
                   and bool(os.environ.get('PYTHONASYNCIODEBUG')))
    # In debug mode, if the execution of a callback or a step of a task
    # exceeds this duration in seconds, the slow callback/task is logged.
    self.slow_callback_duration = 0.1
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700169
def __repr__(self):
    """Return '<ClassName running=... closed=... debug=...>'."""
    return ('<%s running=%s closed=%s debug=%s>'
            % (self.__class__.__name__, self.is_running(),
               self.is_closed(), self.get_debug()))
174
def create_task(self, coro):
    """Schedule a coroutine object.

    Return a task object.
    """
    task = tasks.Task(coro, loop=self)
    if task._source_traceback:
        # Drop the last recorded entry (presumably this method's own
        # frame, so the traceback ends at the caller).
        del task._source_traceback[-1]
    return task
Victor Stinner896a25a2014-07-08 11:29:25 +0200184
def _make_socket_transport(self, sock, protocol, waiter=None, *,
                           extra=None, server=None):
    """Create socket transport.

    Not implemented in this base class; always raises NotImplementedError.
    """
    raise NotImplementedError
189
def _make_ssl_transport(self, rawsock, protocol, sslcontext, waiter, *,
                        server_side=False, server_hostname=None,
                        extra=None, server=None):
    """Create SSL transport.

    Not implemented in this base class; always raises NotImplementedError.
    """
    raise NotImplementedError
195
def _make_datagram_transport(self, sock, protocol,
                             address=None, waiter=None, extra=None):
    """Create datagram transport.

    Not implemented in this base class; always raises NotImplementedError.
    """
    raise NotImplementedError
200
def _make_read_pipe_transport(self, pipe, protocol, waiter=None,
                              extra=None):
    """Create read pipe transport.

    Not implemented in this base class; always raises NotImplementedError.
    """
    raise NotImplementedError
205
def _make_write_pipe_transport(self, pipe, protocol, waiter=None,
                               extra=None):
    """Create write pipe transport.

    Not implemented in this base class; always raises NotImplementedError.
    """
    raise NotImplementedError
210
@coroutine
def _make_subprocess_transport(self, protocol, args, shell,
                               stdin, stdout, stderr, bufsize,
                               extra=None, **kwargs):
    """Create subprocess transport.

    Not implemented in this base class; the body only raises
    NotImplementedError.
    """
    raise NotImplementedError
217
def _write_to_self(self):
    """Write a byte to self-pipe, to wake up the event loop.

    This may be called from a different thread.

    The subclass is responsible for implementing the self-pipe.
    """
    raise NotImplementedError
226
def _process_events(self, event_list):
    """Process selector events.

    Not implemented in this base class; always raises NotImplementedError.
    """
    raise NotImplementedError
230
def _check_closed(self):
    """Raise RuntimeError if the event loop has been closed."""
    if self._closed:
        raise RuntimeError('Event loop is closed')
234
def run_forever(self):
    """Run until stop() is called.

    Raises RuntimeError if the loop is closed or is already running.
    """
    self._check_closed()
    if self._running:
        raise RuntimeError('Event loop is running.')
    self._running = True
    try:
        while True:
            try:
                self._run_once()
            except _StopError:
                # Raised by a callback to request that the loop stops.
                break
    finally:
        # Reset the flag even when an exception escapes _run_once().
        self._running = False
249
def run_until_complete(self, future):
    """Run until the Future is done.

    If the argument is a coroutine, it is wrapped in a Task.

    WARNING: It would be disastrous to call run_until_complete()
    with the same coroutine twice -- it would wrap it in two
    different Tasks and that can't be good.

    Return the Future's result, or raise its exception.  Raises
    RuntimeError if the loop is closed, or if the loop was stopped
    before the Future completed.
    """
    self._check_closed()

    new_task = not isinstance(future, futures.Future)
    # "async" is a reserved word since Python 3.7, so the attribute is
    # looked up by name to keep this call valid on every Python version.
    future = getattr(tasks, 'async')(future, loop=self)
    if new_task:
        # An exception is raised if the future didn't complete, so there
        # is no need to log the "destroy pending task" message
        future._log_destroy_pending = False

    future.add_done_callback(_raise_stop_error)
    try:
        self.run_forever()
    except:
        if new_task and future.done() and not future.cancelled():
            # The coroutine raised a BaseException. Consume the exception
            # to not log a warning, the caller doesn't have access to the
            # local task.
            future.exception()
        raise
    finally:
        # Always detach the stop callback, including when run_forever()
        # raised: otherwise a caller-owned future would still stop
        # whichever loop run it happens to complete in later.
        future.remove_done_callback(_raise_stop_error)
    if not future.done():
        raise RuntimeError('Event loop stopped before Future completed.')

    return future.result()
285
def stop(self):
    """Stop running the event loop.

    Every callback scheduled before stop() is called will run.  Callbacks
    scheduled after stop() is called will not run.  However, those callbacks
    will run if run_forever is called again later.
    """
    # The stop request is itself an ordinary callback queued with
    # call_soon(); it raises _StopError when its turn comes.
    self.call_soon(_raise_stop_error)
294
def close(self):
    """Close the event loop.

    This clears the queues and shuts down the executor,
    but does not wait for the executor to finish.

    The event loop must not be running.  Closing an already closed
    loop is a no-op.

    Raises RuntimeError if the loop is running.
    """
    if self._running:
        raise RuntimeError("Cannot close a running event loop")
    if self._closed:
        return
    if self._debug:
        logger.debug("Close %r", self)
    self._closed = True
    self._ready.clear()
    self._scheduled.clear()
    executor = self._default_executor
    if executor is not None:
        self._default_executor = None
        executor.shutdown(wait=False)
316
def is_closed(self):
    """Returns True if the event loop was closed."""
    return self._closed
320
def is_running(self):
    """Returns True if the event loop is running."""
    return self._running
324
def time(self):
    """Return the time according to the event loop's clock.

    This is a float expressed in seconds since an epoch, but the
    epoch, precision, accuracy and drift are unspecified and may
    differ per event loop.

    This implementation returns time.monotonic().
    """
    return time.monotonic()
333
def call_later(self, delay, callback, *args):
    """Arrange for a callback to be called at a given time.

    Return a Handle: an opaque object with a cancel() method that
    can be used to cancel the call.

    The delay can be an int or float, expressed in seconds.  It is
    always relative to the current time.

    Each callback will be called exactly once.  If two callbacks
    are scheduled for exactly the same time, it is undefined which
    will be called first.

    Any positional arguments after the callback will be passed to
    the callback when it is called.
    """
    timer = self.call_at(self.time() + delay, callback, *args)
    if timer._source_traceback:
        # Drop the last recorded entry (presumably this method's own
        # frame, so the traceback ends at the caller).
        del timer._source_traceback[-1]
    return timer
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700354
def call_at(self, when, callback, *args):
    """Like call_later(), but uses an absolute time.

    Absolute time corresponds to the event loop's time() method.

    Raises TypeError if *callback* is a coroutine object or a coroutine
    function.
    """
    if (coroutines.iscoroutine(callback)
            or coroutines.iscoroutinefunction(callback)):
        raise TypeError("coroutines cannot be used with call_at()")
    if self._debug:
        self._assert_is_current_event_loop()
    timer = events.TimerHandle(when, callback, args, self)
    if timer._source_traceback:
        # Drop the last recorded entry (presumably this method's own
        # frame, so the traceback ends at the caller).
        del timer._source_traceback[-1]
    heapq.heappush(self._scheduled, timer)
    timer._scheduled = True
    return timer
371
def call_soon(self, callback, *args):
    """Arrange for a callback to be called as soon as possible.

    This operates as a FIFO queue: callbacks are called in the
    order in which they are registered.  Each callback will be
    called exactly once.

    Any positional arguments after the callback will be passed to
    the callback when it is called.
    """
    handle = self._call_soon(callback, args, check_loop=True)
    if handle._source_traceback:
        # Drop the last recorded entry (presumably this method's own
        # frame, so the traceback ends at the caller).
        del handle._source_traceback[-1]
    return handle
Victor Stinner93569c22014-03-21 10:00:52 +0100386
def _call_soon(self, callback, args, check_loop):
    """Create a Handle for *callback* and append it to the ready queue.

    When *check_loop* is true and the loop is in debug mode, first verify
    that this loop is the current event loop.

    Raises TypeError if *callback* is a coroutine object or a coroutine
    function.
    """
    if (coroutines.iscoroutine(callback)
            or coroutines.iscoroutinefunction(callback)):
        raise TypeError("coroutines cannot be used with call_soon()")
    if self._debug and check_loop:
        self._assert_is_current_event_loop()
    handle = events.Handle(callback, args, self)
    if handle._source_traceback:
        # Drop the last recorded entry (presumably this method's own
        # frame, so the traceback ends at the caller).
        del handle._source_traceback[-1]
    self._ready.append(handle)
    return handle
398
def _assert_is_current_event_loop(self):
    """Asserts that this event loop is the current event loop.

    Non-thread-safe methods of this class make this assumption and will
    likely behave incorrectly when the assumption is violated.

    Should only be called when (self._debug == True).  The caller is
    responsible for checking this condition for performance reasons.

    Raises RuntimeError if another loop is the current one.
    """
    try:
        current = events.get_event_loop()
    except AssertionError:
        # No current loop could be obtained; nothing to compare against.
        return
    if current is not self:
        raise RuntimeError(
            "Non-thread-safe operation invoked on an event loop other "
            "than the current one")
416
def call_soon_threadsafe(self, callback, *args):
    """Like call_soon(), but thread-safe."""
    # The current-loop check is skipped (check_loop=False).
    handle = self._call_soon(callback, args, check_loop=False)
    if handle._source_traceback:
        # Drop the last recorded entry (presumably this method's own
        # frame, so the traceback ends at the caller).
        del handle._source_traceback[-1]
    # Wake up the event loop so that it notices the new handle.
    self._write_to_self()
    return handle
424
def run_in_executor(self, executor, callback, *args):
    """Submit *callback* to *executor* and return a Future for its result.

    If *executor* is None the loop's default executor is used; it is
    created on first use as a ThreadPoolExecutor with _MAX_WORKERS
    workers.  *callback* may also be a (non-timer) Handle, in which case
    its own callback and arguments are submitted; a cancelled Handle
    yields a Future already completed with None.

    Raises TypeError if *callback* is a coroutine object or a coroutine
    function.
    """
    if (coroutines.iscoroutine(callback)
            or coroutines.iscoroutinefunction(callback)):
        raise TypeError("coroutines cannot be used with run_in_executor()")
    if isinstance(callback, events.Handle):
        assert not args
        assert not isinstance(callback, events.TimerHandle)
        if callback._cancelled:
            f = futures.Future(loop=self)
            f.set_result(None)
            return f
        callback, args = callback._callback, callback._args
    if executor is None:
        executor = self._default_executor
        if executor is None:
            executor = concurrent.futures.ThreadPoolExecutor(_MAX_WORKERS)
            self._default_executor = executor
    return futures.wrap_future(executor.submit(callback, *args), loop=self)
443
def set_default_executor(self, executor):
    """Set the executor used by run_in_executor() when none is given."""
    self._default_executor = executor
446
def _getaddrinfo_debug(self, host, port, family, type, proto, flags):
    """Call socket.getaddrinfo(), logging the request and its duration.

    The result message is logged at INFO level when the lookup took at
    least slow_callback_duration seconds, at DEBUG level otherwise.
    Returns the list returned by socket.getaddrinfo().
    """
    # Only non-zero/non-empty options are included in the log message.
    msg = ["%s:%r" % (host, port)]
    if family:
        msg.append('family=%r' % family)
    if type:
        msg.append('type=%r' % type)
    if proto:
        msg.append('proto=%r' % proto)
    if flags:
        msg.append('flags=%r' % flags)
    msg = ', '.join(msg)
    logger.debug('Get address info %s', msg)

    t0 = self.time()
    addrinfo = socket.getaddrinfo(host, port, family, type, proto, flags)
    dt = self.time() - t0

    msg = ('Getting address info %s took %.3f ms: %r'
           % (msg, dt * 1e3, addrinfo))
    if dt >= self.slow_callback_duration:
        logger.info(msg)
    else:
        logger.debug(msg)
    return addrinfo
471
def getaddrinfo(self, host, port, *,
                family=0, type=0, proto=0, flags=0):
    """Run socket.getaddrinfo() in the default executor.

    In debug mode the logging wrapper _getaddrinfo_debug() is submitted
    instead.  Returns whatever run_in_executor() returns.
    """
    if self._debug:
        return self.run_in_executor(None, self._getaddrinfo_debug,
                                    host, port, family, type, proto, flags)
    else:
        return self.run_in_executor(None, socket.getaddrinfo,
                                    host, port, family, type, proto, flags)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700480
def getnameinfo(self, sockaddr, flags=0):
    """Run socket.getnameinfo() in the default executor.

    Returns whatever run_in_executor() returns.
    """
    return self.run_in_executor(None, socket.getnameinfo, sockaddr, flags)
483
@coroutine
def create_connection(self, protocol_factory, host=None, port=None, *,
                      ssl=None, family=0, proto=0, flags=0, sock=None,
                      local_addr=None, server_hostname=None):
    """Connect to a TCP server.

    Create a streaming transport connection to a given Internet host and
    port: socket family AF_INET or socket.AF_INET6 depending on host (or
    family if specified), socket type SOCK_STREAM. protocol_factory must be
    a callable returning a protocol instance.

    This method is a coroutine which will try to establish the connection
    in the background.  When successful, the coroutine returns a
    (transport, protocol) pair.

    Raises ValueError for inconsistent arguments (server_hostname without
    ssl, ssl without host or server_hostname, host/port together with
    sock, or none of host/port/sock), and OSError when address lookup
    returns nothing or every candidate address fails.
    """
    if server_hostname is not None and not ssl:
        raise ValueError('server_hostname is only meaningful with ssl')

    if server_hostname is None and ssl:
        # Use host as default for server_hostname.  It is an error
        # if host is empty or not set, e.g. when an
        # already-connected socket was passed or when only a port
        # is given.  To avoid this error, you can pass
        # server_hostname='' -- this will bypass the hostname
        # check.  (This also means that if host is a numeric
        # IP/IPv6 address, we will attempt to verify that exact
        # address; this will probably fail, but it is possible to
        # create a certificate for a specific IP address, so we
        # don't judge it here.)
        if not host:
            raise ValueError('You must set server_hostname '
                             'when using ssl without a host')
        server_hostname = host

    if host is not None or port is not None:
        if sock is not None:
            raise ValueError(
                'host/port and sock can not be specified at the same time')

        # Resolve the remote (and, if given, local) address concurrently.
        f1 = self.getaddrinfo(
            host, port, family=family,
            type=socket.SOCK_STREAM, proto=proto, flags=flags)
        fs = [f1]
        if local_addr is not None:
            f2 = self.getaddrinfo(
                *local_addr, family=family,
                type=socket.SOCK_STREAM, proto=proto, flags=flags)
            fs.append(f2)
        else:
            f2 = None

        yield from tasks.wait(fs, loop=self)

        infos = f1.result()
        if not infos:
            raise OSError('getaddrinfo() returned empty list')
        if f2 is not None:
            laddr_infos = f2.result()
            if not laddr_infos:
                raise OSError('getaddrinfo() returned empty list')

        # Try each resolved address in turn until one connects.
        exceptions = []
        for family, socktype, proto, _cname, address in infos:
            try:
                sock = socket.socket(family=family, type=socktype,
                                     proto=proto)
                sock.setblocking(False)
                if f2 is not None:
                    for _, _, _, _, laddr in laddr_infos:
                        try:
                            sock.bind(laddr)
                            break
                        except OSError as exc:
                            exc = OSError(
                                exc.errno, 'error while '
                                'attempting to bind on address '
                                '{!r}: {}'.format(
                                    laddr, exc.strerror.lower()))
                            exceptions.append(exc)
                    else:
                        # No local address could be bound: give up on
                        # this remote address and try the next one.
                        sock.close()
                        sock = None
                        continue
                if self._debug:
                    logger.debug("connect %r to %r", sock, address)
                yield from self.sock_connect(sock, address)
            except OSError as exc:
                if sock is not None:
                    sock.close()
                exceptions.append(exc)
            except:
                if sock is not None:
                    sock.close()
                raise
            else:
                break
        else:
            if len(exceptions) == 1:
                raise exceptions[0]
            else:
                # If they all have the same str(), raise one.
                model = str(exceptions[0])
                if all(str(exc) == model for exc in exceptions):
                    raise exceptions[0]
                # Raise a combined exception so the user can see all
                # the various error messages.
                raise OSError('Multiple exceptions: {}'.format(
                    ', '.join(str(exc) for exc in exceptions)))

    elif sock is None:
        raise ValueError(
            'host and port was not specified and no sock specified')

    sock.setblocking(False)

    transport, protocol = yield from self._create_connection_transport(
        sock, protocol_factory, ssl, server_hostname)
    if self._debug:
        # Get the socket from the transport because SSL transport closes
        # the old socket and creates a new SSL socket
        sock = transport.get_extra_info('socket')
        logger.debug("%r connected to %s:%r: (%r, %r)",
                     sock, host, port, transport, protocol)
    return transport, protocol
607
@coroutine
def _create_connection_transport(self, sock, protocol_factory, ssl,
                                 server_hostname):
    """Create the transport for a client socket and wait until it is ready.

    A protocol is built with protocol_factory(); an SSL transport is
    created when *ssl* is true, a plain socket transport otherwise.
    Returns a (transport, protocol) pair once the waiter future passed
    to the transport has completed.
    """
    protocol = protocol_factory()
    waiter = futures.Future(loop=self)
    if ssl:
        # ssl=True means "no explicit context": pass None as the context.
        sslcontext = None if isinstance(ssl, bool) else ssl
        transport = self._make_ssl_transport(
            sock, protocol, sslcontext, waiter,
            server_side=False, server_hostname=server_hostname)
    else:
        transport = self._make_socket_transport(sock, protocol, waiter)

    yield from waiter
    return transport, protocol
623
@coroutine
def create_datagram_endpoint(self, protocol_factory,
                             local_addr=None, remote_addr=None, *,
                             family=0, proto=0, flags=0):
    """Create datagram connection.

    local_addr and remote_addr, when given, must be (host, port) 2-tuples.
    When neither is given, an explicit *family* is required.

    This method is a coroutine; it returns a (transport, protocol) pair.

    Raises ValueError when no usable address information is available,
    and OSError when address lookup returns nothing or no candidate
    socket could be set up.
    """
    if not (local_addr or remote_addr):
        if family == 0:
            raise ValueError('unexpected address family')
        addr_pairs_info = (((family, proto), (None, None)),)
    else:
        # join address by (family, protocol)
        addr_infos = collections.OrderedDict()
        for idx, addr in ((0, local_addr), (1, remote_addr)):
            if addr is not None:
                assert isinstance(addr, tuple) and len(addr) == 2, (
                    '2-tuple is expected')

                infos = yield from self.getaddrinfo(
                    *addr, family=family, type=socket.SOCK_DGRAM,
                    proto=proto, flags=flags)
                if not infos:
                    raise OSError('getaddrinfo() returned empty list')

                # Slot 0 holds the local address, slot 1 the remote one.
                for fam, _, pro, _, address in infos:
                    key = (fam, pro)
                    if key not in addr_infos:
                        addr_infos[key] = [None, None]
                    addr_infos[key][idx] = address

        # each addr has to have info for each (family, proto) pair
        addr_pairs_info = [
            (key, addr_pair) for key, addr_pair in addr_infos.items()
            if not ((local_addr and addr_pair[0] is None) or
                    (remote_addr and addr_pair[1] is None))]

        if not addr_pairs_info:
            raise ValueError('can not get address information')

    exceptions = []

    # Try each (family, proto) candidate until a socket is set up.
    for ((family, proto),
         (local_address, remote_address)) in addr_pairs_info:
        sock = None
        r_addr = None
        try:
            sock = socket.socket(
                family=family, type=socket.SOCK_DGRAM, proto=proto)
            sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
            sock.setblocking(False)

            if local_addr:
                sock.bind(local_address)
            if remote_addr:
                yield from self.sock_connect(sock, remote_address)
                r_addr = remote_address
        except OSError as exc:
            if sock is not None:
                sock.close()
            exceptions.append(exc)
        except:
            if sock is not None:
                sock.close()
            raise
        else:
            break
    else:
        raise exceptions[0]

    protocol = protocol_factory()
    waiter = futures.Future(loop=self)
    transport = self._make_datagram_transport(sock, protocol, r_addr,
                                              waiter)
    if self._debug:
        if local_addr:
            logger.info("Datagram endpoint local_addr=%r remote_addr=%r "
                        "created: (%r, %r)",
                        local_addr, remote_addr, transport, protocol)
        else:
            logger.debug("Datagram endpoint remote_addr=%r created: "
                         "(%r, %r)",
                         remote_addr, transport, protocol)
    yield from waiter
    return transport, protocol
707
@coroutine
def create_server(self, protocol_factory, host=None, port=None,
                  *,
                  family=socket.AF_UNSPEC,
                  flags=socket.AI_PASSIVE,
                  sock=None,
                  backlog=100,
                  ssl=None,
                  reuse_address=None):
    """Create a TCP server bound to host and port.

    Return a Server object which can be used to stop the service.

    This method is a coroutine.

    Raises TypeError if *ssl* is a bool, ValueError if host/port and
    sock are both given or all missing, and OSError if address lookup
    returns nothing or binding a socket fails.
    """
    if isinstance(ssl, bool):
        raise TypeError('ssl argument must be an SSLContext or None')
    if host is not None or port is not None:
        if sock is not None:
            raise ValueError(
                'host/port and sock can not be specified at the same time')

        AF_INET6 = getattr(socket, 'AF_INET6', 0)
        if reuse_address is None:
            # Default: enable SO_REUSEADDR on POSIX, except on Cygwin.
            reuse_address = os.name == 'posix' and sys.platform != 'cygwin'
        sockets = []
        if host == '':
            host = None

        infos = yield from self.getaddrinfo(
            host, port, family=family,
            type=socket.SOCK_STREAM, proto=0, flags=flags)
        if not infos:
            raise OSError('getaddrinfo() returned empty list')

        completed = False
        try:
            for res in infos:
                af, socktype, proto, canonname, sa = res
                try:
                    sock = socket.socket(af, socktype, proto)
                except socket.error:
                    # Assume it's a bad family/type/protocol combination.
                    if self._debug:
                        logger.warning('create_server() failed to create '
                                       'socket.socket(%r, %r, %r)',
                                       af, socktype, proto, exc_info=True)
                    continue
                sockets.append(sock)
                if reuse_address:
                    sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR,
                                    True)
                # Disable IPv4/IPv6 dual stack support (enabled by
                # default on Linux) which makes a single socket
                # listen on both address families.
                if af == AF_INET6 and hasattr(socket, 'IPPROTO_IPV6'):
                    sock.setsockopt(socket.IPPROTO_IPV6,
                                    socket.IPV6_V6ONLY,
                                    True)
                try:
                    sock.bind(sa)
                except OSError as err:
                    raise OSError(err.errno, 'error while attempting '
                                  'to bind on address %r: %s'
                                  % (sa, err.strerror.lower()))
            completed = True
        finally:
            # On any failure, close every socket created so far.
            if not completed:
                for sock in sockets:
                    sock.close()
    else:
        if sock is None:
            raise ValueError('Neither host/port nor sock were specified')
        sockets = [sock]

    server = Server(self, sockets)
    for sock in sockets:
        sock.listen(backlog)
        sock.setblocking(False)
        self._start_serving(protocol_factory, sock, ssl, server)
    if self._debug:
        logger.info("%r is serving", server)
    return server
791
@coroutine
def connect_read_pipe(self, protocol_factory, pipe):
    """Attach a new protocol to pipe through a read pipe transport.

    protocol_factory is called without arguments to build the protocol.
    The transport comes from _make_read_pipe_transport(), and this
    coroutine waits on the waiter future handed to it before returning.

    Return a (transport, protocol) pair.

    This method is a coroutine.
    """
    protocol = protocol_factory()
    waiter = futures.Future(loop=self)
    transport = self._make_read_pipe_transport(pipe, protocol, waiter)
    try:
        yield from waiter
    except BaseException:
        # Do not leak the transport when the wait fails or is
        # cancelled; the error itself is propagated unchanged.
        transport.close()
        raise
    if self._debug:
        logger.debug('Read pipe %r connected: (%r, %r)',
                     pipe.fileno(), transport, protocol)
    return transport, protocol
802
@coroutine
def connect_write_pipe(self, protocol_factory, pipe):
    """Attach a new protocol to pipe through a write pipe transport.

    protocol_factory is called without arguments to build the protocol.
    The transport comes from _make_write_pipe_transport(), and this
    coroutine waits on the waiter future handed to it before returning.

    Return a (transport, protocol) pair.

    This method is a coroutine.
    """
    protocol = protocol_factory()
    waiter = futures.Future(loop=self)
    transport = self._make_write_pipe_transport(pipe, protocol, waiter)
    try:
        yield from waiter
    except BaseException:
        # Do not leak the transport when the wait fails or is
        # cancelled; the error itself is propagated unchanged.
        transport.close()
        raise
    if self._debug:
        logger.debug('Write pipe %r connected: (%r, %r)',
                     pipe.fileno(), transport, protocol)
    return transport, protocol
813
def _log_subprocess(self, msg, stdin, stdout, stderr):
    """Log msg at debug level, followed by the configured std pipes.

    Pipes that are None are left out.  When stdout is set and stderr is
    subprocess.STDOUT, both are reported as one 'stdout=stderr=' entry.
    """
    parts = [msg]
    if stdin is not None:
        parts.append('stdin=%s' % _format_pipe(stdin))

    stderr_to_stdout = stdout is not None and stderr == subprocess.STDOUT
    if stderr_to_stdout:
        parts.append('stdout=stderr=%s' % _format_pipe(stdout))
    else:
        for template, pipe in (('stdout=%s', stdout),
                               ('stderr=%s', stderr)):
            if pipe is not None:
                parts.append(template % _format_pipe(pipe))
    logger.debug(' '.join(parts))
826
@coroutine
def subprocess_shell(self, protocol_factory, cmd, *, stdin=subprocess.PIPE,
                     stdout=subprocess.PIPE, stderr=subprocess.PIPE,
                     universal_newlines=False, shell=True, bufsize=0,
                     **kwargs):
    """Run the shell command cmd in a subprocess.

    cmd must be a str or bytes object.  universal_newlines, shell and
    bufsize exist only to reject unsupported values: they must be
    False, True and 0 respectively.  stdin, stdout, stderr and any
    extra keyword arguments are forwarded to
    _make_subprocess_transport().

    Return a (transport, protocol) pair, where protocol is the object
    returned by protocol_factory().

    Raise ValueError if one of the checks above fails.

    This method is a coroutine.
    """
    if not isinstance(cmd, (bytes, str)):
        raise ValueError("cmd must be a string")
    if universal_newlines:
        raise ValueError("universal_newlines must be False")
    if not shell:
        raise ValueError("shell must be True")
    if bufsize != 0:
        raise ValueError("bufsize must be 0")
    protocol = protocol_factory()
    # Stays None when debug mode is off at this point, so the check
    # after the wait cannot hit an unbound name if debug mode is turned
    # on while the transport is being created.
    debug_log = None
    if self._debug:
        # NOTE(review): the whole command string is included in the
        # log message here.
        debug_log = 'run shell command %r' % cmd
        self._log_subprocess(debug_log, stdin, stdout, stderr)
    transport = yield from self._make_subprocess_transport(
        protocol, cmd, True, stdin, stdout, stderr, bufsize, **kwargs)
    if self._debug and debug_log is not None:
        logger.info('%s: %r', debug_log, transport)
    return transport, protocol
851
@coroutine
def subprocess_exec(self, protocol_factory, program, *args,
                    stdin=subprocess.PIPE, stdout=subprocess.PIPE,
                    stderr=subprocess.PIPE, universal_newlines=False,
                    shell=False, bufsize=0, **kwargs):
    """Run program with the arguments args in a subprocess.

    program and every item of args must be a str or bytes object.
    universal_newlines, shell and bufsize exist only to reject
    unsupported values: they must be False, False and 0 respectively.
    stdin, stdout, stderr and any extra keyword arguments are forwarded
    to _make_subprocess_transport().

    Return a (transport, protocol) pair, where protocol is the object
    returned by protocol_factory().

    Raise ValueError for an unsupported option value and TypeError for
    a program argument of the wrong type.

    This method is a coroutine.
    """
    if universal_newlines:
        raise ValueError("universal_newlines must be False")
    if shell:
        raise ValueError("shell must be False")
    if bufsize != 0:
        raise ValueError("bufsize must be 0")
    popen_args = (program,) + args
    for arg in popen_args:
        if not isinstance(arg, (str, bytes)):
            raise TypeError("program arguments must be "
                            "a bytes or text string, not %s"
                            % type(arg).__name__)
    protocol = protocol_factory()
    # Stays None when debug mode is off at this point, so the check
    # after the wait cannot hit an unbound name if debug mode is turned
    # on while the transport is being created.
    debug_log = None
    if self._debug:
        # don't log parameters: they may contain sensitive information
        # (password) and may be too long
        debug_log = 'execute program %r' % program
        self._log_subprocess(debug_log, stdin, stdout, stderr)
    transport = yield from self._make_subprocess_transport(
        protocol, popen_args, False, stdin, stdout, stderr,
        bufsize, **kwargs)
    if self._debug and debug_log is not None:
        logger.info('%s: %r', debug_log, transport)
    return transport, protocol
881
def set_exception_handler(self, handler):
    """Install handler as the event loop exception handler.

    Passing None selects the default exception handler.

    Otherwise handler must be callable and is invoked as
    handler(loop, context): 'loop' is the event loop and 'context' is
    a dict (see `call_exception_handler()` for its contents).

    Raise TypeError if handler is neither None nor callable.
    """
    if not (handler is None or callable(handler)):
        raise TypeError('A callable object or None is expected, '
                        'got {!r}'.format(handler))
    self._exception_handler = handler
898
def default_exception_handler(self, context):
    """Log the error described by context.

    Used when no custom exception handler is set; a custom handler may
    also call it to fall back to the default behavior.

    context has the same meaning as in `call_exception_handler()`.
    The log record starts with context['message'] (or a generic text
    when it is missing or empty), followed by one 'key: value' line for
    every other key in sorted order.  The 'exception' entry, if any, is
    attached as exc_info instead of being printed as a line.
    """
    message = context.get('message') or 'Unhandled exception in event loop'

    exception = context.get('exception')
    exc_info = (False if exception is None
                else (type(exception), exception, exception.__traceback__))

    # These two entries are reported through the message and exc_info.
    handled_keys = {'message', 'exception'}
    log_lines = [message]
    for key in sorted(context):
        if key in handled_keys:
            continue
        if key == 'source_traceback':
            stack = ''.join(traceback.format_list(context[key]))
            rendered = ('Object created at (most recent call last):\n'
                        + stack.rstrip())
        else:
            rendered = repr(context[key])
        log_lines.append('{}: {}'.format(key, rendered))

    logger.error('\n'.join(log_lines), exc_info=exc_info)
933
def call_exception_handler(self, context):
    """Call the current event loop's exception handler.

    The context argument is a dict containing the following keys:

    - 'message': Error message;
    - 'exception' (optional): Exception object;
    - 'future' (optional): Future instance;
    - 'handle' (optional): Handle instance;
    - 'protocol' (optional): Protocol instance;
    - 'transport' (optional): Transport instance;
    - 'socket' (optional): Socket instance.

    New keys may be introduced in the future.

    Exceptions (subclasses of Exception) raised by the handlers are
    logged and not propagated.

    Note: do not overload this method in an event loop subclass.
    For custom exception handling, use the
    `set_exception_handler()` method.
    """
    if self._exception_handler is None:
        try:
            self.default_exception_handler(context)
        except Exception:
            # Second line of defence: the default implementation, or a
            # subclass override of "default_exception_handler", failed.
            logger.error('Exception in default exception handler',
                         exc_info=True)
        return

    try:
        self._exception_handler(self, context)
    except Exception as exc:
        # The user supplied handler failed: report that failure through
        # the default handler, keeping the original context attached.
        fallback_context = {
            'message': 'Unhandled error in exception handler',
            'exception': exc,
            'context': context,
        }
        try:
            self.default_exception_handler(fallback_context)
        except Exception:
            # 'default_exception_handler' may be overloaded and fail too.
            logger.error('Exception in default exception handler '
                         'while handling an unexpected error '
                         'in custom exception handler',
                         exc_info=True)
981
def _add_callback(self, handle):
    """Append a Handle to _ready unless it has been cancelled.

    handle must be an events.Handle and must not be a TimerHandle.
    """
    assert isinstance(handle, events.Handle), 'A Handle is required here'
    if not handle._cancelled:
        assert not isinstance(handle, events.TimerHandle)
        self._ready.append(handle)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700989
990 def _add_callback_signalsafe(self, handle):
991 """Like _add_callback() but called from a signal handler."""
992 self._add_callback(handle)
993 self._write_to_self()
994
Yury Selivanov592ada92014-09-25 12:07:56 -0400995 def _timer_handle_cancelled(self, handle):
996 """Notification that a TimerHandle has been cancelled."""
997 if handle._scheduled:
998 self._timer_cancelled_count += 1
999
def _run_once(self):
    """Run one full iteration of the event loop.

    In order, this drops cancelled timer handles from _scheduled,
    polls the selector for I/O (without blocking if callbacks are
    already ready, otherwise until the next timer is due, or
    indefinitely when nothing is scheduled), processes the resulting
    events, moves the timer handles that are due to _ready, and
    finally runs the callbacks that were in _ready at that point.
    """

    sched_count = len(self._scheduled)
    if (sched_count > _MIN_SCHEDULED_TIMER_HANDLES and
        self._timer_cancelled_count / sched_count >
            _MIN_CANCELLED_TIMER_HANDLES_FRACTION):
        # Remove delayed calls that were cancelled if their number
        # is too high
        new_scheduled = []
        for handle in self._scheduled:
            if handle._cancelled:
                handle._scheduled = False
            else:
                new_scheduled.append(handle)

        # Filtering broke the heap ordering; rebuild it in one pass.
        heapq.heapify(new_scheduled)
        self._scheduled = new_scheduled
        self._timer_cancelled_count = 0
    else:
        # Remove delayed calls that were cancelled from head of queue.
        while self._scheduled and self._scheduled[0]._cancelled:
            self._timer_cancelled_count -= 1
            handle = heapq.heappop(self._scheduled)
            handle._scheduled = False

    # None is passed to select() when there is neither a ready callback
    # nor a scheduled timer.
    timeout = None
    if self._ready:
        timeout = 0
    elif self._scheduled:
        # Compute the desired timeout.
        when = self._scheduled[0]._when
        timeout = max(0, when - self.time())

    if self._debug and timeout != 0:
        # Debug mode: time the poll and log it, at INFO level when it
        # took one second or more and at DEBUG level otherwise.
        t0 = self.time()
        event_list = self._selector.select(timeout)
        dt = self.time() - t0
        if dt >= 1.0:
            level = logging.INFO
        else:
            level = logging.DEBUG
        nevent = len(event_list)
        if timeout is None:
            logger.log(level, 'poll took %.3f ms: %s events',
                       dt * 1e3, nevent)
        elif nevent:
            logger.log(level,
                       'poll %.3f ms took %.3f ms: %s events',
                       timeout * 1e3, dt * 1e3, nevent)
        elif dt >= 1.0:
            # A poll that returned no event is only logged when slow.
            logger.log(level,
                       'poll %.3f ms took %.3f ms: timeout',
                       timeout * 1e3, dt * 1e3)
    else:
        event_list = self._selector.select(timeout)
    self._process_events(event_list)

    # Handle 'later' callbacks that are ready.
    # Timers due within one clock resolution from now are treated as
    # ready as well.
    end_time = self.time() + self._clock_resolution
    while self._scheduled:
        handle = self._scheduled[0]
        if handle._when >= end_time:
            break
        handle = heapq.heappop(self._scheduled)
        handle._scheduled = False
        self._ready.append(handle)

    # This is the only place where callbacks are actually *called*.
    # All other places just add them to ready.
    # Note: We run all currently scheduled callbacks, but not any
    # callbacks scheduled by callbacks run this time around --
    # they will be run the next time (after another I/O poll).
    # Use an idiom that is thread-safe without using locks.
    ntodo = len(self._ready)
    for i in range(ntodo):
        handle = self._ready.popleft()
        if handle._cancelled:
            continue
        if self._debug:
            # Debug mode: warn about callbacks that run for at least
            # slow_callback_duration seconds.
            t0 = self.time()
            handle._run()
            dt = self.time() - t0
            if dt >= self.slow_callback_duration:
                logger.warning('Executing %s took %.3f seconds',
                               _format_handle(handle), dt)
        else:
            handle._run()
    handle = None  # Needed to break cycles when an exception occurs.
Victor Stinner0f3e6bc2014-02-19 23:15:02 +01001094
def get_debug(self):
    """Return the current value of the loop's debug flag (_debug)."""
    return self._debug
1097
def set_debug(self, enabled):
    """Store enabled as the loop's debug flag (_debug)."""
    self._debug = enabled