blob: 3bbf6b54661a3231a70c3641f14feba14b957abd [file] [log] [blame]
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07001"""Base implementation of event loop.
2
3The event loop can be broken up into a multiplexer (the part
4responsible for notifying us of IO events) and the event loop proper,
5which wraps a multiplexer with functionality for scheduling callbacks,
6immediately or at a given time in the future.
7
8Whenever a public API takes a callback, subsequent positional
9arguments will be passed to the callback if/when it is called. This
10avoids the proliferation of trivial lambdas implementing closures.
11Keyword arguments for the callback are not supported; this is a
12conscious design decision, leaving the door open for keyword arguments
13to modify the meaning of the API call itself.
14"""
15
16
17import collections
18import concurrent.futures
19import heapq
20import logging
21import socket
22import subprocess
23import time
24import os
25import sys
26
27from . import events
28from . import futures
29from . import tasks
Guido van Rossumfc29e0f2013-10-17 15:39:45 -070030from .log import logger
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070031
32
# Names exported by a star-import of this module.
__all__ = ['BaseEventLoop', 'Server']


# Argument for default thread pool executor creation: the worker count
# passed to ThreadPoolExecutor when run_in_executor() has to create the
# default executor itself.
_MAX_WORKERS = 5
38
39
class _StopError(BaseException):
    """Raised to stop the event loop.

    Raised by _raise_stop_error() and caught in
    BaseEventLoop.run_forever(), where it ends the loop.  It derives from
    BaseException rather than Exception.
    """
42
43
def _check_resolved_address(sock, address):
    """Check that *address* is already a numeric IP address.

    Ensure that the address is already resolved to avoid the trap of
    hanging the entire event loop when the address requires doing a DNS
    lookup.

    sock: object exposing ``family``, ``type`` and ``proto`` attributes.
    address: for AF_INET a ``(host, port)`` pair; for AF_INET6 a
        ``(host, port)`` pair optionally followed by flowinfo and
        scopeid.  Ignored for any other address family.

    Returns None.  Raises ValueError if the host is not numeric.
    """
    family = sock.family
    if family == socket.AF_INET:
        host, port = address
    elif family == socket.AF_INET6:
        # IPv6 socket addresses may be 4-tuples
        # (host, port, flowinfo, scopeid); only host and port matter here.
        host, port = address[:2]
    else:
        return

    # SOCK_NONBLOCK / SOCK_CLOEXEC may be OR-ed into sock.type on some
    # platforms; strip them before handing the type to getaddrinfo().
    type_mask = 0
    if hasattr(socket, 'SOCK_NONBLOCK'):
        type_mask |= socket.SOCK_NONBLOCK
    if hasattr(socket, 'SOCK_CLOEXEC'):
        type_mask |= socket.SOCK_CLOEXEC
    # Use getaddrinfo(AI_NUMERICHOST) to ensure that the address is
    # already resolved.
    try:
        socket.getaddrinfo(host, port,
                           family=family,
                           type=(sock.type & ~type_mask),
                           proto=sock.proto,
                           flags=socket.AI_NUMERICHOST)
    except socket.gaierror as err:
        raise ValueError("address must be resolved (IP address), got %r: %s"
                         % (address, err)) from err
68
def _raise_stop_error(*args):
    """Raise _StopError, ignoring any arguments.

    Used as a callback: stop() schedules it with call_soon() and
    run_until_complete() attaches it to the future as a done callback.
    """
    raise _StopError
