blob: 3cff72abb5286500732a72d9af3ff354c1a56337 [file] [log] [blame]
"""Base implementation of event loop.

The event loop can be broken up into a multiplexer (the part
responsible for notifying us of I/O events) and the event loop proper,
which wraps a multiplexer with functionality for scheduling callbacks,
immediately or at a given time in the future.

Whenever a public API takes a callback, subsequent positional
arguments will be passed to the callback if/when it is called.  This
avoids the proliferation of trivial lambdas implementing closures.
Keyword arguments for the callback are not supported; this is a
conscious design decision, leaving the door open for keyword arguments
to modify the meaning of the API call itself.
"""
15
16
17import collections
18import concurrent.futures
19import heapq
Victor Stinner0e6f52a2014-06-20 17:34:15 +020020import inspect
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070021import logging
Victor Stinnerb75380f2014-06-30 14:39:11 +020022import os
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070023import socket
24import subprocess
25import time
Victor Stinnerb75380f2014-06-30 14:39:11 +020026import traceback
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070027import sys
28
Victor Stinnerf951d282014-06-29 00:46:45 +020029from . import coroutines
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070030from . import events
31from . import futures
32from . import tasks
Victor Stinnerf951d282014-06-29 00:46:45 +020033from .coroutines import coroutine
Guido van Rossumfc29e0f2013-10-17 15:39:45 -070034from .log import logger
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070035
36
37__all__ = ['BaseEventLoop', 'Server']
38
39
# Worker count passed to ThreadPoolExecutor when the loop has to create
# its own default executor.
_MAX_WORKERS = 5

# Cancelled timer handles are only cleaned up once _scheduled holds at
# least this many handles.
_MIN_SCHEDULED_TIMER_HANDLES = 100

# Cancelled timer handles are only cleaned up once at least this
# fraction of the handles in _scheduled has been cancelled.
_MIN_CANCELLED_TIMER_HANDLES_FRACTION = 0.5
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070050
def _format_handle(handle):
    """Return a printable description of *handle*.

    If the handle's callback is a bound method of a Task, the repr() of
    that task is returned; otherwise the handle is formatted with str().
    """
    callback = handle._callback
    bound_to_task = (inspect.ismethod(callback)
                     and isinstance(callback.__self__, tasks.Task))
    if bound_to_task:
        # Describe the task rather than the handle wrapping its method.
        return repr(callback.__self__)
    return str(handle)
58
59
def _format_pipe(fd):
    """Return a printable name for a subprocess stream argument.

    subprocess.PIPE maps to '<pipe>', subprocess.STDOUT maps to
    '<stdout>', and anything else is rendered with repr().
    """
    if fd == subprocess.PIPE:
        return '<pipe>'
    if fd == subprocess.STDOUT:
        return '<stdout>'
    return repr(fd)
67
68
class _StopError(BaseException):
    """Internal signal raised to make the event loop stop running.

    It derives from BaseException, not Exception, so handlers written
    as ``except Exception`` do not intercept it.
    """
71
72
def _check_resolved_address(sock, address):
    """Raise ValueError unless *address* is an already-resolved IP address.

    Passing a host name would require a DNS lookup, which could hang the
    whole event loop.  Only AF_INET and AF_INET6 sockets are checked;
    for any other socket family the function returns without checking.
    """
    family = sock.family
    if family == socket.AF_INET:
        host, port = address
    elif family == socket.AF_INET6:
        # IPv6 addresses may carry extra fields; only host and port matter.
        host, port = address[:2]
    else:
        return

    # Strip the platform-specific flags that may be OR-ed into sock.type.
    type_mask = 0
    for flag_name in ('SOCK_NONBLOCK', 'SOCK_CLOEXEC'):
        type_mask |= getattr(socket, flag_name, 0)

    # With AI_NUMERICHOST, getaddrinfo() fails instead of resolving a
    # name, which is exactly the check needed here.
    try:
        socket.getaddrinfo(host, port,
                           family=family,
                           type=(sock.type & ~type_mask),
                           proto=sock.proto,
                           flags=socket.AI_NUMERICHOST)
    except socket.gaierror as err:
        raise ValueError("address must be resolved (IP address), got %r: %s"
                         % (address, err))
100
def _raise_stop_error(*args):
    """Raise _StopError; any positional arguments are accepted and ignored."""
    raise _StopError()
103
104
class Server(events.AbstractServer):
    """Server object tracking listening sockets and attached users.

    ``sockets`` holds the listening sockets and becomes None once the
    server has been closed.
    """

    def __init__(self, loop, sockets):
        self._loop = loop
        self.sockets = sockets
        # Incremented by _attach() and decremented by _detach().
        self._active_count = 0
        # Futures created by wait_closed(); replaced by None in _wakeup().
        self._waiters = []

    def __repr__(self):
        return '<%s sockets=%r>' % (self.__class__.__name__, self.sockets)

    def _attach(self):
        """Register one more active user; the server must still be open."""
        assert self.sockets is not None
        self._active_count += 1

    def _detach(self):
        """Unregister one active user and wake waiters if that was the last
        one on an already-closed server."""
        assert self._active_count > 0
        self._active_count -= 1
        if self._active_count == 0 and self.sockets is None:
            self._wakeup()

    def close(self):
        """Stop serving on every socket; does nothing if already closed."""
        sockets = self.sockets
        if sockets is None:
            return
        self.sockets = None
        for sock in sockets:
            self._loop._stop_serving(sock)
        if self._active_count == 0:
            self._wakeup()

    def _wakeup(self):
        """Complete every pending wait_closed() future."""
        waiters, self._waiters = self._waiters, None
        for waiter in waiters:
            if not waiter.done():
                # wait_closed() ignores the result, so use None instead of
                # the future itself; storing the future as its own result
                # would create a reference cycle.
                waiter.set_result(None)

    @coroutine
    def wait_closed(self):
        """Wait until _wakeup() runs.

        Returns immediately if the server is already closed or the
        waiters have already been woken up.
        """
        if self.sockets is None or self._waiters is None:
            return
        waiter = futures.Future(loop=self._loop)
        self._waiters.append(waiter)
        yield from waiter
