blob: 5765f47c135690994f9c206d69ed283cb491f96e [file] [log] [blame]
"""Base implementation of event loop.

The event loop can be broken up into a multiplexer (the part
responsible for notifying us of IO events) and the event loop proper,
which wraps a multiplexer with functionality for scheduling callbacks,
immediately or at a given time in the future.

Whenever a public API takes a callback, subsequent positional
arguments will be passed to the callback if/when it is called.  This
avoids the proliferation of trivial lambdas implementing closures.
Keyword arguments for the callback are not supported; this is a
conscious design decision, leaving the door open for keyword arguments
to modify the meaning of the API call itself.
"""
15
16
17import collections
18import concurrent.futures
19import heapq
20import logging
21import socket
22import subprocess
23import time
24import os
25import sys
26
27from . import events
28from . import futures
29from . import tasks
Guido van Rossumfc29e0f2013-10-17 15:39:45 -070030from .log import logger
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070031
32
# Public names of this module.
__all__ = ['BaseEventLoop', 'Server']


# Argument for default thread pool executor creation: the worker count
# passed to ThreadPoolExecutor when run_in_executor() has to create the
# loop's default executor.
_MAX_WORKERS = 5
38
39
40class _StopError(BaseException):
41 """Raised to stop the event loop."""
42
43
def _raise_stop_error(*args):
    """Raise _StopError; any positional arguments are ignored."""
    raise _StopError()
46
47
class Server(events.AbstractServer):
    """Bookkeeping object for a group of listening sockets.

    Attributes:
        loop: event loop used to stop serving and to create waiters.
        sockets: list of listening sockets; None once close() was called.
        active_count: number of transports currently attached.
        waiters: futures created by wait_closed(); None after _wakeup().
    """

    def __init__(self, loop, sockets):
        self.loop = loop
        self.sockets = sockets
        self.active_count = 0
        self.waiters = []

    def attach(self, transport):
        """Register one more attached transport (server must be open)."""
        assert self.sockets is not None
        self.active_count += 1

    def detach(self, transport):
        """Unregister a transport.

        When the last transport of an already closed server goes away,
        the futures created by wait_closed() are completed.
        """
        assert self.active_count > 0
        self.active_count -= 1
        if self.sockets is None and self.active_count == 0:
            self._wakeup()

    def close(self):
        """Stop serving on every socket; calling it again does nothing."""
        if self.sockets is None:
            return
        listening, self.sockets = self.sockets, None
        for sock in listening:
            self.loop._stop_serving(sock)
        if self.active_count == 0:
            self._wakeup()

    def _wakeup(self):
        """Complete every pending waiter and mark the waiter list as gone."""
        pending, self.waiters = self.waiters, None
        for waiter in pending:
            if waiter.done():
                continue
            # The waiter itself is used as the (otherwise unused) result.
            waiter.set_result(waiter)

    @tasks.coroutine
    def wait_closed(self):
        """Wait for _wakeup(); return at once if already closed or woken."""
        if self.sockets is None or self.waiters is None:
            return
        waiter = futures.Future(loop=self.loop)
        self.waiters.append(waiter)
        yield from waiter