71
72
class Server(events.AbstractServer):
    """Track listening sockets and the transports attached to them.

    Attributes:
        loop: event loop given to the constructor; used to stop serving
            on each socket and to create the waiter futures.
        sockets: the listening sockets, or None once close() was called.
        active_count: attach() calls not yet matched by detach().
        waiters: futures created by wait_closed(), or None once they
            have been woken up.
    """

    def __init__(self, loop, sockets):
        self.loop = loop
        self.sockets = sockets
        self.active_count = 0
        self.waiters = []

    def attach(self, transport):
        """Count one more active transport; the server must be open."""
        assert self.sockets is not None
        self.active_count += 1

    def detach(self, transport):
        """Count one transport less; wake waiters if closed and idle."""
        assert self.active_count > 0
        self.active_count -= 1
        if self.active_count == 0 and self.sockets is None:
            self._wakeup()

    def close(self):
        """Stop serving on every socket; later calls are no-ops.

        Waiters are woken immediately when no transport is attached,
        otherwise when the last one is detached.
        """
        sockets = self.sockets
        if sockets is None:
            return
        self.sockets = None
        for sock in sockets:
            self.loop._stop_serving(sock)
        if self.active_count == 0:
            self._wakeup()

    def _wakeup(self):
        """Complete every pending wait_closed() future."""
        waiters = self.waiters
        self.waiters = None
        for waiter in waiters:
            if not waiter.done():
                # The result is never read by wait_closed(); None avoids
                # making the future reference itself.
                waiter.set_result(None)

    @tasks.coroutine
    def wait_closed(self):
        """Wait until _wakeup() has run.

        Returns immediately if close() was already called or the
        waiters were already woken.
        """
        if self.sockets is None or self.waiters is None:
            return
        waiter = futures.Future(loop=self.loop)
        self.waiters.append(waiter)
        yield from waiter
114
115
116class BaseEventLoop(events.AbstractEventLoop):
117
118 def __init__(self):
119 self._ready = collections.deque()
120 self._scheduled = []
121 self._default_executor = None
122 self._internal_fds = 0
123 self._running = False
Victor Stinnered1654f2014-02-10 23:42:32 +0100124 self._clock_resolution = time.get_clock_info('monotonic').resolution
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700125
    def _make_socket_transport(self, sock, protocol, waiter=None, *,
                               extra=None, server=None):
        """Create socket transport.

        Hook for subclasses; this base version always raises
        NotImplementedError.  create_connection() calls it as
        (sock, protocol, waiter) and then waits on the waiter.
        """
        raise NotImplementedError
130
    def _make_ssl_transport(self, rawsock, protocol, sslcontext, waiter, *,
                            server_side=False, server_hostname=None,
                            extra=None, server=None):
        """Create SSL transport.

        Hook for subclasses; this base version always raises
        NotImplementedError.  create_connection() calls it with
        server_side=False and the server_hostname it computed, passing
        sslcontext=None when its ``ssl`` argument was a bool.
        """
        raise NotImplementedError
136
    def _make_datagram_transport(self, sock, protocol,
                                 address=None, extra=None):
        """Create datagram transport.

        Hook for subclasses; this base version always raises
        NotImplementedError.  create_datagram_endpoint() passes the
        connected remote address as *address*, or None.
        """
        raise NotImplementedError
141
    def _make_read_pipe_transport(self, pipe, protocol, waiter=None,
                                  extra=None):
        """Create read pipe transport.

        Hook for subclasses; this base version always raises
        NotImplementedError.  Called by connect_read_pipe().
        """
        raise NotImplementedError
146
    def _make_write_pipe_transport(self, pipe, protocol, waiter=None,
                                   extra=None):
        """Create write pipe transport.

        Hook for subclasses; this base version always raises
        NotImplementedError.  Called by connect_write_pipe().
        """
        raise NotImplementedError
151
    @tasks.coroutine
    def _make_subprocess_transport(self, protocol, args, shell,
                                   stdin, stdout, stderr, bufsize,
                                   extra=None, **kwargs):
        """Create subprocess transport.

        Hook for subclasses; this base version always raises
        NotImplementedError.  subprocess_shell() passes the command
        string with shell=True; subprocess_exec() passes the
        (program, *args) tuple with shell=False.
        """
        raise NotImplementedError
158
    def _read_from_self(self):
        """Hook for subclasses; this base version raises NotImplementedError.

        NOTE(review): not called anywhere in the code visible here;
        presumably the reading counterpart of _write_to_self() -- confirm
        against the subclass implementations.
        """
        raise NotImplementedError
162
    def _write_to_self(self):
        """Hook for subclasses; this base version raises NotImplementedError.

        Called by call_soon_threadsafe() and _add_callback_signalsafe()
        right after they queue a handle.
        NOTE(review): presumably wakes a loop blocked in select() --
        confirm against the subclass implementations.
        """
        raise NotImplementedError
166
    def _process_events(self, event_list):
        """Process selector events.

        Hook for subclasses; this base version always raises
        NotImplementedError.  _run_once() calls it with the value
        returned by self._selector.select(timeout).
        """
        raise NotImplementedError