150
151
152class BaseEventLoop(events.AbstractEventLoop):
153
def __init__(self):
    """Set up the initial, not-running and not-closed state of the loop."""
    self._closed = False
    self._running = False
    # Handles ready to run and timer handles kept as a heap (see call_at).
    self._ready = collections.deque()
    self._scheduled = []
    self._timer_cancelled_count = 0
    # Created lazily by run_in_executor() when no executor is supplied.
    self._default_executor = None
    self._internal_fds = 0
    self._clock_resolution = time.get_clock_info('monotonic').resolution
    self._exception_handler = None
    # Debug mode is enabled through PYTHONASYNCIODEBUG unless the
    # interpreter was told to ignore environment variables.
    env_allowed = not sys.flags.ignore_environment
    self._debug = (env_allowed
                   and bool(os.environ.get('PYTHONASYNCIODEBUG')))
    # In debug mode, if the execution of a callback or a step of a task
    # exceeds this duration in seconds, the slow callback/task is logged.
    self.slow_callback_duration = 0.1
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700169
def __repr__(self):
    """Return '<ClassName running=... closed=... debug=...>'."""
    cls_name = self.__class__.__name__
    return ('<%s running=%s closed=%s debug=%s>'
            % (cls_name, self.is_running(),
               self.is_closed(), self.get_debug()))
174
def create_task(self, coro):
    """Schedule a coroutine object.

    Return a task object.
    """
    task = tasks.Task(coro, loop=self)
    source_tb = task._source_traceback
    if source_tb:
        # Drop the last recorded frame, which is this method itself.
        del source_tb[-1]
    return task
Victor Stinner896a25a2014-07-08 11:29:25 +0200184
def _make_socket_transport(self, sock, protocol, waiter=None, *,
                           extra=None, server=None):
    """Create socket transport.

    The base class provides no implementation and always raises
    NotImplementedError.
    """
    raise NotImplementedError()
189
def _make_ssl_transport(self, rawsock, protocol, sslcontext, waiter, *,
                        server_side=False, server_hostname=None,
                        extra=None, server=None):
    """Create SSL transport.

    The base class provides no implementation and always raises
    NotImplementedError.
    """
    raise NotImplementedError()
195
def _make_datagram_transport(self, sock, protocol,
                             address=None, waiter=None, extra=None):
    """Create datagram transport.

    The base class provides no implementation and always raises
    NotImplementedError.
    """
    raise NotImplementedError()
200
def _make_read_pipe_transport(self, pipe, protocol, waiter=None,
                              extra=None):
    """Create read pipe transport.

    The base class provides no implementation and always raises
    NotImplementedError.
    """
    raise NotImplementedError()
205
def _make_write_pipe_transport(self, pipe, protocol, waiter=None,
                               extra=None):
    """Create write pipe transport.

    The base class provides no implementation and always raises
    NotImplementedError.
    """
    raise NotImplementedError()
210
@coroutine
def _make_subprocess_transport(self, protocol, args, shell,
                               stdin, stdout, stderr, bufsize,
                               extra=None, **kwargs):
    """Create subprocess transport.

    The base class provides no implementation and always raises
    NotImplementedError.
    """
    raise NotImplementedError()
217
def _write_to_self(self):
    """Write a byte to self-pipe, to wake up the event loop.

    This may be called from a different thread.

    The subclass is responsible for implementing the self-pipe; the
    base class always raises NotImplementedError.
    """
    raise NotImplementedError()
226
def _process_events(self, event_list):
    """Process selector events.

    The base class provides no implementation and always raises
    NotImplementedError.
    """
    raise NotImplementedError()
230
def _check_closed(self):
    """Raise RuntimeError if the event loop has been closed."""
    if not self._closed:
        return
    raise RuntimeError('Event loop is closed')
234
def run_forever(self):
    """Run until stop() is called.

    Raises RuntimeError if the loop is closed or is already running.
    """
    self._check_closed()
    if self._running:
        raise RuntimeError('Event loop is running.')
    self._running = True
    try:
        try:
            while True:
                self._run_once()
        except _StopError:
            # Raised by a callback to request that the loop stops.
            pass
    finally:
        # Reset the flag on any exit, including an unexpected exception.
        self._running = False
249
def run_until_complete(self, future):
    """Run until the Future is done.

    If the argument is a coroutine, it is wrapped in a Task.

    WARNING: It would be disastrous to call run_until_complete()
    with the same coroutine twice -- it would wrap it in two
    different Tasks and that can't be good.

    Return the Future's result, or raise its exception.  RuntimeError
    is raised if the loop is closed, or if it was stopped before the
    future completed.
    """
    self._check_closed()

    new_task = not isinstance(future, futures.Future)
    # "async" is a reserved keyword since Python 3.7, so writing
    # tasks.async(...) no longer parses.  Look the helper up by name and
    # prefer ensure_future() when the tasks module provides it.
    ensure_future = getattr(tasks, 'ensure_future', None)
    if ensure_future is None:
        ensure_future = getattr(tasks, 'async')
    future = ensure_future(future, loop=self)
    if new_task:
        # An exception is raised if the future didn't complete, so there
        # is no need to log the "destroy pending task" message
        future._log_destroy_pending = False

    # The done callback raises _StopError, which ends run_forever().
    future.add_done_callback(_raise_stop_error)
    self.run_forever()
    future.remove_done_callback(_raise_stop_error)
    if not future.done():
        raise RuntimeError('Event loop stopped before Future completed.')

    return future.result()
