blob: 58c3520ea3c93e3084be65f6b01c6005c321c135 [file] [log] [blame]
"""Base implementation of event loop.

The event loop can be broken up into a multiplexer (the part
responsible for notifying us of I/O events) and the event loop proper,
which wraps a multiplexer with functionality for scheduling callbacks,
immediately or at a given time in the future.

Whenever a public API takes a callback, subsequent positional
arguments will be passed to the callback if/when it is called.  This
avoids the proliferation of trivial lambdas implementing closures.
Keyword arguments for the callback are not supported; this is a
conscious design decision, leaving the door open for keyword arguments
to modify the meaning of the API call itself.
"""
15
16
17import collections
18import concurrent.futures
19import heapq
20import logging
21import socket
22import subprocess
23import time
24import os
25import sys
26
27from . import events
28from . import futures
29from . import tasks
Guido van Rossumfc29e0f2013-10-17 15:39:45 -070030from .log import logger
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070031
32
# Names exported by ``from <this module> import *``.
__all__ = [
    'BaseEventLoop',
    'Server',
]


# Worker count handed to ThreadPoolExecutor when the loop has to create
# its own default executor.
_MAX_WORKERS = 5
38
39
class _StopError(BaseException):
    """Internal signal used to break out of the event loop.

    Derives from BaseException (not Exception) so that a handler for
    ``Exception`` does not intercept it.
    """
42
43
def _raise_stop_error(*args):
    """Callback that always raises _StopError; any arguments are ignored."""
    raise _StopError()
46
47
class Server(events.AbstractServer):
    """A group of listening sockets plus a count of attached transports.

    ``sockets`` becomes None once close() has run.  ``waiters`` holds the
    futures created by wait_closed() and becomes None once they have been
    woken up.
    """

    def __init__(self, loop, sockets):
        """Remember the owning loop and the listening sockets."""
        self.loop = loop
        self.sockets = sockets
        # Number of transports registered through attach() and not yet
        # released through detach().
        self.active_count = 0
        # Futures handed out by wait_closed().
        self.waiters = []

    def attach(self, transport):
        """Count one more active transport; the server must still be open."""
        assert self.sockets is not None
        self.active_count += 1

    def detach(self, transport):
        """Count one less active transport.

        When the last one goes away on an already closed server, the
        wait_closed() waiters are woken up.
        """
        assert self.active_count > 0
        self.active_count -= 1
        closed = self.sockets is None
        if closed and self.active_count == 0:
            self._wakeup()

    def close(self):
        """Stop serving on every socket; calling it again is a no-op."""
        listeners = self.sockets
        if listeners is None:
            return
        self.sockets = None
        for listener in listeners:
            self.loop._stop_serving(listener)
        # Nothing is attached any more, so nobody else would wake the
        # waiters later on.
        if self.active_count == 0:
            self._wakeup()

    def _wakeup(self):
        """Resolve every pending wait_closed() future and drop the list."""
        pending, self.waiters = self.waiters, None
        for fut in pending:
            if fut.done():
                continue
            fut.set_result(fut)

    @tasks.coroutine
    def wait_closed(self):
        """Coroutine: wait for _wakeup() to resolve a fresh future.

        Returns immediately when the sockets are already gone or the
        waiters have already been woken up.
        """
        if self.sockets is not None and self.waiters is not None:
            fut = futures.Future(loop=self.loop)
            self.waiters.append(fut)
            yield from fut