170
171 def run_forever(self):
172 """Run until stop() is called."""
173 if self._running:
174 raise RuntimeError('Event loop is running.')
175 self._running = True
176 try:
177 while True:
178 try:
179 self._run_once()
180 except _StopError:
181 break
182 finally:
183 self._running = False
184
185 def run_until_complete(self, future):
186 """Run until the Future is done.
187
188 If the argument is a coroutine, it is wrapped in a Task.
189
190 XXX TBD: It would be disastrous to call run_until_complete()
191 with the same coroutine twice -- it would wrap it in two
192 different Tasks and that can't be good.
193
194 Return the Future's result, or raise its exception.
195 """
196 future = tasks.async(future, loop=self)
197 future.add_done_callback(_raise_stop_error)
198 self.run_forever()
199 future.remove_done_callback(_raise_stop_error)
200 if not future.done():
201 raise RuntimeError('Event loop stopped before Future completed.')
202
203 return future.result()
204
    def stop(self):
        """Stop running the event loop.

        Every callback scheduled before stop() is called will run.
        Callbacks scheduled after stop() is called won't.  However,
        those callbacks will run if run_forever() is called again later.

        Implemented by scheduling _raise_stop_error() with call_soon().
        """
        self.call_soon(_raise_stop_error)
213
Antoine Pitrou4ca73552013-10-20 00:54:10 +0200214 def close(self):
Guido van Rossume3f52ef2013-11-01 14:19:04 -0700215 """Close the event loop.
216
217 This clears the queues and shuts down the executor,
218 but does not wait for the executor to finish.
219 """
Antoine Pitrou4ca73552013-10-20 00:54:10 +0200220 self._ready.clear()
221 self._scheduled.clear()
222 executor = self._default_executor
223 if executor is not None:
224 self._default_executor = None
225 executor.shutdown(wait=False)
226
    def is_running(self):
        """Returns running status of event loop.

        The flag is set while run_forever() is executing.
        """
        return self._running
230
    def time(self):
        """Return the time according to the event loop's clock.

        This is time.monotonic(), a float number of seconds.
        """
        return time.monotonic()
234
235 def call_later(self, delay, callback, *args):
236 """Arrange for a callback to be called at a given time.
237
238 Return a Handle: an opaque object with a cancel() method that
239 can be used to cancel the call.
240
241 The delay can be an int or float, expressed in seconds. It is
242 always a relative time.
243
244 Each callback will be called exactly once. If two callbacks
245 are scheduled for exactly the same time, it undefined which
246 will be called first.
247
248 Any positional arguments after the callback will be passed to
249 the callback when it is called.
250 """
251 return self.call_at(self.time() + delay, callback, *args)
252
253 def call_at(self, when, callback, *args):
254 """Like call_later(), but uses an absolute time."""
Victor Stinner9af4a242014-02-11 11:34:30 +0100255 if tasks.iscoroutinefunction(callback):
256 raise TypeError("coroutines cannot be used with call_at()")
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700257 timer = events.TimerHandle(when, callback, args)
258 heapq.heappush(self._scheduled, timer)
259 return timer
260
261 def call_soon(self, callback, *args):
262 """Arrange for a callback to be called as soon as possible.
263
264 This operates as a FIFO queue, callbacks are called in the
265 order in which they are registered. Each callback will be
266 called exactly once.
267
268 Any positional arguments after the callback will be passed to
269 the callback when it is called.
270 """
Victor Stinner9af4a242014-02-11 11:34:30 +0100271 if tasks.iscoroutinefunction(callback):
272 raise TypeError("coroutines cannot be used with call_soon()")
Victor Stinnerdc62b7e2014-02-10 00:45:44 +0100273 handle = events.Handle(callback, args)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700274 self._ready.append(handle)
275 return handle
276
    def call_soon_threadsafe(self, callback, *args):
        """Schedule *callback* like call_soon(), then call _write_to_self().

        Returns the Handle created by call_soon().
        NOTE(review): the _write_to_self() call presumably wakes a loop
        blocked in select() so the callback is noticed promptly --
        confirm against the subclass implementation.
        """
        handle = self.call_soon(callback, *args)
        self._write_to_self()
        return handle