277
def stop(self):
    """Stop running the event loop.

    Every callback scheduled before stop() is called will run.  Callbacks
    scheduled after stop() is called will not run.  However, those
    callbacks will run if run_forever is called again later.
    """
    # The stop request is queued like any other callback, so it runs
    # after the callbacks that were scheduled before it.
    self.call_soon(_raise_stop_error)
286
def close(self):
    """Close the event loop.

    This clears the queues and shuts down the executor,
    but does not wait for the executor to finish.

    The event loop must not be running; RuntimeError is raised if it
    is.  Closing an already-closed loop does nothing.
    """
    if self._running:
        raise RuntimeError("Cannot close a running event loop")
    if self._closed:
        return
    if self._debug:
        logger.debug("Close %r", self)
    self._closed = True
    self._ready.clear()
    self._scheduled.clear()
    # Detach the default executor before shutting it down.
    executor, self._default_executor = self._default_executor, None
    if executor is not None:
        executor.shutdown(wait=False)
308
def is_closed(self):
    """Return True if the event loop was closed, False otherwise."""
    closed = self._closed
    return closed
312
def is_running(self):
    """Return True if the event loop is running, False otherwise."""
    running = self._running
    return running
316
def time(self):
    """Return the current time according to the event loop's clock.

    The value is a float in seconds read from time.monotonic().  The
    epoch, precision, accuracy and drift are unspecified and may differ
    per event loop.
    """
    now = time.monotonic()
    return now
325
def call_later(self, delay, callback, *args):
    """Arrange for a callback to be called at a given time.

    Return a Handle: an opaque object with a cancel() method that
    can be used to cancel the call.

    The delay can be an int or float, expressed in seconds.  It is
    always relative to the current time.

    Each callback will be called exactly once.  If two callbacks
    are scheduled for exactly the same time, it is undefined which
    will be called first.

    Any positional arguments after the callback will be passed to
    the callback when it is called.
    """
    when = self.time() + delay
    timer = self.call_at(when, callback, *args)
    source_tb = timer._source_traceback
    if source_tb:
        # Drop the last recorded frame, which is this method itself.
        del source_tb[-1]
    return timer
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700346
def call_at(self, when, callback, *args):
    """Like call_later(), but uses an absolute time.

    Absolute time corresponds to the event loop's time() method.
    TypeError is raised if *callback* is a coroutine function.
    """
    if coroutines.iscoroutinefunction(callback):
        raise TypeError("coroutines cannot be used with call_at()")
    if self._debug:
        self._assert_is_current_event_loop()
    timer = events.TimerHandle(when, callback, args, self)
    source_tb = timer._source_traceback
    if source_tb:
        # Drop the last recorded frame, which is this method itself.
        del source_tb[-1]
    # _scheduled is kept as a heap of timer handles.
    heapq.heappush(self._scheduled, timer)
    timer._scheduled = True
    return timer
362
def call_soon(self, callback, *args):
    """Arrange for a callback to be called as soon as possible.

    This operates as a FIFO queue: callbacks are called in the
    order in which they are registered.  Each callback will be
    called exactly once.

    Any positional arguments after the callback will be passed to
    the callback when it is called.
    """
    handle = self._call_soon(callback, args, check_loop=True)
    source_tb = handle._source_traceback
    if source_tb:
        # Drop the last recorded frame, which is this method itself.
        del source_tb[-1]
    return handle
Victor Stinner93569c22014-03-21 10:00:52 +0100377
def _call_soon(self, callback, args, check_loop):
    """Create a Handle for *callback* and append it to the ready queue.

    *args* is the tuple of positional arguments for the callback.  When
    *check_loop* is true and the loop is in debug mode, the loop is
    first checked to be the current event loop.  TypeError is raised if
    *callback* is a coroutine function.
    """
    if coroutines.iscoroutinefunction(callback):
        raise TypeError("coroutines cannot be used with call_soon()")
    if check_loop and self._debug:
        self._assert_is_current_event_loop()
    handle = events.Handle(callback, args, self)
    source_tb = handle._source_traceback
    if source_tb:
        # Drop the last recorded frame, which is this method itself.
        del source_tb[-1]
    self._ready.append(handle)
    return handle
388
def _assert_is_current_event_loop(self):
    """Check that this event loop is the current event loop.

    Non-thread-safe methods of this class make this assumption and will
    likely behave incorrectly when the assumption is violated.

    Should only be called when (self._debug == True).  The caller is
    responsible for checking this condition for performance reasons.

    Raises RuntimeError if another loop is the current one.
    """
    try:
        current = events.get_event_loop()
    except AssertionError:
        # The current loop could not be determined; skip the check.
        return
    if current is self:
        return
    raise RuntimeError(
        "Non-thread-safe operation invoked on an event loop other "
        "than the current one")
406
def call_soon_threadsafe(self, callback, *args):
    """Like call_soon(), but thread-safe.

    The current-loop check is skipped, and the loop is woken up through
    _write_to_self() after the handle has been queued.
    """
    handle = self._call_soon(callback, args, check_loop=False)
    source_tb = handle._source_traceback
    if source_tb:
        # Drop the last recorded frame, which is this method itself.
        del source_tb[-1]
    self._write_to_self()
    return handle
