blob: b74e9369414b75e4db75221c31b9c2787b1afe64 [file] [log] [blame]
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07001"""Base implementation of event loop.
2
3The event loop can be broken up into a multiplexer (the part
4responsible for notifying us of IO events) and the event loop proper,
5which wraps a multiplexer with functionality for scheduling callbacks,
6immediately or at a given time in the future.
7
8Whenever a public API takes a callback, subsequent positional
9arguments will be passed to the callback if/when it is called. This
10avoids the proliferation of trivial lambdas implementing closures.
11Keyword arguments for the callback are not supported; this is a
12conscious design decision, leaving the door open for keyword arguments
13to modify the meaning of the API call itself.
14"""
15
16
17import collections
18import concurrent.futures
19import heapq
20import logging
21import socket
22import subprocess
23import time
24import os
25import sys
26
27from . import events
28from . import futures
29from . import tasks
Guido van Rossumfc29e0f2013-10-17 15:39:45 -070030from .log import logger
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070031
32
33__all__ = ['BaseEventLoop', 'Server']
34
35
36# Argument for default thread pool executor creation.
37_MAX_WORKERS = 5
38
39
40class _StopError(BaseException):
41 """Raised to stop the event loop."""
42
43
Victor Stinner1b0580b2014-02-13 09:24:37 +010044def _check_resolved_address(sock, address):
45 # Ensure that the address is already resolved to avoid the trap of hanging
46 # the entire event loop when the address requires doing a DNS lookup.
47 family = sock.family
48 if family not in (socket.AF_INET, socket.AF_INET6):
49 return
50
51 host, port = address
52 type_mask = 0
53 if hasattr(socket, 'SOCK_NONBLOCK'):
54 type_mask |= socket.SOCK_NONBLOCK
55 if hasattr(socket, 'SOCK_CLOEXEC'):
56 type_mask |= socket.SOCK_CLOEXEC
57 # Use getaddrinfo(AI_NUMERICHOST) to ensure that the address is
58 # already resolved.
59 try:
60 socket.getaddrinfo(host, port,
61 family=family,
62 type=(sock.type & ~type_mask),
63 proto=sock.proto,
64 flags=socket.AI_NUMERICHOST)
65 except socket.gaierror as err:
66 raise ValueError("address must be resolved (IP address), got %r: %s"
67 % (address, err))
68
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070069def _raise_stop_error(*args):
70 raise _StopError
71
72
73class Server(events.AbstractServer):
74
75 def __init__(self, loop, sockets):
76 self.loop = loop
77 self.sockets = sockets
78 self.active_count = 0
79 self.waiters = []
80
81 def attach(self, transport):
82 assert self.sockets is not None
83 self.active_count += 1
84
85 def detach(self, transport):
86 assert self.active_count > 0
87 self.active_count -= 1
88 if self.active_count == 0 and self.sockets is None:
89 self._wakeup()
90
91 def close(self):
92 sockets = self.sockets
93 if sockets is not None:
94 self.sockets = None
95 for sock in sockets:
96 self.loop._stop_serving(sock)
97 if self.active_count == 0:
98 self._wakeup()
99
100 def _wakeup(self):
101 waiters = self.waiters
102 self.waiters = None
103 for waiter in waiters:
104 if not waiter.done():
105 waiter.set_result(waiter)
106
107 @tasks.coroutine
108 def wait_closed(self):
109 if self.sockets is None or self.waiters is None:
110 return
111 waiter = futures.Future(loop=self.loop)
112 self.waiters.append(waiter)
113 yield from waiter
114
115
116class BaseEventLoop(events.AbstractEventLoop):
117
118 def __init__(self):
119 self._ready = collections.deque()
120 self._scheduled = []
121 self._default_executor = None
122 self._internal_fds = 0
123 self._running = False
Victor Stinnered1654f2014-02-10 23:42:32 +0100124 self._clock_resolution = time.get_clock_info('monotonic').resolution
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700125
126 def _make_socket_transport(self, sock, protocol, waiter=None, *,
127 extra=None, server=None):
128 """Create socket transport."""
129 raise NotImplementedError
130
131 def _make_ssl_transport(self, rawsock, protocol, sslcontext, waiter, *,
132 server_side=False, server_hostname=None,
133 extra=None, server=None):
134 """Create SSL transport."""
135 raise NotImplementedError
136
137 def _make_datagram_transport(self, sock, protocol,
138 address=None, extra=None):
139 """Create datagram transport."""
140 raise NotImplementedError
141
142 def _make_read_pipe_transport(self, pipe, protocol, waiter=None,
143 extra=None):
144 """Create read pipe transport."""
145 raise NotImplementedError
146
147 def _make_write_pipe_transport(self, pipe, protocol, waiter=None,
148 extra=None):
149 """Create write pipe transport."""
150 raise NotImplementedError
151
152 @tasks.coroutine
153 def _make_subprocess_transport(self, protocol, args, shell,
154 stdin, stdout, stderr, bufsize,
155 extra=None, **kwargs):
156 """Create subprocess transport."""
157 raise NotImplementedError
158
159 def _read_from_self(self):
160 """XXX"""
161 raise NotImplementedError
162
163 def _write_to_self(self):
164 """XXX"""
165 raise NotImplementedError
166
167 def _process_events(self, event_list):
168 """Process selector events."""
169 raise NotImplementedError
170
171 def run_forever(self):
172 """Run until stop() is called."""
173 if self._running:
174 raise RuntimeError('Event loop is running.')
175 self._running = True
176 try:
177 while True:
178 try:
179 self._run_once()
180 except _StopError:
181 break
182 finally:
183 self._running = False
184
185 def run_until_complete(self, future):
186 """Run until the Future is done.
187
188 If the argument is a coroutine, it is wrapped in a Task.
189
190 XXX TBD: It would be disastrous to call run_until_complete()
191 with the same coroutine twice -- it would wrap it in two
192 different Tasks and that can't be good.
193
194 Return the Future's result, or raise its exception.
195 """
196 future = tasks.async(future, loop=self)
197 future.add_done_callback(_raise_stop_error)
198 self.run_forever()
199 future.remove_done_callback(_raise_stop_error)
200 if not future.done():
201 raise RuntimeError('Event loop stopped before Future completed.')
202
203 return future.result()
204
205 def stop(self):
206 """Stop running the event loop.
207
208 Every callback scheduled before stop() is called will run.
209 Callback scheduled after stop() is called won't. However,
210 those callbacks will run if run() is called again later.
211 """
212 self.call_soon(_raise_stop_error)
213
Antoine Pitrou4ca73552013-10-20 00:54:10 +0200214 def close(self):
Guido van Rossume3f52ef2013-11-01 14:19:04 -0700215 """Close the event loop.
216
217 This clears the queues and shuts down the executor,
218 but does not wait for the executor to finish.
219 """
Antoine Pitrou4ca73552013-10-20 00:54:10 +0200220 self._ready.clear()
221 self._scheduled.clear()
222 executor = self._default_executor
223 if executor is not None:
224 self._default_executor = None
225 executor.shutdown(wait=False)
226
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700227 def is_running(self):
228 """Returns running status of event loop."""
229 return self._running
230
231 def time(self):
232 """Return the time according to the event loop's clock."""
233 return time.monotonic()
234
235 def call_later(self, delay, callback, *args):
236 """Arrange for a callback to be called at a given time.
237
238 Return a Handle: an opaque object with a cancel() method that
239 can be used to cancel the call.
240
241 The delay can be an int or float, expressed in seconds. It is
242 always a relative time.
243
244 Each callback will be called exactly once. If two callbacks
245 are scheduled for exactly the same time, it undefined which
246 will be called first.
247
248 Any positional arguments after the callback will be passed to
249 the callback when it is called.
250 """
251 return self.call_at(self.time() + delay, callback, *args)
252
253 def call_at(self, when, callback, *args):
254 """Like call_later(), but uses an absolute time."""
Victor Stinner9af4a242014-02-11 11:34:30 +0100255 if tasks.iscoroutinefunction(callback):
256 raise TypeError("coroutines cannot be used with call_at()")
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700257 timer = events.TimerHandle(when, callback, args)
258 heapq.heappush(self._scheduled, timer)
259 return timer
260
261 def call_soon(self, callback, *args):
262 """Arrange for a callback to be called as soon as possible.
263
264 This operates as a FIFO queue, callbacks are called in the
265 order in which they are registered. Each callback will be
266 called exactly once.
267
268 Any positional arguments after the callback will be passed to
269 the callback when it is called.
270 """
Victor Stinner9af4a242014-02-11 11:34:30 +0100271 if tasks.iscoroutinefunction(callback):
272 raise TypeError("coroutines cannot be used with call_soon()")
Victor Stinnerdc62b7e2014-02-10 00:45:44 +0100273 handle = events.Handle(callback, args)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700274 self._ready.append(handle)
275 return handle
276
277 def call_soon_threadsafe(self, callback, *args):
278 """XXX"""
279 handle = self.call_soon(callback, *args)
280 self._write_to_self()
281 return handle
282
283 def run_in_executor(self, executor, callback, *args):
Victor Stinner9af4a242014-02-11 11:34:30 +0100284 if tasks.iscoroutinefunction(callback):
285 raise TypeError("coroutines cannot be used with run_in_executor()")
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700286 if isinstance(callback, events.Handle):
287 assert not args
288 assert not isinstance(callback, events.TimerHandle)
289 if callback._cancelled:
290 f = futures.Future(loop=self)
291 f.set_result(None)
292 return f
293 callback, args = callback._callback, callback._args
294 if executor is None:
295 executor = self._default_executor
296 if executor is None:
297 executor = concurrent.futures.ThreadPoolExecutor(_MAX_WORKERS)
298 self._default_executor = executor
299 return futures.wrap_future(executor.submit(callback, *args), loop=self)
300
301 def set_default_executor(self, executor):
302 self._default_executor = executor
303
304 def getaddrinfo(self, host, port, *,
305 family=0, type=0, proto=0, flags=0):
306 return self.run_in_executor(None, socket.getaddrinfo,
307 host, port, family, type, proto, flags)
308
309 def getnameinfo(self, sockaddr, flags=0):
310 return self.run_in_executor(None, socket.getnameinfo, sockaddr, flags)
311
312 @tasks.coroutine
313 def create_connection(self, protocol_factory, host=None, port=None, *,
314 ssl=None, family=0, proto=0, flags=0, sock=None,
Guido van Rossum21c85a72013-11-01 14:16:54 -0700315 local_addr=None, server_hostname=None):
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700316 """XXX"""
Guido van Rossum21c85a72013-11-01 14:16:54 -0700317 if server_hostname is not None and not ssl:
318 raise ValueError('server_hostname is only meaningful with ssl')
319
320 if server_hostname is None and ssl:
321 # Use host as default for server_hostname. It is an error
322 # if host is empty or not set, e.g. when an
323 # already-connected socket was passed or when only a port
324 # is given. To avoid this error, you can pass
325 # server_hostname='' -- this will bypass the hostname
326 # check. (This also means that if host is a numeric
327 # IP/IPv6 address, we will attempt to verify that exact
328 # address; this will probably fail, but it is possible to
329 # create a certificate for a specific IP address, so we
330 # don't judge it here.)
331 if not host:
332 raise ValueError('You must set server_hostname '
333 'when using ssl without a host')
334 server_hostname = host
Guido van Rossuma8d630a2013-11-01 14:20:55 -0700335
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700336 if host is not None or port is not None:
337 if sock is not None:
338 raise ValueError(
339 'host/port and sock can not be specified at the same time')
340
341 f1 = self.getaddrinfo(
342 host, port, family=family,
343 type=socket.SOCK_STREAM, proto=proto, flags=flags)
344 fs = [f1]
345 if local_addr is not None:
346 f2 = self.getaddrinfo(
347 *local_addr, family=family,
348 type=socket.SOCK_STREAM, proto=proto, flags=flags)
349 fs.append(f2)
350 else:
351 f2 = None
352
353 yield from tasks.wait(fs, loop=self)
354
355 infos = f1.result()
356 if not infos:
357 raise OSError('getaddrinfo() returned empty list')
358 if f2 is not None:
359 laddr_infos = f2.result()
360 if not laddr_infos:
361 raise OSError('getaddrinfo() returned empty list')
362
363 exceptions = []
364 for family, type, proto, cname, address in infos:
365 try:
366 sock = socket.socket(family=family, type=type, proto=proto)
367 sock.setblocking(False)
368 if f2 is not None:
369 for _, _, _, _, laddr in laddr_infos:
370 try:
371 sock.bind(laddr)
372 break
373 except OSError as exc:
374 exc = OSError(
375 exc.errno, 'error while '
376 'attempting to bind on address '
377 '{!r}: {}'.format(
378 laddr, exc.strerror.lower()))
379 exceptions.append(exc)
380 else:
381 sock.close()
382 sock = None
383 continue
384 yield from self.sock_connect(sock, address)
385 except OSError as exc:
386 if sock is not None:
387 sock.close()
388 exceptions.append(exc)
389 else:
390 break
391 else:
392 if len(exceptions) == 1:
393 raise exceptions[0]
394 else:
395 # If they all have the same str(), raise one.
396 model = str(exceptions[0])
397 if all(str(exc) == model for exc in exceptions):
398 raise exceptions[0]
399 # Raise a combined exception so the user can see all
400 # the various error messages.
401 raise OSError('Multiple exceptions: {}'.format(
402 ', '.join(str(exc) for exc in exceptions)))
403
404 elif sock is None:
405 raise ValueError(
406 'host and port was not specified and no sock specified')
407
408 sock.setblocking(False)
409
Yury Selivanovb057c522014-02-18 12:15:06 -0500410 transport, protocol = yield from self._create_connection_transport(
411 sock, protocol_factory, ssl, server_hostname)
412 return transport, protocol
413
414 @tasks.coroutine
415 def _create_connection_transport(self, sock, protocol_factory, ssl,
416 server_hostname):
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700417 protocol = protocol_factory()
418 waiter = futures.Future(loop=self)
419 if ssl:
420 sslcontext = None if isinstance(ssl, bool) else ssl
421 transport = self._make_ssl_transport(
422 sock, protocol, sslcontext, waiter,
Guido van Rossum21c85a72013-11-01 14:16:54 -0700423 server_side=False, server_hostname=server_hostname)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700424 else:
425 transport = self._make_socket_transport(sock, protocol, waiter)
426
427 yield from waiter
428 return transport, protocol
429
430 @tasks.coroutine
431 def create_datagram_endpoint(self, protocol_factory,
432 local_addr=None, remote_addr=None, *,
433 family=0, proto=0, flags=0):
434 """Create datagram connection."""
435 if not (local_addr or remote_addr):
436 if family == 0:
437 raise ValueError('unexpected address family')
438 addr_pairs_info = (((family, proto), (None, None)),)
439 else:
440 # join addresss by (family, protocol)
441 addr_infos = collections.OrderedDict()
442 for idx, addr in ((0, local_addr), (1, remote_addr)):
443 if addr is not None:
444 assert isinstance(addr, tuple) and len(addr) == 2, (
445 '2-tuple is expected')
446
447 infos = yield from self.getaddrinfo(
448 *addr, family=family, type=socket.SOCK_DGRAM,
449 proto=proto, flags=flags)
450 if not infos:
451 raise OSError('getaddrinfo() returned empty list')
452
453 for fam, _, pro, _, address in infos:
454 key = (fam, pro)
455 if key not in addr_infos:
456 addr_infos[key] = [None, None]
457 addr_infos[key][idx] = address
458
459 # each addr has to have info for each (family, proto) pair
460 addr_pairs_info = [
461 (key, addr_pair) for key, addr_pair in addr_infos.items()
462 if not ((local_addr and addr_pair[0] is None) or
463 (remote_addr and addr_pair[1] is None))]
464
465 if not addr_pairs_info:
466 raise ValueError('can not get address information')
467
468 exceptions = []
469
470 for ((family, proto),
471 (local_address, remote_address)) in addr_pairs_info:
472 sock = None
473 r_addr = None
474 try:
475 sock = socket.socket(
476 family=family, type=socket.SOCK_DGRAM, proto=proto)
477 sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
478 sock.setblocking(False)
479
480 if local_addr:
481 sock.bind(local_address)
482 if remote_addr:
483 yield from self.sock_connect(sock, remote_address)
484 r_addr = remote_address
485 except OSError as exc:
486 if sock is not None:
487 sock.close()
488 exceptions.append(exc)
489 else:
490 break
491 else:
492 raise exceptions[0]
493
494 protocol = protocol_factory()
495 transport = self._make_datagram_transport(sock, protocol, r_addr)
496 return transport, protocol
497
498 @tasks.coroutine
499 def create_server(self, protocol_factory, host=None, port=None,
500 *,
501 family=socket.AF_UNSPEC,
502 flags=socket.AI_PASSIVE,
503 sock=None,
504 backlog=100,
505 ssl=None,
506 reuse_address=None):
507 """XXX"""
Guido van Rossum28dff0d2013-11-01 14:22:30 -0700508 if isinstance(ssl, bool):
509 raise TypeError('ssl argument must be an SSLContext or None')
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700510 if host is not None or port is not None:
511 if sock is not None:
512 raise ValueError(
513 'host/port and sock can not be specified at the same time')
514
515 AF_INET6 = getattr(socket, 'AF_INET6', 0)
516 if reuse_address is None:
517 reuse_address = os.name == 'posix' and sys.platform != 'cygwin'
518 sockets = []
519 if host == '':
520 host = None
521
522 infos = yield from self.getaddrinfo(
523 host, port, family=family,
524 type=socket.SOCK_STREAM, proto=0, flags=flags)
525 if not infos:
526 raise OSError('getaddrinfo() returned empty list')
527
528 completed = False
529 try:
530 for res in infos:
531 af, socktype, proto, canonname, sa = res
Guido van Rossum32e46852013-10-19 17:04:25 -0700532 try:
533 sock = socket.socket(af, socktype, proto)
534 except socket.error:
535 # Assume it's a bad family/type/protocol combination.
536 continue
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700537 sockets.append(sock)
538 if reuse_address:
539 sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR,
540 True)
541 # Disable IPv4/IPv6 dual stack support (enabled by
542 # default on Linux) which makes a single socket
543 # listen on both address families.
544 if af == AF_INET6 and hasattr(socket, 'IPPROTO_IPV6'):
545 sock.setsockopt(socket.IPPROTO_IPV6,
546 socket.IPV6_V6ONLY,
547 True)
548 try:
549 sock.bind(sa)
550 except OSError as err:
551 raise OSError(err.errno, 'error while attempting '
552 'to bind on address %r: %s'
553 % (sa, err.strerror.lower()))
554 completed = True
555 finally:
556 if not completed:
557 for sock in sockets:
558 sock.close()
559 else:
560 if sock is None:
561 raise ValueError(
562 'host and port was not specified and no sock specified')
563 sockets = [sock]
564
565 server = Server(self, sockets)
566 for sock in sockets:
567 sock.listen(backlog)
568 sock.setblocking(False)
569 self._start_serving(protocol_factory, sock, ssl, server)
570 return server
571
572 @tasks.coroutine
573 def connect_read_pipe(self, protocol_factory, pipe):
574 protocol = protocol_factory()
575 waiter = futures.Future(loop=self)
576 transport = self._make_read_pipe_transport(pipe, protocol, waiter)
577 yield from waiter
578 return transport, protocol
579
580 @tasks.coroutine
581 def connect_write_pipe(self, protocol_factory, pipe):
582 protocol = protocol_factory()
583 waiter = futures.Future(loop=self)
584 transport = self._make_write_pipe_transport(pipe, protocol, waiter)
585 yield from waiter
586 return transport, protocol
587
588 @tasks.coroutine
589 def subprocess_shell(self, protocol_factory, cmd, *, stdin=subprocess.PIPE,
590 stdout=subprocess.PIPE, stderr=subprocess.PIPE,
591 universal_newlines=False, shell=True, bufsize=0,
592 **kwargs):
Victor Stinner20e07432014-02-11 11:44:56 +0100593 if not isinstance(cmd, (bytes, str)):
Victor Stinnere623a122014-01-29 14:35:15 -0800594 raise ValueError("cmd must be a string")
595 if universal_newlines:
596 raise ValueError("universal_newlines must be False")
597 if not shell:
Victor Stinner323748e2014-01-31 12:28:30 +0100598 raise ValueError("shell must be True")
Victor Stinnere623a122014-01-29 14:35:15 -0800599 if bufsize != 0:
600 raise ValueError("bufsize must be 0")
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700601 protocol = protocol_factory()
602 transport = yield from self._make_subprocess_transport(
603 protocol, cmd, True, stdin, stdout, stderr, bufsize, **kwargs)
604 return transport, protocol
605
606 @tasks.coroutine
Victor Stinner20e07432014-02-11 11:44:56 +0100607 def subprocess_exec(self, protocol_factory, program, *args, stdin=subprocess.PIPE,
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700608 stdout=subprocess.PIPE, stderr=subprocess.PIPE,
609 universal_newlines=False, shell=False, bufsize=0,
610 **kwargs):
Victor Stinnere623a122014-01-29 14:35:15 -0800611 if universal_newlines:
612 raise ValueError("universal_newlines must be False")
613 if shell:
614 raise ValueError("shell must be False")
615 if bufsize != 0:
616 raise ValueError("bufsize must be 0")
Victor Stinner20e07432014-02-11 11:44:56 +0100617 popen_args = (program,) + args
618 for arg in popen_args:
619 if not isinstance(arg, (str, bytes)):
620 raise TypeError("program arguments must be "
621 "a bytes or text string, not %s"
622 % type(arg).__name__)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700623 protocol = protocol_factory()
624 transport = yield from self._make_subprocess_transport(
Victor Stinner20e07432014-02-11 11:44:56 +0100625 protocol, popen_args, False, stdin, stdout, stderr, bufsize, **kwargs)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700626 return transport, protocol
627
628 def _add_callback(self, handle):
629 """Add a Handle to ready or scheduled."""
630 assert isinstance(handle, events.Handle), 'A Handle is required here'
631 if handle._cancelled:
632 return
633 if isinstance(handle, events.TimerHandle):
634 heapq.heappush(self._scheduled, handle)
635 else:
636 self._ready.append(handle)
637
638 def _add_callback_signalsafe(self, handle):
639 """Like _add_callback() but called from a signal handler."""
640 self._add_callback(handle)
641 self._write_to_self()
642
643 def _run_once(self):
644 """Run one full iteration of the event loop.
645
646 This calls all currently ready callbacks, polls for I/O,
647 schedules the resulting callbacks, and finally schedules
648 'call_later' callbacks.
649 """
650 # Remove delayed calls that were cancelled from head of queue.
651 while self._scheduled and self._scheduled[0]._cancelled:
652 heapq.heappop(self._scheduled)
653
654 timeout = None
655 if self._ready:
656 timeout = 0
657 elif self._scheduled:
658 # Compute the desired timeout.
659 when = self._scheduled[0]._when
660 deadline = max(0, when - self.time())
661 if timeout is None:
662 timeout = deadline
663 else:
664 timeout = min(timeout, deadline)
665
666 # TODO: Instrumentation only in debug mode?
Victor Stinner49d0f4e2014-01-31 12:59:43 +0100667 if logger.isEnabledFor(logging.INFO):
Victor Stinner22463aa2014-01-20 23:56:40 +0100668 t0 = self.time()
669 event_list = self._selector.select(timeout)
670 t1 = self.time()
Victor Stinner22463aa2014-01-20 23:56:40 +0100671 if t1-t0 >= 1:
672 level = logging.INFO
673 else:
674 level = logging.DEBUG
Victor Stinner4a2dbeb2014-01-22 12:26:01 +0100675 if timeout is not None:
676 logger.log(level, 'poll %.3f took %.3f seconds',
677 timeout, t1-t0)
678 else:
679 logger.log(level, 'poll took %.3f seconds', t1-t0)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700680 else:
Victor Stinner22463aa2014-01-20 23:56:40 +0100681 event_list = self._selector.select(timeout)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700682 self._process_events(event_list)
683
684 # Handle 'later' callbacks that are ready.
Victor Stinnered1654f2014-02-10 23:42:32 +0100685 end_time = self.time() + self._clock_resolution
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700686 while self._scheduled:
687 handle = self._scheduled[0]
Victor Stinnered1654f2014-02-10 23:42:32 +0100688 if handle._when >= end_time:
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700689 break
690 handle = heapq.heappop(self._scheduled)
691 self._ready.append(handle)
692
693 # This is the only place where callbacks are actually *called*.
694 # All other places just add them to ready.
695 # Note: We run all currently scheduled callbacks, but not any
696 # callbacks scheduled by callbacks run this time around --
697 # they will be run the next time (after another I/O poll).
698 # Use an idiom that is threadsafe without using locks.
699 ntodo = len(self._ready)
700 for i in range(ntodo):
701 handle = self._ready.popleft()
702 if not handle._cancelled:
703 handle._run()
704 handle = None # Needed to break cycles when an exception occurs.