282
283 def run_in_executor(self, executor, callback, *args):
Victor Stinner9af4a242014-02-11 11:34:30 +0100284 if tasks.iscoroutinefunction(callback):
285 raise TypeError("coroutines cannot be used with run_in_executor()")
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700286 if isinstance(callback, events.Handle):
287 assert not args
288 assert not isinstance(callback, events.TimerHandle)
289 if callback._cancelled:
290 f = futures.Future(loop=self)
291 f.set_result(None)
292 return f
293 callback, args = callback._callback, callback._args
294 if executor is None:
295 executor = self._default_executor
296 if executor is None:
297 executor = concurrent.futures.ThreadPoolExecutor(_MAX_WORKERS)
298 self._default_executor = executor
299 return futures.wrap_future(executor.submit(callback, *args), loop=self)
300
    def set_default_executor(self, executor):
        """Set the executor that run_in_executor() uses for executor=None."""
        self._default_executor = executor
303
304 def getaddrinfo(self, host, port, *,
305 family=0, type=0, proto=0, flags=0):
306 return self.run_in_executor(None, socket.getaddrinfo,
307 host, port, family, type, proto, flags)
308
    def getnameinfo(self, sockaddr, flags=0):
        """Run socket.getnameinfo(sockaddr, flags) in the default executor.

        Returns whatever run_in_executor() returns for that call.
        """
        return self.run_in_executor(None, socket.getnameinfo, sockaddr, flags)