414
def run_in_executor(self, executor, callback, *args):
    """Submit ``callback(*args)`` to an executor and return a Future.

    If *executor* is None, the loop's default executor is used; a
    ThreadPoolExecutor is created and stored the first time it is
    needed.  *callback* may also be a Handle, in which case its stored
    callback and arguments are used; a cancelled Handle produces a
    Future that already holds the result None.

    TypeError is raised if *callback* is a coroutine function.
    """
    if coroutines.iscoroutinefunction(callback):
        raise TypeError("Coroutines cannot be used with run_in_executor()")
    if isinstance(callback, events.Handle):
        assert not args
        assert not isinstance(callback, events.TimerHandle)
        if callback._cancelled:
            # Nothing to run: hand back a future that is already done.
            done = futures.Future(loop=self)
            done.set_result(None)
            return done
        callback, args = callback._callback, callback._args
    if executor is None:
        executor = self._default_executor
        if executor is None:
            executor = concurrent.futures.ThreadPoolExecutor(_MAX_WORKERS)
            self._default_executor = executor
    submitted = executor.submit(callback, *args)
    return futures.wrap_future(submitted, loop=self)
432
def set_default_executor(self, executor):
    """Store *executor* as the default used by run_in_executor()."""
    self._default_executor = executor
435
def _getaddrinfo_debug(self, host, port, family, type, proto, flags):
    """Call socket.getaddrinfo() and log the request and its duration.

    The result message is logged at INFO level when the lookup took at
    least slow_callback_duration seconds, otherwise at DEBUG level.
    Returns the list produced by socket.getaddrinfo().
    """
    parts = ["%s:%r" % (host, port)]
    # Only the non-default (truthy) options are mentioned.
    options = (('family', family), ('type', type),
               ('proto', proto), ('flags', flags))
    for label, value in options:
        if value:
            parts.append((label + '=%r') % value)
    request = ', '.join(parts)
    logger.debug('Get address info %s', request)

    started = self.time()
    addrinfo = socket.getaddrinfo(host, port, family, type, proto, flags)
    elapsed = self.time() - started

    summary = ('Getting address info %s took %.3f ms: %r'
               % (request, elapsed * 1e3, addrinfo))
    if elapsed >= self.slow_callback_duration:
        logger.info(summary)
    else:
        logger.debug(summary)
    return addrinfo
460
def getaddrinfo(self, host, port, *,
                family=0, type=0, proto=0, flags=0):
    """Run an address lookup in the default executor.

    In debug mode the logging wrapper _getaddrinfo_debug() is run
    instead of socket.getaddrinfo().  Returns whatever
    run_in_executor() returns.
    """
    resolver = self._getaddrinfo_debug if self._debug else socket.getaddrinfo
    return self.run_in_executor(None, resolver,
                                host, port, family, type, proto, flags)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700469
def getnameinfo(self, sockaddr, flags=0):
    """Run socket.getnameinfo(sockaddr, flags) in the default executor.

    Returns whatever run_in_executor() returns.
    """
    lookup = socket.getnameinfo
    return self.run_in_executor(None, lookup, sockaddr, flags)
472
@coroutine
def create_connection(self, protocol_factory, host=None, port=None, *,
                      ssl=None, family=0, proto=0, flags=0, sock=None,
                      local_addr=None, server_hostname=None):
    """Connect to a TCP server.

    Create a streaming transport connection to a given Internet host and
    port: socket family AF_INET or socket.AF_INET6 depending on host (or
    family if specified), socket type SOCK_STREAM.  protocol_factory must
    be a callable returning a protocol instance.

    Either host/port or an existing *sock* must be given, but not both;
    ValueError is raised otherwise.  When *local_addr* is given, the new
    socket is bound to it before connecting.

    This method is a coroutine which will try to establish the connection
    in the background.  When successful, the coroutine returns a
    (transport, protocol) pair.
    """
    if server_hostname is not None and not ssl:
        raise ValueError('server_hostname is only meaningful with ssl')

    if server_hostname is None and ssl:
        # Use host as default for server_hostname.  It is an error
        # if host is empty or not set, e.g. when an
        # already-connected socket was passed or when only a port
        # is given.  To avoid this error, you can pass
        # server_hostname='' -- this will bypass the hostname
        # check.  (This also means that if host is a numeric
        # IP/IPv6 address, we will attempt to verify that exact
        # address; this will probably fail, but it is possible to
        # create a certificate for a specific IP address, so we
        # don't judge it here.)
        if not host:
            raise ValueError('You must set server_hostname '
                             'when using ssl without a host')
        server_hostname = host

    if host is not None or port is not None:
        if sock is not None:
            raise ValueError(
                'host/port and sock can not be specified at the same time')

        # Resolve the remote (and optional local) address concurrently.
        remote_lookup = self.getaddrinfo(
            host, port, family=family,
            type=socket.SOCK_STREAM, proto=proto, flags=flags)
        lookups = [remote_lookup]
        local_lookup = None
        if local_addr is not None:
            local_lookup = self.getaddrinfo(
                *local_addr, family=family,
                type=socket.SOCK_STREAM, proto=proto, flags=flags)
            lookups.append(local_lookup)

        yield from tasks.wait(lookups, loop=self)

        infos = remote_lookup.result()
        if not infos:
            raise OSError('getaddrinfo() returned empty list')
        if local_lookup is not None:
            laddr_infos = local_lookup.result()
            if not laddr_infos:
                raise OSError('getaddrinfo() returned empty list')

        # Try each resolved address in turn until one connects.
        exceptions = []
        for af, socktype, protonum, _cname, address in infos:
            try:
                sock = socket.socket(family=af, type=socktype,
                                     proto=protonum)
                sock.setblocking(False)
                if local_lookup is not None:
                    for _, _, _, _, laddr in laddr_infos:
                        try:
                            sock.bind(laddr)
                            break
                        except OSError as exc:
                            bind_error = OSError(
                                exc.errno, 'error while '
                                'attempting to bind on address '
                                '{!r}: {}'.format(
                                    laddr, exc.strerror.lower()))
                            exceptions.append(bind_error)
                    else:
                        # No local address could be bound for this
                        # socket; move on to the next remote address.
                        sock.close()
                        sock = None
                        continue
                if self._debug:
                    logger.debug("connect %r to %r", sock, address)
                yield from self.sock_connect(sock, address)
            except OSError as exc:
                if sock is not None:
                    sock.close()
                exceptions.append(exc)
            except:
                # Close the socket on any other error before re-raising.
                if sock is not None:
                    sock.close()
                raise
            else:
                break
        else:
            # Every address failed.
            if len(exceptions) == 1:
                raise exceptions[0]
            # If they all have the same str(), raise one.
            first_message = str(exceptions[0])
            if all(str(exc) == first_message for exc in exceptions):
                raise exceptions[0]
            # Raise a combined exception so the user can see all
            # the various error messages.
            raise OSError('Multiple exceptions: {}'.format(
                ', '.join(str(exc) for exc in exceptions)))

    elif sock is None:
        raise ValueError(
            'host and port was not specified and no sock specified')

    sock.setblocking(False)

    transport, protocol = yield from self._create_connection_transport(
        sock, protocol_factory, ssl, server_hostname)
    if self._debug:
        # Get the socket from the transport because SSL transport closes
        # the old socket and creates a new SSL socket
        sock = transport.get_extra_info('socket')
        logger.debug("%r connected to %s:%r: (%r, %r)",
                     sock, host, port, transport, protocol)
    return transport, protocol