89
90
91class BaseEventLoop(events.AbstractEventLoop):
92
93 def __init__(self):
94 self._ready = collections.deque()
95 self._scheduled = []
96 self._default_executor = None
97 self._internal_fds = 0
98 self._running = False
Victor Stinner6cf5c962014-02-10 23:42:32 +010099 self._clock_resolution = time.get_clock_info('monotonic').resolution
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700100
101 def _make_socket_transport(self, sock, protocol, waiter=None, *,
102 extra=None, server=None):
103 """Create socket transport."""
104 raise NotImplementedError
105
106 def _make_ssl_transport(self, rawsock, protocol, sslcontext, waiter, *,
107 server_side=False, server_hostname=None,
108 extra=None, server=None):
109 """Create SSL transport."""
110 raise NotImplementedError
111
112 def _make_datagram_transport(self, sock, protocol,
113 address=None, extra=None):
114 """Create datagram transport."""
115 raise NotImplementedError
116
117 def _make_read_pipe_transport(self, pipe, protocol, waiter=None,
118 extra=None):
119 """Create read pipe transport."""
120 raise NotImplementedError
121
122 def _make_write_pipe_transport(self, pipe, protocol, waiter=None,
123 extra=None):
124 """Create write pipe transport."""
125 raise NotImplementedError
126
127 @tasks.coroutine
128 def _make_subprocess_transport(self, protocol, args, shell,
129 stdin, stdout, stderr, bufsize,
130 extra=None, **kwargs):
131 """Create subprocess transport."""
132 raise NotImplementedError
133
134 def _read_from_self(self):
135 """XXX"""
136 raise NotImplementedError
137
138 def _write_to_self(self):
139 """XXX"""
140 raise NotImplementedError
141
142 def _process_events(self, event_list):
143 """Process selector events."""
144 raise NotImplementedError
145
146 def run_forever(self):
147 """Run until stop() is called."""
148 if self._running:
149 raise RuntimeError('Event loop is running.')
150 self._running = True
151 try:
152 while True:
153 try:
154 self._run_once()
155 except _StopError:
156 break
157 finally:
158 self._running = False
159
160 def run_until_complete(self, future):
161 """Run until the Future is done.
162
163 If the argument is a coroutine, it is wrapped in a Task.
164
165 XXX TBD: It would be disastrous to call run_until_complete()
166 with the same coroutine twice -- it would wrap it in two
167 different Tasks and that can't be good.
168
169 Return the Future's result, or raise its exception.
170 """
171 future = tasks.async(future, loop=self)
172 future.add_done_callback(_raise_stop_error)
173 self.run_forever()
174 future.remove_done_callback(_raise_stop_error)
175 if not future.done():
176 raise RuntimeError('Event loop stopped before Future completed.')
177
178 return future.result()
179
180 def stop(self):
181 """Stop running the event loop.
182
183 Every callback scheduled before stop() is called will run.
184 Callback scheduled after stop() is called won't. However,
185 those callbacks will run if run() is called again later.
186 """
187 self.call_soon(_raise_stop_error)
188
Antoine Pitrou4ca73552013-10-20 00:54:10 +0200189 def close(self):
Guido van Rossume3f52ef2013-11-01 14:19:04 -0700190 """Close the event loop.
191
192 This clears the queues and shuts down the executor,
193 but does not wait for the executor to finish.
194 """
Antoine Pitrou4ca73552013-10-20 00:54:10 +0200195 self._ready.clear()
196 self._scheduled.clear()
197 executor = self._default_executor
198 if executor is not None:
199 self._default_executor = None
200 executor.shutdown(wait=False)
201
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700202 def is_running(self):
203 """Returns running status of event loop."""
204 return self._running
205
206 def time(self):
207 """Return the time according to the event loop's clock."""
208 return time.monotonic()
209
210 def call_later(self, delay, callback, *args):
211 """Arrange for a callback to be called at a given time.
212
213 Return a Handle: an opaque object with a cancel() method that
214 can be used to cancel the call.
215
216 The delay can be an int or float, expressed in seconds. It is
217 always a relative time.
218
219 Each callback will be called exactly once. If two callbacks
220 are scheduled for exactly the same time, it undefined which
221 will be called first.
222
223 Any positional arguments after the callback will be passed to
224 the callback when it is called.
225 """
226 return self.call_at(self.time() + delay, callback, *args)
227
228 def call_at(self, when, callback, *args):
229 """Like call_later(), but uses an absolute time."""
Victor Stinnera1254972014-02-11 11:34:30 +0100230 if tasks.iscoroutinefunction(callback):
231 raise TypeError("coroutines cannot be used with call_at()")
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700232 timer = events.TimerHandle(when, callback, args)
233 heapq.heappush(self._scheduled, timer)
234 return timer
235
236 def call_soon(self, callback, *args):
237 """Arrange for a callback to be called as soon as possible.
238
239 This operates as a FIFO queue, callbacks are called in the
240 order in which they are registered. Each callback will be
241 called exactly once.
242
243 Any positional arguments after the callback will be passed to
244 the callback when it is called.
245 """
Victor Stinnera1254972014-02-11 11:34:30 +0100246 if tasks.iscoroutinefunction(callback):
247 raise TypeError("coroutines cannot be used with call_soon()")
Victor Stinnerdc62b7e2014-02-10 00:45:44 +0100248 handle = events.Handle(callback, args)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700249 self._ready.append(handle)
250 return handle
251
252 def call_soon_threadsafe(self, callback, *args):
253 """XXX"""
254 handle = self.call_soon(callback, *args)
255 self._write_to_self()
256 return handle
257
258 def run_in_executor(self, executor, callback, *args):
Victor Stinnera1254972014-02-11 11:34:30 +0100259 if tasks.iscoroutinefunction(callback):
260 raise TypeError("coroutines cannot be used with run_in_executor()")
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700261 if isinstance(callback, events.Handle):
262 assert not args
263 assert not isinstance(callback, events.TimerHandle)
264 if callback._cancelled:
265 f = futures.Future(loop=self)
266 f.set_result(None)
267 return f
268 callback, args = callback._callback, callback._args
269 if executor is None:
270 executor = self._default_executor
271 if executor is None:
272 executor = concurrent.futures.ThreadPoolExecutor(_MAX_WORKERS)
273 self._default_executor = executor
274 return futures.wrap_future(executor.submit(callback, *args), loop=self)
275
276 def set_default_executor(self, executor):
277 self._default_executor = executor
278
279 def getaddrinfo(self, host, port, *,
280 family=0, type=0, proto=0, flags=0):
281 return self.run_in_executor(None, socket.getaddrinfo,
282 host, port, family, type, proto, flags)
283
284 def getnameinfo(self, sockaddr, flags=0):
285 return self.run_in_executor(None, socket.getnameinfo, sockaddr, flags)
286
287 @tasks.coroutine
288 def create_connection(self, protocol_factory, host=None, port=None, *,
289 ssl=None, family=0, proto=0, flags=0, sock=None,
Guido van Rossum21c85a72013-11-01 14:16:54 -0700290 local_addr=None, server_hostname=None):
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700291 """XXX"""
Guido van Rossum21c85a72013-11-01 14:16:54 -0700292 if server_hostname is not None and not ssl:
293 raise ValueError('server_hostname is only meaningful with ssl')
294
295 if server_hostname is None and ssl:
296 # Use host as default for server_hostname. It is an error
297 # if host is empty or not set, e.g. when an
298 # already-connected socket was passed or when only a port
299 # is given. To avoid this error, you can pass
300 # server_hostname='' -- this will bypass the hostname
301 # check. (This also means that if host is a numeric
302 # IP/IPv6 address, we will attempt to verify that exact
303 # address; this will probably fail, but it is possible to
304 # create a certificate for a specific IP address, so we
305 # don't judge it here.)
306 if not host:
307 raise ValueError('You must set server_hostname '
308 'when using ssl without a host')
309 server_hostname = host
Guido van Rossuma8d630a2013-11-01 14:20:55 -0700310
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700311 if host is not None or port is not None:
312 if sock is not None:
313 raise ValueError(
314 'host/port and sock can not be specified at the same time')
315
316 f1 = self.getaddrinfo(
317 host, port, family=family,
318 type=socket.SOCK_STREAM, proto=proto, flags=flags)
319 fs = [f1]
320 if local_addr is not None:
321 f2 = self.getaddrinfo(
322 *local_addr, family=family,
323 type=socket.SOCK_STREAM, proto=proto, flags=flags)
324 fs.append(f2)
325 else:
326 f2 = None
327
328 yield from tasks.wait(fs, loop=self)
329
330 infos = f1.result()
331 if not infos:
332 raise OSError('getaddrinfo() returned empty list')
333 if f2 is not None:
334 laddr_infos = f2.result()
335 if not laddr_infos:
336 raise OSError('getaddrinfo() returned empty list')
337
338 exceptions = []
339 for family, type, proto, cname, address in infos:
340 try:
341 sock = socket.socket(family=family, type=type, proto=proto)
342 sock.setblocking(False)
343 if f2 is not None:
344 for _, _, _, _, laddr in laddr_infos:
345 try:
346 sock.bind(laddr)
347 break
348 except OSError as exc:
349 exc = OSError(
350 exc.errno, 'error while '
351 'attempting to bind on address '
352 '{!r}: {}'.format(
353 laddr, exc.strerror.lower()))
354 exceptions.append(exc)
355 else:
356 sock.close()
357 sock = None
358 continue
359 yield from self.sock_connect(sock, address)
360 except OSError as exc:
361 if sock is not None:
362 sock.close()
363 exceptions.append(exc)
364 else:
365 break
366 else:
367 if len(exceptions) == 1:
368 raise exceptions[0]
369 else:
370 # If they all have the same str(), raise one.
371 model = str(exceptions[0])
372 if all(str(exc) == model for exc in exceptions):
373 raise exceptions[0]
374 # Raise a combined exception so the user can see all
375 # the various error messages.
376 raise OSError('Multiple exceptions: {}'.format(
377 ', '.join(str(exc) for exc in exceptions)))
378
379 elif sock is None:
380 raise ValueError(
381 'host and port was not specified and no sock specified')
382
383 sock.setblocking(False)
384
385 protocol = protocol_factory()
386 waiter = futures.Future(loop=self)
387 if ssl:
388 sslcontext = None if isinstance(ssl, bool) else ssl
389 transport = self._make_ssl_transport(
390 sock, protocol, sslcontext, waiter,
Guido van Rossum21c85a72013-11-01 14:16:54 -0700391 server_side=False, server_hostname=server_hostname)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700392 else:
393 transport = self._make_socket_transport(sock, protocol, waiter)
394
395 yield from waiter
396 return transport, protocol
397
398 @tasks.coroutine
399 def create_datagram_endpoint(self, protocol_factory,
400 local_addr=None, remote_addr=None, *,
401 family=0, proto=0, flags=0):
402 """Create datagram connection."""
403 if not (local_addr or remote_addr):
404 if family == 0:
405 raise ValueError('unexpected address family')
406 addr_pairs_info = (((family, proto), (None, None)),)
407 else:
408 # join addresss by (family, protocol)
409 addr_infos = collections.OrderedDict()
410 for idx, addr in ((0, local_addr), (1, remote_addr)):
411 if addr is not None:
412 assert isinstance(addr, tuple) and len(addr) == 2, (
413 '2-tuple is expected')
414
415 infos = yield from self.getaddrinfo(
416 *addr, family=family, type=socket.SOCK_DGRAM,
417 proto=proto, flags=flags)
418 if not infos:
419 raise OSError('getaddrinfo() returned empty list')
420
421 for fam, _, pro, _, address in infos:
422 key = (fam, pro)
423 if key not in addr_infos:
424 addr_infos[key] = [None, None]
425 addr_infos[key][idx] = address
426
427 # each addr has to have info for each (family, proto) pair
428 addr_pairs_info = [
429 (key, addr_pair) for key, addr_pair in addr_infos.items()
430 if not ((local_addr and addr_pair[0] is None) or
431 (remote_addr and addr_pair[1] is None))]
432
433 if not addr_pairs_info:
434 raise ValueError('can not get address information')
435
436 exceptions = []
437
438 for ((family, proto),
439 (local_address, remote_address)) in addr_pairs_info:
440 sock = None
441 r_addr = None
442 try:
443 sock = socket.socket(
444 family=family, type=socket.SOCK_DGRAM, proto=proto)
445 sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
446 sock.setblocking(False)
447
448 if local_addr:
449 sock.bind(local_address)
450 if remote_addr:
451 yield from self.sock_connect(sock, remote_address)
452 r_addr = remote_address
453 except OSError as exc:
454 if sock is not None:
455 sock.close()
456 exceptions.append(exc)
457 else:
458 break
459 else:
460 raise exceptions[0]
461
462 protocol = protocol_factory()
463 transport = self._make_datagram_transport(sock, protocol, r_addr)
464 return transport, protocol
465
466 @tasks.coroutine
467 def create_server(self, protocol_factory, host=None, port=None,
468 *,
469 family=socket.AF_UNSPEC,
470 flags=socket.AI_PASSIVE,
471 sock=None,
472 backlog=100,
473 ssl=None,
474 reuse_address=None):
475 """XXX"""
Guido van Rossum28dff0d2013-11-01 14:22:30 -0700476 if isinstance(ssl, bool):
477 raise TypeError('ssl argument must be an SSLContext or None')
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700478 if host is not None or port is not None:
479 if sock is not None:
480 raise ValueError(
481 'host/port and sock can not be specified at the same time')
482
483 AF_INET6 = getattr(socket, 'AF_INET6', 0)
484 if reuse_address is None:
485 reuse_address = os.name == 'posix' and sys.platform != 'cygwin'
486 sockets = []
487 if host == '':
488 host = None
489
490 infos = yield from self.getaddrinfo(
491 host, port, family=family,
492 type=socket.SOCK_STREAM, proto=0, flags=flags)
493 if not infos:
494 raise OSError('getaddrinfo() returned empty list')
495
496 completed = False
497 try:
498 for res in infos:
499 af, socktype, proto, canonname, sa = res
Guido van Rossum32e46852013-10-19 17:04:25 -0700500 try:
501 sock = socket.socket(af, socktype, proto)
502 except socket.error:
503 # Assume it's a bad family/type/protocol combination.
504 continue
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700505 sockets.append(sock)
506 if reuse_address:
507 sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR,
508 True)
509 # Disable IPv4/IPv6 dual stack support (enabled by
510 # default on Linux) which makes a single socket
511 # listen on both address families.
512 if af == AF_INET6 and hasattr(socket, 'IPPROTO_IPV6'):
513 sock.setsockopt(socket.IPPROTO_IPV6,
514 socket.IPV6_V6ONLY,
515 True)
516 try:
517 sock.bind(sa)
518 except OSError as err:
519 raise OSError(err.errno, 'error while attempting '
520 'to bind on address %r: %s'
521 % (sa, err.strerror.lower()))
522 completed = True
523 finally:
524 if not completed:
525 for sock in sockets:
526 sock.close()
527 else:
528 if sock is None:
529 raise ValueError(
530 'host and port was not specified and no sock specified')
531 sockets = [sock]
532
533 server = Server(self, sockets)
534 for sock in sockets:
535 sock.listen(backlog)
536 sock.setblocking(False)
537 self._start_serving(protocol_factory, sock, ssl, server)
538 return server
539
540 @tasks.coroutine
541 def connect_read_pipe(self, protocol_factory, pipe):
542 protocol = protocol_factory()
543 waiter = futures.Future(loop=self)
544 transport = self._make_read_pipe_transport(pipe, protocol, waiter)
545 yield from waiter
546 return transport, protocol
547
548 @tasks.coroutine
549 def connect_write_pipe(self, protocol_factory, pipe):
550 protocol = protocol_factory()
551 waiter = futures.Future(loop=self)
552 transport = self._make_write_pipe_transport(pipe, protocol, waiter)
553 yield from waiter
554 return transport, protocol
555
556 @tasks.coroutine
557 def subprocess_shell(self, protocol_factory, cmd, *, stdin=subprocess.PIPE,
558 stdout=subprocess.PIPE, stderr=subprocess.PIPE,
559 universal_newlines=False, shell=True, bufsize=0,
560 **kwargs):
Victor Stinner4e8d2f22014-02-11 11:44:56 +0100561 if not isinstance(cmd, (bytes, str)):
Victor Stinnere623a122014-01-29 14:35:15 -0800562 raise ValueError("cmd must be a string")
563 if universal_newlines:
564 raise ValueError("universal_newlines must be False")
565 if not shell:
Victor Stinner323748e2014-01-31 12:28:30 +0100566 raise ValueError("shell must be True")
Victor Stinnere623a122014-01-29 14:35:15 -0800567 if bufsize != 0:
568 raise ValueError("bufsize must be 0")
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700569 protocol = protocol_factory()
570 transport = yield from self._make_subprocess_transport(
571 protocol, cmd, True, stdin, stdout, stderr, bufsize, **kwargs)
572 return transport, protocol
573
574 @tasks.coroutine
Victor Stinner4e8d2f22014-02-11 11:44:56 +0100575 def subprocess_exec(self, protocol_factory, program, *args, stdin=subprocess.PIPE,
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700576 stdout=subprocess.PIPE, stderr=subprocess.PIPE,
577 universal_newlines=False, shell=False, bufsize=0,
578 **kwargs):
Victor Stinnere623a122014-01-29 14:35:15 -0800579 if universal_newlines:
580 raise ValueError("universal_newlines must be False")
581 if shell:
582 raise ValueError("shell must be False")
583 if bufsize != 0:
584 raise ValueError("bufsize must be 0")
Victor Stinner4e8d2f22014-02-11 11:44:56 +0100585 popen_args = (program,) + args
586 for arg in popen_args:
587 if not isinstance(arg, (str, bytes)):
588 raise TypeError("program arguments must be "
589 "a bytes or text string, not %s"
590 % type(arg).__name__)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700591 protocol = protocol_factory()
592 transport = yield from self._make_subprocess_transport(
Victor Stinner4e8d2f22014-02-11 11:44:56 +0100593 protocol, popen_args, False, stdin, stdout, stderr, bufsize, **kwargs)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700594 return transport, protocol
595
596 def _add_callback(self, handle):
597 """Add a Handle to ready or scheduled."""
598 assert isinstance(handle, events.Handle), 'A Handle is required here'
599 if handle._cancelled:
600 return
601 if isinstance(handle, events.TimerHandle):
602 heapq.heappush(self._scheduled, handle)
603 else:
604 self._ready.append(handle)
605
606 def _add_callback_signalsafe(self, handle):
607 """Like _add_callback() but called from a signal handler."""
608 self._add_callback(handle)
609 self._write_to_self()
610
611 def _run_once(self):
612 """Run one full iteration of the event loop.
613
614 This calls all currently ready callbacks, polls for I/O,
615 schedules the resulting callbacks, and finally schedules
616 'call_later' callbacks.
617 """
618 # Remove delayed calls that were cancelled from head of queue.
619 while self._scheduled and self._scheduled[0]._cancelled:
620 heapq.heappop(self._scheduled)
621
622 timeout = None
623 if self._ready:
624 timeout = 0
625 elif self._scheduled:
626 # Compute the desired timeout.
627 when = self._scheduled[0]._when
628 deadline = max(0, when - self.time())
629 if timeout is None:
630 timeout = deadline
631 else:
632 timeout = min(timeout, deadline)
633
634 # TODO: Instrumentation only in debug mode?
Victor Stinner49d0f4e2014-01-31 12:59:43 +0100635 if logger.isEnabledFor(logging.INFO):
Victor Stinner22463aa2014-01-20 23:56:40 +0100636 t0 = self.time()
637 event_list = self._selector.select(timeout)
638 t1 = self.time()
Victor Stinner22463aa2014-01-20 23:56:40 +0100639 if t1-t0 >= 1:
640 level = logging.INFO
641 else:
642 level = logging.DEBUG
Victor Stinner4a2dbeb2014-01-22 12:26:01 +0100643 if timeout is not None:
644 logger.log(level, 'poll %.3f took %.3f seconds',
645 timeout, t1-t0)
646 else:
647 logger.log(level, 'poll took %.3f seconds', t1-t0)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700648 else:
Victor Stinner7bff8e12014-02-11 10:08:08 +0100649 t0_monotonic = time.monotonic()
650 t0 = time.perf_counter()
Victor Stinner22463aa2014-01-20 23:56:40 +0100651 event_list = self._selector.select(timeout)
Victor Stinner7bff8e12014-02-11 10:08:08 +0100652 dt = time.perf_counter() - t0
653 dt_monotonic = time.monotonic() - t0_monotonic
Victor Stinner1db2ba32014-02-11 10:26:53 +0100654 if (not event_list and timeout
655 and (dt < timeout or dt_monotonic < timeout)):
Victor Stinner7bff8e12014-02-11 10:08:08 +0100656 selector = self._selector.__class__.__name__
657 if (selector.startswith(("Poll", "Epoll", "Iocp"))
658 or timeout > 1e-3 or dt > 1e-3):
659 unit, factor = "ms", 1e3
660 else:
661 unit, factor = "us", 1e6
Victor Stinner1db2ba32014-02-11 10:26:53 +0100662 print("asyncio: %s.select(%.4f %s) took %.3f %s"
663 " (monotonic=%.3f %s, clock res=%.3f %s)"
Victor Stinner7bff8e12014-02-11 10:08:08 +0100664 % (self._selector.__class__.__name__,
665 timeout * factor, unit,
666 dt * factor, unit,
667 dt_monotonic * factor, unit,
668 self._clock_resolution * factor, unit),
Victor Stinner1c143b12014-02-10 11:47:50 +0100669 file=sys.__stderr__, flush=True)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700670 self._process_events(event_list)
671
672 # Handle 'later' callbacks that are ready.
Victor Stinner6cf5c962014-02-10 23:42:32 +0100673 end_time = self.time() + self._clock_resolution
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700674 while self._scheduled:
675 handle = self._scheduled[0]
Victor Stinner6cf5c962014-02-10 23:42:32 +0100676 if handle._when >= end_time:
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700677 break
678 handle = heapq.heappop(self._scheduled)
679 self._ready.append(handle)
680
681 # This is the only place where callbacks are actually *called*.
682 # All other places just add them to ready.
683 # Note: We run all currently scheduled callbacks, but not any
684 # callbacks scheduled by callbacks run this time around --
685 # they will be run the next time (after another I/O poll).
686 # Use an idiom that is threadsafe without using locks.
687 ntodo = len(self._ready)
688 for i in range(ntodo):
689 handle = self._ready.popleft()
690 if not handle._cancelled:
691 handle._run()
692 handle = None # Needed to break cycles when an exception occurs.