311
312 @tasks.coroutine
313 def create_connection(self, protocol_factory, host=None, port=None, *,
314 ssl=None, family=0, proto=0, flags=0, sock=None,
Guido van Rossum21c85a72013-11-01 14:16:54 -0700315 local_addr=None, server_hostname=None):
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700316 """XXX"""
Guido van Rossum21c85a72013-11-01 14:16:54 -0700317 if server_hostname is not None and not ssl:
318 raise ValueError('server_hostname is only meaningful with ssl')
319
320 if server_hostname is None and ssl:
321 # Use host as default for server_hostname. It is an error
322 # if host is empty or not set, e.g. when an
323 # already-connected socket was passed or when only a port
324 # is given. To avoid this error, you can pass
325 # server_hostname='' -- this will bypass the hostname
326 # check. (This also means that if host is a numeric
327 # IP/IPv6 address, we will attempt to verify that exact
328 # address; this will probably fail, but it is possible to
329 # create a certificate for a specific IP address, so we
330 # don't judge it here.)
331 if not host:
332 raise ValueError('You must set server_hostname '
333 'when using ssl without a host')
334 server_hostname = host
Guido van Rossuma8d630a2013-11-01 14:20:55 -0700335
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700336 if host is not None or port is not None:
337 if sock is not None:
338 raise ValueError(
339 'host/port and sock can not be specified at the same time')
340
341 f1 = self.getaddrinfo(
342 host, port, family=family,
343 type=socket.SOCK_STREAM, proto=proto, flags=flags)
344 fs = [f1]
345 if local_addr is not None:
346 f2 = self.getaddrinfo(
347 *local_addr, family=family,
348 type=socket.SOCK_STREAM, proto=proto, flags=flags)
349 fs.append(f2)
350 else:
351 f2 = None
352
353 yield from tasks.wait(fs, loop=self)
354
355 infos = f1.result()
356 if not infos:
357 raise OSError('getaddrinfo() returned empty list')
358 if f2 is not None:
359 laddr_infos = f2.result()
360 if not laddr_infos:
361 raise OSError('getaddrinfo() returned empty list')
362
363 exceptions = []
364 for family, type, proto, cname, address in infos:
365 try:
366 sock = socket.socket(family=family, type=type, proto=proto)
367 sock.setblocking(False)
368 if f2 is not None:
369 for _, _, _, _, laddr in laddr_infos:
370 try:
371 sock.bind(laddr)
372 break
373 except OSError as exc:
374 exc = OSError(
375 exc.errno, 'error while '
376 'attempting to bind on address '
377 '{!r}: {}'.format(
378 laddr, exc.strerror.lower()))
379 exceptions.append(exc)
380 else:
381 sock.close()
382 sock = None
383 continue
384 yield from self.sock_connect(sock, address)
385 except OSError as exc:
386 if sock is not None:
387 sock.close()
388 exceptions.append(exc)
389 else:
390 break
391 else:
392 if len(exceptions) == 1:
393 raise exceptions[0]
394 else:
395 # If they all have the same str(), raise one.
396 model = str(exceptions[0])
397 if all(str(exc) == model for exc in exceptions):
398 raise exceptions[0]
399 # Raise a combined exception so the user can see all
400 # the various error messages.
401 raise OSError('Multiple exceptions: {}'.format(
402 ', '.join(str(exc) for exc in exceptions)))
403
404 elif sock is None:
405 raise ValueError(
406 'host and port was not specified and no sock specified')
407
408 sock.setblocking(False)
409
410 protocol = protocol_factory()
411 waiter = futures.Future(loop=self)
412 if ssl:
413 sslcontext = None if isinstance(ssl, bool) else ssl
414 transport = self._make_ssl_transport(
415 sock, protocol, sslcontext, waiter,
Guido van Rossum21c85a72013-11-01 14:16:54 -0700416 server_side=False, server_hostname=server_hostname)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700417 else:
418 transport = self._make_socket_transport(sock, protocol, waiter)
419
420 yield from waiter
421 return transport, protocol
422
423 @tasks.coroutine
424 def create_datagram_endpoint(self, protocol_factory,
425 local_addr=None, remote_addr=None, *,
426 family=0, proto=0, flags=0):
427 """Create datagram connection."""
428 if not (local_addr or remote_addr):
429 if family == 0:
430 raise ValueError('unexpected address family')
431 addr_pairs_info = (((family, proto), (None, None)),)
432 else:
433 # join addresss by (family, protocol)
434 addr_infos = collections.OrderedDict()
435 for idx, addr in ((0, local_addr), (1, remote_addr)):
436 if addr is not None:
437 assert isinstance(addr, tuple) and len(addr) == 2, (
438 '2-tuple is expected')
439
440 infos = yield from self.getaddrinfo(
441 *addr, family=family, type=socket.SOCK_DGRAM,
442 proto=proto, flags=flags)
443 if not infos:
444 raise OSError('getaddrinfo() returned empty list')
445
446 for fam, _, pro, _, address in infos:
447 key = (fam, pro)
448 if key not in addr_infos:
449 addr_infos[key] = [None, None]
450 addr_infos[key][idx] = address
451
452 # each addr has to have info for each (family, proto) pair
453 addr_pairs_info = [
454 (key, addr_pair) for key, addr_pair in addr_infos.items()
455 if not ((local_addr and addr_pair[0] is None) or
456 (remote_addr and addr_pair[1] is None))]
457
458 if not addr_pairs_info:
459 raise ValueError('can not get address information')
460
461 exceptions = []
462
463 for ((family, proto),
464 (local_address, remote_address)) in addr_pairs_info:
465 sock = None
466 r_addr = None
467 try:
468 sock = socket.socket(
469 family=family, type=socket.SOCK_DGRAM, proto=proto)
470 sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
471 sock.setblocking(False)
472
473 if local_addr:
474 sock.bind(local_address)
475 if remote_addr:
476 yield from self.sock_connect(sock, remote_address)
477 r_addr = remote_address
478 except OSError as exc:
479 if sock is not None:
480 sock.close()
481 exceptions.append(exc)
482 else:
483 break
484 else:
485 raise exceptions[0]
486
487 protocol = protocol_factory()
488 transport = self._make_datagram_transport(sock, protocol, r_addr)
489 return transport, protocol
490
    @tasks.coroutine
    def create_server(self, protocol_factory, host=None, port=None,
                      *,
                      family=socket.AF_UNSPEC,
                      flags=socket.AI_PASSIVE,
                      sock=None,
                      backlog=100,
                      ssl=None,
                      reuse_address=None):
        """Create listening socket(s) and start serving on them.

        Either host/port or an existing *sock* must be given, not both.
        With host/port, one socket is created and bound for every
        address returned by getaddrinfo() whose family/type/protocol
        combination can be instantiated.  An empty *host* is treated as
        None.  *reuse_address* defaults to true on POSIX platforms
        other than Cygwin.

        Returns a Server wrapping the listening sockets.

        Raises TypeError if *ssl* is a bool, ValueError for inconsistent
        arguments, and OSError if resolution returns nothing or a bind
        fails.
        """
        if isinstance(ssl, bool):
            raise TypeError('ssl argument must be an SSLContext or None')
        if host is not None or port is not None:
            if sock is not None:
                raise ValueError(
                    'host/port and sock can not be specified at the same time')

            # 0 stands in for AF_INET6 when the socket module lacks it.
            AF_INET6 = getattr(socket, 'AF_INET6', 0)
            if reuse_address is None:
                reuse_address = os.name == 'posix' and sys.platform != 'cygwin'
            sockets = []
            if host == '':
                host = None

            infos = yield from self.getaddrinfo(
                host, port, family=family,
                type=socket.SOCK_STREAM, proto=0, flags=flags)
            if not infos:
                raise OSError('getaddrinfo() returned empty list')

            # Stays False if anything below raises, so that the finally
            # clause closes every socket created so far.
            completed = False
            try:
                for res in infos:
                    af, socktype, proto, canonname, sa = res
                    try:
                        sock = socket.socket(af, socktype, proto)
                    except socket.error:
                        # Assume it's a bad family/type/protocol combination.
                        continue
                    sockets.append(sock)
                    if reuse_address:
                        sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR,
                                        True)
                    # Disable IPv4/IPv6 dual stack support (enabled by
                    # default on Linux) which makes a single socket
                    # listen on both address families.
                    if af == AF_INET6 and hasattr(socket, 'IPPROTO_IPV6'):
                        sock.setsockopt(socket.IPPROTO_IPV6,
                                        socket.IPV6_V6ONLY,
                                        True)
                    try:
                        sock.bind(sa)
                    except OSError as err:
                        # Re-raise with the address included in the message.
                        raise OSError(err.errno, 'error while attempting '
                                      'to bind on address %r: %s'
                                      % (sa, err.strerror.lower()))
                completed = True
            finally:
                if not completed:
                    for sock in sockets:
                        sock.close()
        else:
            if sock is None:
                raise ValueError(
                    'host and port was not specified and no sock specified')
            sockets = [sock]

        server = Server(self, sockets)
        for sock in sockets:
            sock.listen(backlog)
            sock.setblocking(False)
            self._start_serving(protocol_factory, sock, ssl, server)
        return server