596
@coroutine
def _create_connection_transport(self, sock, protocol_factory, ssl,
                                 server_hostname):
    """Build the protocol and transport for a client connection.

    An SSL transport is created when *ssl* is true (a bool selects no
    explicit SSL context), otherwise a plain socket transport.  The
    coroutine waits on the transport's waiter future, then returns a
    (transport, protocol) pair.
    """
    protocol = protocol_factory()
    waiter = futures.Future(loop=self)
    if not ssl:
        transport = self._make_socket_transport(sock, protocol, waiter)
    else:
        sslcontext = None if isinstance(ssl, bool) else ssl
        transport = self._make_ssl_transport(
            sock, protocol, sslcontext, waiter,
            server_side=False, server_hostname=server_hostname)

    yield from waiter
    return transport, protocol
612
@coroutine
def create_datagram_endpoint(self, protocol_factory,
                             local_addr=None, remote_addr=None, *,
                             family=0, proto=0, flags=0):
    """Create datagram connection.

    *local_addr* and *remote_addr* are optional (host, port) 2-tuples.
    When neither is given, *family* must be specified explicitly.  The
    socket is bound to the local address and connected to the remote
    address when they are supplied.

    This method is a coroutine returning a (transport, protocol) pair.
    """
    if local_addr or remote_addr:
        # Group resolved addresses by (family, protocol); slot 0 holds
        # the local address and slot 1 the remote address.
        grouped = collections.OrderedDict()
        for slot, addr in enumerate((local_addr, remote_addr)):
            if addr is None:
                continue
            assert isinstance(addr, tuple) and len(addr) == 2, (
                '2-tuple is expected')

            infos = yield from self.getaddrinfo(
                *addr, family=family, type=socket.SOCK_DGRAM,
                proto=proto, flags=flags)
            if not infos:
                raise OSError('getaddrinfo() returned empty list')

            for fam, _, pro, _, address in infos:
                grouped.setdefault((fam, pro), [None, None])[slot] = address

        # each addr has to have info for each (family, proto) pair
        addr_pairs_info = []
        for key, addr_pair in grouped.items():
            if local_addr and addr_pair[0] is None:
                continue
            if remote_addr and addr_pair[1] is None:
                continue
            addr_pairs_info.append((key, addr_pair))

        if not addr_pairs_info:
            raise ValueError('can not get address information')
    else:
        if family == 0:
            raise ValueError('unexpected address family')
        addr_pairs_info = (((family, proto), (None, None)),)

    exceptions = []

    # Try each candidate until one socket is set up successfully.
    for ((af, protonum),
         (local_address, remote_address)) in addr_pairs_info:
        sock = None
        r_addr = None
        try:
            sock = socket.socket(
                family=af, type=socket.SOCK_DGRAM, proto=protonum)
            sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
            sock.setblocking(False)

            if local_addr:
                sock.bind(local_address)
            if remote_addr:
                yield from self.sock_connect(sock, remote_address)
                r_addr = remote_address
        except OSError as exc:
            if sock is not None:
                sock.close()
            exceptions.append(exc)
        except:
            # Close the socket on any other error before re-raising.
            if sock is not None:
                sock.close()
            raise
        else:
            break
    else:
        # Every candidate failed; report the first error.
        raise exceptions[0]

    protocol = protocol_factory()
    waiter = futures.Future(loop=self)
    transport = self._make_datagram_transport(sock, protocol, r_addr,
                                              waiter)
    if self._debug:
        if local_addr:
            logger.info("Datagram endpoint local_addr=%r remote_addr=%r "
                        "created: (%r, %r)",
                        local_addr, remote_addr, transport, protocol)
        else:
            logger.debug("Datagram endpoint remote_addr=%r created: "
                         "(%r, %r)",
                         remote_addr, transport, protocol)
    yield from waiter
    return transport, protocol