89
90
class BaseEventLoop(events.AbstractEventLoop):
    """Selector-independent part of an event loop.

    This class owns the ready queue and the timer heap and implements
    the scheduling API (call_soon(), call_later(), call_at(), ...), the
    run/stop machinery and the high-level helpers that create
    connections, servers, pipes and subprocesses.

    It is not usable on its own: the ``_make_*_transport()`` hooks,
    ``_read_from_self()``, ``_write_to_self()`` and ``_process_events()``
    raise NotImplementedError here, and the code also relies on
    ``self._selector``, ``self.sock_connect()``, ``self._start_serving()``
    and ``self._stop_serving()``, none of which are defined in this class.
    """

    def __init__(self):
        # Handles to run on the next iteration, in FIFO order.
        self._ready = collections.deque()
        # Heap of TimerHandle objects (maintained with heapq).
        self._scheduled = []
        # Created lazily by run_in_executor() when none was set.
        self._default_executor = None
        self._internal_fds = 0
        self._running = False
        # Resolution of the monotonic clock; _run_once() treats timers
        # that are due within this margin as already due.
        self._granularity = time.get_clock_info('monotonic').resolution

    def _make_socket_transport(self, sock, protocol, waiter=None, *,
                               extra=None, server=None):
        """Create socket transport."""
        raise NotImplementedError

    def _make_ssl_transport(self, rawsock, protocol, sslcontext, waiter, *,
                            server_side=False, server_hostname=None,
                            extra=None, server=None):
        """Create SSL transport."""
        raise NotImplementedError

    def _make_datagram_transport(self, sock, protocol,
                                 address=None, extra=None):
        """Create datagram transport."""
        raise NotImplementedError

    def _make_read_pipe_transport(self, pipe, protocol, waiter=None,
                                  extra=None):
        """Create read pipe transport."""
        raise NotImplementedError

    def _make_write_pipe_transport(self, pipe, protocol, waiter=None,
                                   extra=None):
        """Create write pipe transport."""
        raise NotImplementedError

    @tasks.coroutine
    def _make_subprocess_transport(self, protocol, args, shell,
                                   stdin, stdout, stderr, bufsize,
                                   extra=None, **kwargs):
        """Create subprocess transport."""
        raise NotImplementedError

    def _read_from_self(self):
        """Hook to be provided by subclasses; not implemented here.

        NOTE(review): presumably the reading side of the wakeup channel
        written by _write_to_self() -- confirm against the subclass.
        """
        raise NotImplementedError

    def _write_to_self(self):
        """Hook to be provided by subclasses; not implemented here.

        It is called after a callback has been queued by
        call_soon_threadsafe() or _add_callback_signalsafe().
        NOTE(review): presumably it wakes up a loop blocked in the
        selector -- confirm against the subclass.
        """
        raise NotImplementedError

    def _process_events(self, event_list):
        """Process selector events."""
        raise NotImplementedError

    def run_forever(self):
        """Run until stop() is called.

        Raises RuntimeError if the loop is already running.
        """
        if self._running:
            raise RuntimeError('Event loop is running.')
        self._running = True
        try:
            while True:
                try:
                    self._run_once()
                except _StopError:
                    break
        finally:
            self._running = False

    def run_until_complete(self, future):
        """Run until the Future is done.

        If the argument is a coroutine, it is wrapped in a Task.

        XXX TBD: It would be disastrous to call run_until_complete()
        with the same coroutine twice -- it would wrap it in two
        different Tasks and that can't be good.

        Return the Future's result, or raise its exception.  Raise
        RuntimeError if the loop was stopped before the Future completed.
        """
        # ``async`` is a reserved word since Python 3.7, so the attribute
        # can no longer be spelled ``tasks.async``.  Look it up by name,
        # preferring ``ensure_future`` when the tasks module provides it.
        wrap = getattr(tasks, 'ensure_future', None)
        if wrap is None:
            wrap = getattr(tasks, 'async')
        future = wrap(future, loop=self)
        future.add_done_callback(_raise_stop_error)
        try:
            self.run_forever()
        finally:
            # Also detach the callback when run_forever() raises, so a
            # later completion of the future cannot stop an unrelated run.
            future.remove_done_callback(_raise_stop_error)
        if not future.done():
            raise RuntimeError('Event loop stopped before Future completed.')

        return future.result()

    def stop(self):
        """Stop running the event loop.

        Every callback scheduled before stop() is called will run.
        Callbacks scheduled after stop() is called won't.  However,
        those callbacks will run if run_forever() is called again later.
        """
        self.call_soon(_raise_stop_error)

    def close(self):
        """Close the event loop.

        This clears the queues and shuts down the executor,
        but does not wait for the executor to finish.
        """
        self._ready.clear()
        self._scheduled.clear()
        executor = self._default_executor
        if executor is not None:
            self._default_executor = None
            executor.shutdown(wait=False)

    def is_running(self):
        """Return the running status of the event loop."""
        return self._running

    def time(self):
        """Return the time according to the event loop's clock."""
        return time.monotonic()

    def call_later(self, delay, callback, *args):
        """Arrange for a callback to be called at a given time.

        Return a Handle: an opaque object with a cancel() method that
        can be used to cancel the call.

        The delay can be an int or float, expressed in seconds.  It is
        always a relative time.

        Each callback will be called exactly once.  If two callbacks
        are scheduled for exactly the same time, it is undefined which
        will be called first.

        Any positional arguments after the callback will be passed to
        the callback when it is called.
        """
        return self.call_at(self.time() + delay, callback, *args)

    def call_at(self, when, callback, *args):
        """Like call_later(), but uses an absolute time."""
        timer = events.TimerHandle(when, callback, args)
        heapq.heappush(self._scheduled, timer)
        return timer

    def call_soon(self, callback, *args):
        """Arrange for a callback to be called as soon as possible.

        This operates as a FIFO queue, callbacks are called in the
        order in which they are registered.  Each callback will be
        called exactly once.

        Any positional arguments after the callback will be passed to
        the callback when it is called.
        """
        handle = events.make_handle(callback, args)
        self._ready.append(handle)
        return handle

    def call_soon_threadsafe(self, callback, *args):
        """Like call_soon(), but also call _write_to_self() afterwards.

        Return the Handle created by call_soon().
        """
        handle = self.call_soon(callback, *args)
        self._write_to_self()
        return handle

    def run_in_executor(self, executor, callback, *args):
        """Submit callback(*args) to an executor and return a Future.

        If executor is None the loop's default executor is used; it is
        created on first use as a ThreadPoolExecutor with _MAX_WORKERS
        workers.

        callback may also be a (non-timer) Handle, in which case no
        extra args may be given and the handle's own callback and
        arguments are submitted.  For a cancelled Handle nothing is
        submitted and a Future already resolved to None is returned.
        """
        if isinstance(callback, events.Handle):
            assert not args
            assert not isinstance(callback, events.TimerHandle)
            if callback._cancelled:
                f = futures.Future(loop=self)
                f.set_result(None)
                return f
            callback, args = callback._callback, callback._args
        if executor is None:
            executor = self._default_executor
            if executor is None:
                executor = concurrent.futures.ThreadPoolExecutor(_MAX_WORKERS)
                self._default_executor = executor
        return futures.wrap_future(executor.submit(callback, *args), loop=self)

    def set_default_executor(self, executor):
        """Set the executor used by run_in_executor(None, ...)."""
        self._default_executor = executor

    def getaddrinfo(self, host, port, *,
                    family=0, type=0, proto=0, flags=0):
        """Run socket.getaddrinfo() in the default executor.

        Return the Future produced by run_in_executor().
        """
        return self.run_in_executor(None, socket.getaddrinfo,
                                    host, port, family, type, proto, flags)

    def getnameinfo(self, sockaddr, flags=0):
        """Run socket.getnameinfo() in the default executor.

        Return the Future produced by run_in_executor().
        """
        return self.run_in_executor(None, socket.getnameinfo, sockaddr, flags)

    @tasks.coroutine
    def create_connection(self, protocol_factory, host=None, port=None, *,
                          ssl=None, family=0, proto=0, flags=0, sock=None,
                          local_addr=None, server_hostname=None):
        """Coroutine: open a stream connection; return (transport, protocol).

        Either host/port or an existing socket (sock) must be given, not
        both.  With host/port, the address is resolved with
        getaddrinfo() and every result is tried in turn until one
        connects; if local_addr is given it is resolved as well and the
        socket is bound to it before connecting.

        If ssl is true an SSL transport is created; an ssl value that is
        not a bool is passed on as the SSL context.  server_hostname
        defaults to host and may only be given together with ssl.

        Raises ValueError for inconsistent arguments and OSError when
        address resolution yields nothing or no address could be
        connected to.
        """
        if server_hostname is not None and not ssl:
            raise ValueError('server_hostname is only meaningful with ssl')

        if server_hostname is None and ssl:
            # Use host as default for server_hostname.  It is an error
            # if host is empty or not set, e.g. when an
            # already-connected socket was passed or when only a port
            # is given.  To avoid this error, you can pass
            # server_hostname='' -- this will bypass the hostname
            # check.  (This also means that if host is a numeric
            # IP/IPv6 address, we will attempt to verify that exact
            # address; this will probably fail, but it is possible to
            # create a certificate for a specific IP address, so we
            # don't judge it here.)
            if not host:
                raise ValueError('You must set server_hostname '
                                 'when using ssl without a host')
            server_hostname = host

        if host is not None or port is not None:
            if sock is not None:
                raise ValueError(
                    'host/port and sock can not be specified at the same time')

            f1 = self.getaddrinfo(
                host, port, family=family,
                type=socket.SOCK_STREAM, proto=proto, flags=flags)
            fs = [f1]
            if local_addr is not None:
                f2 = self.getaddrinfo(
                    *local_addr, family=family,
                    type=socket.SOCK_STREAM, proto=proto, flags=flags)
                fs.append(f2)
            else:
                f2 = None

            # Both lookups were submitted above; wait for all of them.
            yield from tasks.wait(fs, loop=self)

            infos = f1.result()
            if not infos:
                raise OSError('getaddrinfo() returned empty list')
            if f2 is not None:
                laddr_infos = f2.result()
                if not laddr_infos:
                    raise OSError('getaddrinfo() returned empty list')

            exceptions = []
            for af, socktype, pr, _, address in infos:
                # Reset so the handlers below never touch the socket of
                # a previous iteration.
                sock = None
                try:
                    sock = socket.socket(family=af, type=socktype, proto=pr)
                    sock.setblocking(False)
                    if f2 is not None:
                        for _, _, _, _, laddr in laddr_infos:
                            try:
                                sock.bind(laddr)
                                break
                            except OSError as exc:
                                # strerror can be None; fall back to the
                                # exception's string form in that case.
                                reason = exc.strerror or str(exc)
                                exc = OSError(
                                    exc.errno, 'error while '
                                    'attempting to bind on address '
                                    '{!r}: {}'.format(
                                        laddr, reason.lower()))
                                exceptions.append(exc)
                        else:
                            # No local address could be bound.
                            sock.close()
                            sock = None
                            continue
                    yield from self.sock_connect(sock, address)
                except OSError as exc:
                    if sock is not None:
                        sock.close()
                    exceptions.append(exc)
                except BaseException:
                    # Anything else propagates to the caller; do not
                    # leave the socket open behind.
                    if sock is not None:
                        sock.close()
                    raise
                else:
                    break
            else:
                if len(exceptions) == 1:
                    raise exceptions[0]
                else:
                    # If they all have the same str(), raise one.
                    model = str(exceptions[0])
                    if all(str(exc) == model for exc in exceptions):
                        raise exceptions[0]
                    # Raise a combined exception so the user can see all
                    # the various error messages.
                    raise OSError('Multiple exceptions: {}'.format(
                        ', '.join(str(exc) for exc in exceptions)))

        elif sock is None:
            raise ValueError(
                'host and port was not specified and no sock specified')

        sock.setblocking(False)

        protocol = protocol_factory()
        waiter = futures.Future(loop=self)
        if ssl:
            sslcontext = None if isinstance(ssl, bool) else ssl
            transport = self._make_ssl_transport(
                sock, protocol, sslcontext, waiter,
                server_side=False, server_hostname=server_hostname)
        else:
            transport = self._make_socket_transport(sock, protocol, waiter)

        yield from waiter
        return transport, protocol

    @tasks.coroutine
    def create_datagram_endpoint(self, protocol_factory,
                                 local_addr=None, remote_addr=None, *,
                                 family=0, proto=0, flags=0):
        """Coroutine: create a datagram endpoint; return (transport, protocol).

        local_addr and remote_addr, when given, must be 2-tuples; they
        are resolved with getaddrinfo() and only (family, proto)
        combinations for which every given address resolved are tried.
        Without any address, family must be non-zero.

        Raises ValueError when no usable address information is
        available, and OSError when resolution yields nothing or no
        candidate socket could be set up (the first error is raised).
        """
        if not (local_addr or remote_addr):
            if family == 0:
                raise ValueError('unexpected address family')
            addr_pairs_info = (((family, proto), (None, None)),)
        else:
            # Join addresses by (family, protocol).
            addr_infos = collections.OrderedDict()
            for idx, addr in ((0, local_addr), (1, remote_addr)):
                if addr is not None:
                    assert isinstance(addr, tuple) and len(addr) == 2, (
                        '2-tuple is expected')

                    infos = yield from self.getaddrinfo(
                        *addr, family=family, type=socket.SOCK_DGRAM,
                        proto=proto, flags=flags)
                    if not infos:
                        raise OSError('getaddrinfo() returned empty list')

                    for fam, _, pro, _, address in infos:
                        key = (fam, pro)
                        if key not in addr_infos:
                            addr_infos[key] = [None, None]
                        addr_infos[key][idx] = address

            # Each addr has to have info for each (family, proto) pair.
            addr_pairs_info = [
                (key, addr_pair) for key, addr_pair in addr_infos.items()
                if not ((local_addr and addr_pair[0] is None) or
                        (remote_addr and addr_pair[1] is None))]

            if not addr_pairs_info:
                raise ValueError('can not get address information')

        exceptions = []

        for ((fam, pro),
             (local_address, remote_address)) in addr_pairs_info:
            sock = None
            r_addr = None
            try:
                sock = socket.socket(
                    family=fam, type=socket.SOCK_DGRAM, proto=pro)
                sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
                sock.setblocking(False)

                if local_addr:
                    sock.bind(local_address)
                if remote_addr:
                    yield from self.sock_connect(sock, remote_address)
                    r_addr = remote_address
            except OSError as exc:
                if sock is not None:
                    sock.close()
                exceptions.append(exc)
            except BaseException:
                # Anything else propagates to the caller; do not leave
                # the socket open behind.
                if sock is not None:
                    sock.close()
                raise
            else:
                break
        else:
            raise exceptions[0]

        protocol = protocol_factory()
        transport = self._make_datagram_transport(sock, protocol, r_addr)
        return transport, protocol

    @tasks.coroutine
    def create_server(self, protocol_factory, host=None, port=None,
                      *,
                      family=socket.AF_UNSPEC,
                      flags=socket.AI_PASSIVE,
                      sock=None,
                      backlog=100,
                      ssl=None,
                      reuse_address=None):
        """Coroutine: start listening and return a Server object.

        Either host/port or an already bound socket (sock) must be
        given, not both.  With host/port, one listening socket is
        created and bound for every address returned by getaddrinfo()
        (an empty host is treated as None).  If reuse_address is None it
        defaults to true on POSIX platforms other than Cygwin.

        ssl must be None or an SSL context; a bool is rejected with
        TypeError.  Raises ValueError for inconsistent arguments and
        OSError when resolution yields nothing or binding fails.
        """
        if isinstance(ssl, bool):
            raise TypeError('ssl argument must be an SSLContext or None')
        if host is not None or port is not None:
            if sock is not None:
                raise ValueError(
                    'host/port and sock can not be specified at the same time')

            AF_INET6 = getattr(socket, 'AF_INET6', 0)
            if reuse_address is None:
                reuse_address = os.name == 'posix' and sys.platform != 'cygwin'
            sockets = []
            if host == '':
                host = None

            infos = yield from self.getaddrinfo(
                host, port, family=family,
                type=socket.SOCK_STREAM, proto=0, flags=flags)
            if not infos:
                raise OSError('getaddrinfo() returned empty list')

            completed = False
            try:
                for res in infos:
                    af, socktype, proto, canonname, sa = res
                    try:
                        sock = socket.socket(af, socktype, proto)
                    except socket.error:
                        # Assume it's a bad family/type/protocol combination.
                        continue
                    sockets.append(sock)
                    if reuse_address:
                        sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR,
                                        True)
                    # Disable IPv4/IPv6 dual stack support (enabled by
                    # default on Linux) which makes a single socket
                    # listen on both address families.
                    if af == AF_INET6 and hasattr(socket, 'IPPROTO_IPV6'):
                        sock.setsockopt(socket.IPPROTO_IPV6,
                                        socket.IPV6_V6ONLY,
                                        True)
                    try:
                        sock.bind(sa)
                    except OSError as err:
                        # strerror can be None; fall back to the
                        # exception's string form in that case.
                        reason = err.strerror or str(err)
                        raise OSError(err.errno, 'error while attempting '
                                      'to bind on address %r: %s'
                                      % (sa, reason.lower()))
                completed = True
            finally:
                # On any failure, close every socket opened so far.
                if not completed:
                    for sock in sockets:
                        sock.close()
        else:
            if sock is None:
                raise ValueError(
                    'host and port was not specified and no sock specified')
            sockets = [sock]

        server = Server(self, sockets)
        for sock in sockets:
            sock.listen(backlog)
            sock.setblocking(False)
            self._start_serving(protocol_factory, sock, ssl, server)
        return server

    @tasks.coroutine
    def connect_read_pipe(self, protocol_factory, pipe):
        """Coroutine: wrap pipe in a read pipe transport.

        Return (transport, protocol) once the transport's waiter future
        has completed.
        """
        protocol = protocol_factory()
        waiter = futures.Future(loop=self)
        transport = self._make_read_pipe_transport(pipe, protocol, waiter)
        yield from waiter
        return transport, protocol

    @tasks.coroutine
    def connect_write_pipe(self, protocol_factory, pipe):
        """Coroutine: wrap pipe in a write pipe transport.

        Return (transport, protocol) once the transport's waiter future
        has completed.
        """
        protocol = protocol_factory()
        waiter = futures.Future(loop=self)
        transport = self._make_write_pipe_transport(pipe, protocol, waiter)
        yield from waiter
        return transport, protocol

    @tasks.coroutine
    def subprocess_shell(self, protocol_factory, cmd, *, stdin=subprocess.PIPE,
                         stdout=subprocess.PIPE, stderr=subprocess.PIPE,
                         universal_newlines=False, shell=True, bufsize=0,
                         **kwargs):
        """Coroutine: run the string cmd through the shell.

        Return (transport, protocol).  universal_newlines, shell and
        bufsize only accept their default values (False, True and 0);
        anything else, or a cmd that is not a str, raises ValueError.
        """
        if not isinstance(cmd, str):
            raise ValueError("cmd must be a string")
        if universal_newlines:
            raise ValueError("universal_newlines must be False")
        if not shell:
            # This is the shell variant, so the flag has to be true.
            raise ValueError("shell must be True")
        if bufsize != 0:
            raise ValueError("bufsize must be 0")
        protocol = protocol_factory()
        transport = yield from self._make_subprocess_transport(
            protocol, cmd, True, stdin, stdout, stderr, bufsize, **kwargs)
        return transport, protocol

    @tasks.coroutine
    def subprocess_exec(self, protocol_factory, *args, stdin=subprocess.PIPE,
                        stdout=subprocess.PIPE, stderr=subprocess.PIPE,
                        universal_newlines=False, shell=False, bufsize=0,
                        **kwargs):
        """Coroutine: run the program given by args without a shell.

        Return (transport, protocol).  universal_newlines, shell and
        bufsize only accept their default values (False, False and 0);
        anything else raises ValueError.
        """
        if universal_newlines:
            raise ValueError("universal_newlines must be False")
        if shell:
            raise ValueError("shell must be False")
        if bufsize != 0:
            raise ValueError("bufsize must be 0")
        protocol = protocol_factory()
        transport = yield from self._make_subprocess_transport(
            protocol, args, False, stdin, stdout, stderr, bufsize, **kwargs)
        return transport, protocol

    def _add_callback(self, handle):
        """Add a Handle to ready or scheduled.

        Cancelled handles are dropped; TimerHandles go onto the timer
        heap, all other handles onto the ready queue.
        """
        assert isinstance(handle, events.Handle), 'A Handle is required here'
        if handle._cancelled:
            return
        if isinstance(handle, events.TimerHandle):
            heapq.heappush(self._scheduled, handle)
        else:
            self._ready.append(handle)

    def _add_callback_signalsafe(self, handle):
        """Like _add_callback() but called from a signal handler."""
        self._add_callback(handle)
        self._write_to_self()

    def _run_once(self):
        """Run one full iteration of the event loop.

        This polls for I/O, lets _process_events() handle the result,
        moves timers that are due onto the ready queue and finally runs
        every callback that was ready at that point.
        """
        # Remove delayed calls that were cancelled from head of queue.
        while self._scheduled and self._scheduled[0]._cancelled:
            heapq.heappop(self._scheduled)

        # Do not block when callbacks are ready; otherwise block until
        # the earliest timer is due, or indefinitely if there is none.
        timeout = None
        if self._ready:
            timeout = 0
        elif self._scheduled:
            when = self._scheduled[0]._when
            timeout = max(0, when - self.time())

        # TODO: Instrumentation only in debug mode?
        if logger.isEnabledFor(logging.INFO):
            t0 = self.time()
            event_list = self._selector.select(timeout)
            t1 = self.time()
            # Polls of a second or more are reported at INFO level.
            if t1 - t0 >= 1:
                level = logging.INFO
            else:
                level = logging.DEBUG
            if timeout is not None:
                logger.log(level, 'poll %.3f took %.3f seconds',
                           timeout, t1 - t0)
            else:
                logger.log(level, 'poll took %.3f seconds', t1 - t0)
        else:
            event_list = self._selector.select(timeout)
        self._process_events(event_list)

        # Handle 'later' callbacks that are ready.
        now = self.time() + self._granularity
        while self._scheduled:
            handle = self._scheduled[0]
            if handle._when > now:
                break
            handle = heapq.heappop(self._scheduled)
            self._ready.append(handle)

        # This is the only place where callbacks are actually *called*.
        # All other places just add them to ready.
        # Note: We run all currently scheduled callbacks, but not any
        # callbacks scheduled by callbacks run this time around --
        # they will be run the next time (after another I/O poll).
        # Use an idiom that is threadsafe without using locks.
        ntodo = len(self._ready)
        for i in range(ntodo):
            handle = self._ready.popleft()
            if not handle._cancelled:
                handle._run()
        handle = None  # Needed to break cycles when an exception occurs.