564
565 @tasks.coroutine
566 def connect_read_pipe(self, protocol_factory, pipe):
567 protocol = protocol_factory()
568 waiter = futures.Future(loop=self)
569 transport = self._make_read_pipe_transport(pipe, protocol, waiter)
570 yield from waiter
571 return transport, protocol
572
573 @tasks.coroutine
574 def connect_write_pipe(self, protocol_factory, pipe):
575 protocol = protocol_factory()
576 waiter = futures.Future(loop=self)
577 transport = self._make_write_pipe_transport(pipe, protocol, waiter)
578 yield from waiter
579 return transport, protocol
580
581 @tasks.coroutine
582 def subprocess_shell(self, protocol_factory, cmd, *, stdin=subprocess.PIPE,
583 stdout=subprocess.PIPE, stderr=subprocess.PIPE,
584 universal_newlines=False, shell=True, bufsize=0,
585 **kwargs):
Victor Stinner20e07432014-02-11 11:44:56 +0100586 if not isinstance(cmd, (bytes, str)):
Victor Stinnere623a122014-01-29 14:35:15 -0800587 raise ValueError("cmd must be a string")
588 if universal_newlines:
589 raise ValueError("universal_newlines must be False")
590 if not shell:
Victor Stinner323748e2014-01-31 12:28:30 +0100591 raise ValueError("shell must be True")
Victor Stinnere623a122014-01-29 14:35:15 -0800592 if bufsize != 0:
593 raise ValueError("bufsize must be 0")
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700594 protocol = protocol_factory()
595 transport = yield from self._make_subprocess_transport(
596 protocol, cmd, True, stdin, stdout, stderr, bufsize, **kwargs)
597 return transport, protocol
598
599 @tasks.coroutine
Victor Stinner20e07432014-02-11 11:44:56 +0100600 def subprocess_exec(self, protocol_factory, program, *args, stdin=subprocess.PIPE,
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700601 stdout=subprocess.PIPE, stderr=subprocess.PIPE,
602 universal_newlines=False, shell=False, bufsize=0,
603 **kwargs):
Victor Stinnere623a122014-01-29 14:35:15 -0800604 if universal_newlines:
605 raise ValueError("universal_newlines must be False")
606 if shell:
607 raise ValueError("shell must be False")
608 if bufsize != 0:
609 raise ValueError("bufsize must be 0")
Victor Stinner20e07432014-02-11 11:44:56 +0100610 popen_args = (program,) + args
611 for arg in popen_args:
612 if not isinstance(arg, (str, bytes)):
613 raise TypeError("program arguments must be "
614 "a bytes or text string, not %s"
615 % type(arg).__name__)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700616 protocol = protocol_factory()
617 transport = yield from self._make_subprocess_transport(
Victor Stinner20e07432014-02-11 11:44:56 +0100618 protocol, popen_args, False, stdin, stdout, stderr, bufsize, **kwargs)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700619 return transport, protocol
620
621 def _add_callback(self, handle):
622 """Add a Handle to ready or scheduled."""
623 assert isinstance(handle, events.Handle), 'A Handle is required here'
624 if handle._cancelled:
625 return
626 if isinstance(handle, events.TimerHandle):
627 heapq.heappush(self._scheduled, handle)
628 else:
629 self._ready.append(handle)
630
    def _add_callback_signalsafe(self, handle):
        """Like _add_callback() but called from a signal handler.

        After queueing the handle it also calls _write_to_self().
        """
        self._add_callback(handle)
        self._write_to_self()