696
@coroutine
def create_server(self, protocol_factory, host=None, port=None,
                  *,
                  family=socket.AF_UNSPEC,
                  flags=socket.AI_PASSIVE,
                  sock=None,
                  backlog=100,
                  ssl=None,
                  reuse_address=None):
    """Create a TCP server bound to host and port.

    Either host/port or an existing listening *sock* must be given, but
    not both; ValueError is raised otherwise.  *ssl* must be an
    SSLContext or None; a bool raises TypeError.

    Return a Server object which can be used to stop the service.

    This method is a coroutine.
    """
    if isinstance(ssl, bool):
        raise TypeError('ssl argument must be an SSLContext or None')
    if host is not None or port is not None:
        if sock is not None:
            raise ValueError(
                'host/port and sock can not be specified at the same time')

        AF_INET6 = getattr(socket, 'AF_INET6', 0)
        if reuse_address is None:
            reuse_address = os.name == 'posix' and sys.platform != 'cygwin'
        sockets = []
        if host == '':
            host = None

        infos = yield from self.getaddrinfo(
            host, port, family=family,
            type=socket.SOCK_STREAM, proto=0, flags=flags)
        if not infos:
            raise OSError('getaddrinfo() returned empty list')

        completed = False
        try:
            for af, socktype, proto, canonname, sa in infos:
                try:
                    sock = socket.socket(af, socktype, proto)
                except socket.error:
                    # Assume it's a bad family/type/protocol combination.
                    if self._debug:
                        logger.warning('create_server() failed to create '
                                       'socket.socket(%r, %r, %r)',
                                       af, socktype, proto, exc_info=True)
                    continue
                sockets.append(sock)
                if reuse_address:
                    sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR,
                                    True)
                # Disable IPv4/IPv6 dual stack support (enabled by
                # default on Linux) which makes a single socket
                # listen on both address families.
                if af == AF_INET6 and hasattr(socket, 'IPPROTO_IPV6'):
                    sock.setsockopt(socket.IPPROTO_IPV6,
                                    socket.IPV6_V6ONLY,
                                    True)
                try:
                    sock.bind(sa)
                except OSError as err:
                    raise OSError(err.errno, 'error while attempting '
                                  'to bind on address %r: %s'
                                  % (sa, err.strerror.lower()))
            completed = True
        finally:
            # On any failure, close every socket opened so far.
            if not completed:
                for opened in sockets:
                    opened.close()
    else:
        if sock is None:
            raise ValueError('Neither host/port nor sock were specified')
        sockets = [sock]

    server = Server(self, sockets)
    for listener in sockets:
        listener.listen(backlog)
        listener.setblocking(False)
        self._start_serving(protocol_factory, listener, ssl, server)
    if self._debug:
        logger.info("%r is serving", server)
    return server
780
@coroutine
def connect_read_pipe(self, protocol_factory, pipe):
    """Register pipe for reading and attach a new protocol to it.

    protocol_factory is called without arguments to build the protocol.
    The transport comes from _make_read_pipe_transport(), and the
    coroutine waits on the waiter future handed to that call.

    Return a (transport, protocol) pair.

    This method is a coroutine.
    """
    protocol = protocol_factory()
    waiter = futures.Future(loop=self)
    transport = self._make_read_pipe_transport(pipe, protocol, waiter)

    try:
        yield from waiter
    except BaseException:
        # Setup failed or was cancelled: do not leak the transport.
        transport.close()
        raise

    if self._debug:
        logger.debug('Read pipe %r connected: (%r, %r)',
                     pipe.fileno(), transport, protocol)
    return transport, protocol
791
@coroutine
def connect_write_pipe(self, protocol_factory, pipe):
    """Register pipe for writing and attach a new protocol to it.

    protocol_factory is called without arguments to build the protocol.
    The transport comes from _make_write_pipe_transport(), and the
    coroutine waits on the waiter future handed to that call.

    Return a (transport, protocol) pair.

    This method is a coroutine.
    """
    protocol = protocol_factory()
    waiter = futures.Future(loop=self)
    transport = self._make_write_pipe_transport(pipe, protocol, waiter)

    try:
        yield from waiter
    except BaseException:
        # Setup failed or was cancelled: do not leak the transport.
        transport.close()
        raise

    if self._debug:
        logger.debug('Write pipe %r connected: (%r, %r)',
                     pipe.fileno(), transport, protocol)
    return transport, protocol
802
def _log_subprocess(self, msg, stdin, stdout, stderr):
    """Log msg at debug level followed by the configured stdio pipes.

    Each stream that is not None is appended as 'name=<pipe>'.  When
    stdout is set and stderr equals subprocess.STDOUT, both streams are
    reported together as a single 'stdout=stderr=<pipe>' entry.
    """
    parts = [msg]
    if stdin is not None:
        parts.append('stdin=%s' % _format_pipe(stdin))

    merged = stdout is not None and stderr == subprocess.STDOUT
    if not merged:
        if stdout is not None:
            parts.append('stdout=%s' % _format_pipe(stdout))
        if stderr is not None:
            parts.append('stderr=%s' % _format_pipe(stderr))
    else:
        parts.append('stdout=stderr=%s' % _format_pipe(stdout))

    logger.debug(' '.join(parts))
815
@coroutine
def subprocess_shell(self, protocol_factory, cmd, *, stdin=subprocess.PIPE,
                     stdout=subprocess.PIPE, stderr=subprocess.PIPE,
                     universal_newlines=False, shell=True, bufsize=0,
                     **kwargs):
    """Run the shell command cmd in a subprocess.

    cmd must be a str or bytes object.  universal_newlines, shell and
    bufsize exist only to reject unsupported values: they must be False,
    True and 0 respectively.  stdin, stdout, stderr and any extra keyword
    arguments are forwarded to _make_subprocess_transport().

    Return a (transport, protocol) pair, where protocol is the object
    returned by calling protocol_factory without arguments.

    Raise ValueError if cmd is not a string or if one of the restricted
    options has an unsupported value.

    This method is a coroutine.
    """
    if not isinstance(cmd, (bytes, str)):
        raise ValueError("cmd must be a string")
    if universal_newlines:
        raise ValueError("universal_newlines must be False")
    if not shell:
        raise ValueError("shell must be True")
    if bufsize != 0:
        raise ValueError("bufsize must be 0")
    protocol = protocol_factory()
    # Always bind debug_log: debug mode may be switched on while this
    # coroutine is suspended below, and the second check must not hit an
    # unbound name.
    debug_log = None
    if self._debug:
        # Extra keyword arguments are not logged: they may contain
        # sensitive information (password) and may be too long.
        debug_log = 'run shell command %r' % cmd
        self._log_subprocess(debug_log, stdin, stdout, stderr)
    transport = yield from self._make_subprocess_transport(
        protocol, cmd, True, stdin, stdout, stderr, bufsize, **kwargs)
    if self._debug and debug_log is not None:
        logger.info('%s: %r', debug_log, transport)
    return transport, protocol
840
@coroutine
def subprocess_exec(self, protocol_factory, program, *args,
                    stdin=subprocess.PIPE, stdout=subprocess.PIPE,
                    stderr=subprocess.PIPE, universal_newlines=False,
                    shell=False, bufsize=0, **kwargs):
    """Execute program with the given arguments in a subprocess.

    program and every item of args must be a str or bytes object.
    universal_newlines, shell and bufsize exist only to reject
    unsupported values: they must be False, False and 0 respectively.
    stdin, stdout, stderr and any extra keyword arguments are forwarded
    to _make_subprocess_transport().

    Return a (transport, protocol) pair, where protocol is the object
    returned by calling protocol_factory without arguments.

    Raise ValueError if one of the restricted options has an unsupported
    value, and TypeError if program or an argument is not a string.

    This method is a coroutine.
    """
    if universal_newlines:
        raise ValueError("universal_newlines must be False")
    if shell:
        raise ValueError("shell must be False")
    if bufsize != 0:
        raise ValueError("bufsize must be 0")
    popen_args = (program,) + args
    for arg in popen_args:
        if not isinstance(arg, (str, bytes)):
            raise TypeError("program arguments must be "
                            "a bytes or text string, not %s"
                            % type(arg).__name__)
    protocol = protocol_factory()
    # Always bind debug_log: debug mode may be switched on while this
    # coroutine is suspended below, and the second check must not hit an
    # unbound name.
    debug_log = None
    if self._debug:
        # don't log parameters: they may contain sensitive information
        # (password) and may be too long
        debug_log = 'execute program %r' % program
        self._log_subprocess(debug_log, stdin, stdout, stderr)
    transport = yield from self._make_subprocess_transport(
        protocol, popen_args, False, stdin, stdout, stderr,
        bufsize, **kwargs)
    if self._debug and debug_log is not None:
        logger.info('%s: %r', debug_log, transport)
    return transport, protocol
870
def set_exception_handler(self, handler):
    """Install handler as the event loop exception handler.

    Passing None selects the default exception handler.

    Otherwise handler must be callable; it is invoked as
    handler(loop, context), where 'loop' is the event loop and
    'context' is a dict (see `call_exception_handler()` for the
    keys it may contain).

    Raise TypeError if handler is neither None nor callable.
    """
    if not (handler is None or callable(handler)):
        raise TypeError('A callable object or None is expected, '
                        'got {!r}'.format(handler))
    self._exception_handler = handler
887
def default_exception_handler(self, context):
    """Log the error described by context.

    This runs when an exception occurs and no custom handler is
    installed; a custom handler may also call it to fall back to the
    default behavior.

    context has the same meaning as in `call_exception_handler()`.
    The 'message' entry (or a generic text when it is missing or empty)
    becomes the first log line, every other entry except 'exception'
    is appended as 'key: value' in sorted key order, and the
    'exception' entry, if any, supplies the logged traceback.
    """
    message = context.get('message') or 'Unhandled exception in event loop'

    exc_info = False
    exception = context.get('exception')
    if exception is not None:
        exc_info = (type(exception), exception, exception.__traceback__)

    lines = [message]
    for key in sorted(context):
        if key == 'message' or key == 'exception':
            # Already reported through the first line / exc_info.
            continue
        if key == 'source_traceback':
            tb = ''.join(traceback.format_list(context[key]))
            rendered = ('Object created at (most recent call last):\n'
                        + tb.rstrip())
        else:
            rendered = repr(context[key])
        lines.append('{}: {}'.format(key, rendered))

    logger.error('\n'.join(lines), exc_info=exc_info)