635
636 def _run_once(self):
637 """Run one full iteration of the event loop.
638
639 This calls all currently ready callbacks, polls for I/O,
640 schedules the resulting callbacks, and finally schedules
641 'call_later' callbacks.
642 """
643 # Remove delayed calls that were cancelled from head of queue.
644 while self._scheduled and self._scheduled[0]._cancelled:
645 heapq.heappop(self._scheduled)
646
647 timeout = None
648 if self._ready:
649 timeout = 0
650 elif self._scheduled:
651 # Compute the desired timeout.
652 when = self._scheduled[0]._when
653 deadline = max(0, when - self.time())
654 if timeout is None:
655 timeout = deadline
656 else:
657 timeout = min(timeout, deadline)
658
659 # TODO: Instrumentation only in debug mode?
Victor Stinner49d0f4e2014-01-31 12:59:43 +0100660 if logger.isEnabledFor(logging.INFO):
Victor Stinner22463aa2014-01-20 23:56:40 +0100661 t0 = self.time()
662 event_list = self._selector.select(timeout)
663 t1 = self.time()
Victor Stinner22463aa2014-01-20 23:56:40 +0100664 if t1-t0 >= 1:
665 level = logging.INFO
666 else:
667 level = logging.DEBUG
Victor Stinner4a2dbeb2014-01-22 12:26:01 +0100668 if timeout is not None:
669 logger.log(level, 'poll %.3f took %.3f seconds',
670 timeout, t1-t0)
671 else:
672 logger.log(level, 'poll took %.3f seconds', t1-t0)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700673 else:
Victor Stinner22463aa2014-01-20 23:56:40 +0100674 event_list = self._selector.select(timeout)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700675 self._process_events(event_list)
676
677 # Handle 'later' callbacks that are ready.
Victor Stinnered1654f2014-02-10 23:42:32 +0100678 end_time = self.time() + self._clock_resolution
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700679 while self._scheduled:
680 handle = self._scheduled[0]
Victor Stinnered1654f2014-02-10 23:42:32 +0100681 if handle._when >= end_time:
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700682 break
683 handle = heapq.heappop(self._scheduled)
684 self._ready.append(handle)
685
686 # This is the only place where callbacks are actually *called*.
687 # All other places just add them to ready.
688 # Note: We run all currently scheduled callbacks, but not any
689 # callbacks scheduled by callbacks run this time around --
690 # they will be run the next time (after another I/O poll).
691 # Use an idiom that is threadsafe without using locks.
692 ntodo = len(self._ready)
693 for i in range(ntodo):
694 handle = self._ready.popleft()
695 if not handle._cancelled:
696 handle._run()
697 handle = None # Needed to break cycles when an exception occurs.