922
def call_exception_handler(self, context):
    """Call the current event loop's exception handler.

    The context argument is a dict containing the following keys:

    - 'message': Error message;
    - 'exception' (optional): Exception object;
    - 'future' (optional): Future instance;
    - 'handle' (optional): Handle instance;
    - 'protocol' (optional): Protocol instance;
    - 'transport' (optional): Transport instance;
    - 'socket' (optional): Socket instance.

    New keys may be introduced in the future.

    Note: do not overload this method in an event loop subclass.
    For custom exception handling, use the
    `set_exception_handler()` method.
    """
    custom_handler = self._exception_handler

    if custom_handler is None:
        try:
            self.default_exception_handler(context)
        except Exception:
            # Second protection layer for unexpected errors
            # in the default implementation, as well as for subclassed
            # event loops with overloaded "default_exception_handler".
            logger.error('Exception in default exception handler',
                         exc_info=True)
        return

    try:
        custom_handler(self, context)
    except Exception as exc:
        # The user supplied handler itself failed: report that failure
        # through the default handler, keeping the original context.
        fallback_context = {
            'message': 'Unhandled error in exception handler',
            'exception': exc,
            'context': context,
        }
        try:
            self.default_exception_handler(fallback_context)
        except Exception:
            # Guard 'default_exception_handler' in case it is
            # overloaded.
            logger.error('Exception in default exception handler '
                         'while handling an unexpected error '
                         'in custom exception handler',
                         exc_info=True)
970
def _add_callback(self, handle):
    """Append a non-timer Handle to _ready unless it is cancelled.

    A cancelled handle is silently dropped.  TimerHandle instances are
    not accepted here.
    """
    assert isinstance(handle, events.Handle), 'A Handle is required here'
    if not handle._cancelled:
        assert not isinstance(handle, events.TimerHandle)
        self._ready.append(handle)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700978
979 def _add_callback_signalsafe(self, handle):
980 """Like _add_callback() but called from a signal handler."""
981 self._add_callback(handle)
982 self._write_to_self()
983
Yury Selivanov592ada92014-09-25 12:07:56 -0400984 def _timer_handle_cancelled(self, handle):
985 """Notification that a TimerHandle has been cancelled."""
986 if handle._scheduled:
987 self._timer_cancelled_count += 1
988
def _run_once(self):
    """Run one full iteration of the event loop.

    The steps are, in order: drop cancelled timer handles from
    _scheduled, poll the selector for I/O, pass the resulting events to
    _process_events(), move timer handles that are due to _ready, and
    finally run the handles that were in _ready at that moment.
    """

    timer_count = len(self._scheduled)
    too_many_cancelled = (
        timer_count > _MIN_SCHEDULED_TIMER_HANDLES and
        self._timer_cancelled_count / timer_count >
        _MIN_CANCELLED_TIMER_HANDLES_FRACTION)
    if too_many_cancelled:
        # A large share of the delayed calls was cancelled: rebuild the
        # whole heap without them.
        survivors = []
        for handle in self._scheduled:
            if not handle._cancelled:
                survivors.append(handle)
                continue
            handle._scheduled = False

        heapq.heapify(survivors)
        self._scheduled = survivors
        self._timer_cancelled_count = 0
    else:
        # Only pop the cancelled delayed calls sitting at the head of
        # the queue.
        while self._scheduled and self._scheduled[0]._cancelled:
            self._timer_cancelled_count -= 1
            handle = heapq.heappop(self._scheduled)
            handle._scheduled = False

    # Select timeout: 0 when callbacks are already waiting, the delay
    # until the earliest timer when one exists, None otherwise.
    if self._ready:
        timeout = 0
    elif self._scheduled:
        when = self._scheduled[0]._when
        timeout = max(0, when - self.time())
    else:
        timeout = None

    if not self._debug or timeout == 0:
        event_list = self._selector.select(timeout)
    else:
        # Debug mode: measure how long the poll took and report it.
        t0 = self.time()
        event_list = self._selector.select(timeout)
        dt = self.time() - t0
        level = logging.INFO if dt >= 1.0 else logging.DEBUG
        nevent = len(event_list)
        if timeout is None:
            logger.log(level, 'poll took %.3f ms: %s events',
                       dt * 1e3, nevent)
        elif nevent:
            logger.log(level,
                       'poll %.3f ms took %.3f ms: %s events',
                       timeout * 1e3, dt * 1e3, nevent)
        elif dt >= 1.0:
            logger.log(level,
                       'poll %.3f ms took %.3f ms: timeout',
                       timeout * 1e3, dt * 1e3)
    self._process_events(event_list)

    # Move the 'later' callbacks that are due to the ready queue.
    end_time = self.time() + self._clock_resolution
    while self._scheduled:
        handle = self._scheduled[0]
        if handle._when >= end_time:
            break
        handle = heapq.heappop(self._scheduled)
        handle._scheduled = False
        self._ready.append(handle)

    # This is the only place where callbacks are actually *called*.
    # All other places just add them to ready.
    # Note: only the callbacks queued right now are run; callbacks
    # scheduled by the ones executed in this pass wait for the next
    # iteration (after another I/O poll).
    # Use an idiom that is thread-safe without using locks.
    for _ in range(len(self._ready)):
        handle = self._ready.popleft()
        if handle._cancelled:
            continue
        if not self._debug:
            handle._run()
        else:
            t0 = self.time()
            handle._run()
            dt = self.time() - t0
            if dt >= self.slow_callback_duration:
                logger.warning('Executing %s took %.3f seconds',
                               _format_handle(handle), dt)
    handle = None  # Needed to break cycles when an exception occurs.
Victor Stinner0f3e6bc2014-02-19 23:15:02 +01001083
def get_debug(self):
    """Return the current value of the loop's debug flag (_debug)."""
    debug_enabled = self._debug
    return debug_enabled
1086
def set_debug(self, enabled):
    """Store enabled as the loop's debug flag (_debug).

    Other methods of the loop consult this flag to decide whether to
    emit extra diagnostic log records.
    """
    self._debug = enabled