blob: f2d117bdbd2ac9ea284f2557102cf72cfd98843d [file] [log] [blame]
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07001"""Base implementation of event loop.
2
3The event loop can be broken up into a multiplexer (the part
4responsible for notifying us of IO events) and the event loop proper,
5which wraps a multiplexer with functionality for scheduling callbacks,
6immediately or at a given time in the future.
7
8Whenever a public API takes a callback, subsequent positional
9arguments will be passed to the callback if/when it is called. This
10avoids the proliferation of trivial lambdas implementing closures.
11Keyword arguments for the callback are not supported; this is a
12conscious design decision, leaving the door open for keyword arguments
13to modify the meaning of the API call itself.
14"""
15
16
17import collections
18import concurrent.futures
19import heapq
20import logging
21import socket
22import subprocess
23import time
24import os
25import sys
26
27from . import events
28from . import futures
29from . import tasks
Guido van Rossumfc29e0f2013-10-17 15:39:45 -070030from .log import logger
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070031
32
33__all__ = ['BaseEventLoop', 'Server']
34
35
36# Argument for default thread pool executor creation.
37_MAX_WORKERS = 5
38
39
40class _StopError(BaseException):
41 """Raised to stop the event loop."""
42
43
44def _raise_stop_error(*args):
45 raise _StopError
46
47
48class Server(events.AbstractServer):
49
50 def __init__(self, loop, sockets):
51 self.loop = loop
52 self.sockets = sockets
53 self.active_count = 0
54 self.waiters = []
55
56 def attach(self, transport):
57 assert self.sockets is not None
58 self.active_count += 1
59
60 def detach(self, transport):
61 assert self.active_count > 0
62 self.active_count -= 1
63 if self.active_count == 0 and self.sockets is None:
64 self._wakeup()
65
66 def close(self):
67 sockets = self.sockets
68 if sockets is not None:
69 self.sockets = None
70 for sock in sockets:
71 self.loop._stop_serving(sock)
72 if self.active_count == 0:
73 self._wakeup()
74
75 def _wakeup(self):
76 waiters = self.waiters
77 self.waiters = None
78 for waiter in waiters:
79 if not waiter.done():
80 waiter.set_result(waiter)
81
82 @tasks.coroutine
83 def wait_closed(self):
84 if self.sockets is None or self.waiters is None:
85 return
86 waiter = futures.Future(loop=self.loop)
87 self.waiters.append(waiter)
88 yield from waiter
89
90
91class BaseEventLoop(events.AbstractEventLoop):
92
93 def __init__(self):
94 self._ready = collections.deque()
95 self._scheduled = []
96 self._default_executor = None
97 self._internal_fds = 0
98 self._running = False
99
100 def _make_socket_transport(self, sock, protocol, waiter=None, *,
101 extra=None, server=None):
102 """Create socket transport."""
103 raise NotImplementedError
104
105 def _make_ssl_transport(self, rawsock, protocol, sslcontext, waiter, *,
106 server_side=False, server_hostname=None,
107 extra=None, server=None):
108 """Create SSL transport."""
109 raise NotImplementedError
110
111 def _make_datagram_transport(self, sock, protocol,
112 address=None, extra=None):
113 """Create datagram transport."""
114 raise NotImplementedError
115
116 def _make_read_pipe_transport(self, pipe, protocol, waiter=None,
117 extra=None):
118 """Create read pipe transport."""
119 raise NotImplementedError
120
121 def _make_write_pipe_transport(self, pipe, protocol, waiter=None,
122 extra=None):
123 """Create write pipe transport."""
124 raise NotImplementedError
125
126 @tasks.coroutine
127 def _make_subprocess_transport(self, protocol, args, shell,
128 stdin, stdout, stderr, bufsize,
129 extra=None, **kwargs):
130 """Create subprocess transport."""
131 raise NotImplementedError
132
133 def _read_from_self(self):
134 """XXX"""
135 raise NotImplementedError
136
137 def _write_to_self(self):
138 """XXX"""
139 raise NotImplementedError
140
141 def _process_events(self, event_list):
142 """Process selector events."""
143 raise NotImplementedError
144
145 def run_forever(self):
146 """Run until stop() is called."""
147 if self._running:
148 raise RuntimeError('Event loop is running.')
149 self._running = True
150 try:
151 while True:
152 try:
153 self._run_once()
154 except _StopError:
155 break
156 finally:
157 self._running = False
158
159 def run_until_complete(self, future):
160 """Run until the Future is done.
161
162 If the argument is a coroutine, it is wrapped in a Task.
163
164 XXX TBD: It would be disastrous to call run_until_complete()
165 with the same coroutine twice -- it would wrap it in two
166 different Tasks and that can't be good.
167
168 Return the Future's result, or raise its exception.
169 """
170 future = tasks.async(future, loop=self)
171 future.add_done_callback(_raise_stop_error)
172 self.run_forever()
173 future.remove_done_callback(_raise_stop_error)
174 if not future.done():
175 raise RuntimeError('Event loop stopped before Future completed.')
176
177 return future.result()
178
179 def stop(self):
180 """Stop running the event loop.
181
182 Every callback scheduled before stop() is called will run.
183 Callback scheduled after stop() is called won't. However,
184 those callbacks will run if run() is called again later.
185 """
186 self.call_soon(_raise_stop_error)
187
Antoine Pitrou4ca73552013-10-20 00:54:10 +0200188 def close(self):
Guido van Rossume3f52ef2013-11-01 14:19:04 -0700189 """Close the event loop.
190
191 This clears the queues and shuts down the executor,
192 but does not wait for the executor to finish.
193 """
Antoine Pitrou4ca73552013-10-20 00:54:10 +0200194 self._ready.clear()
195 self._scheduled.clear()
196 executor = self._default_executor
197 if executor is not None:
198 self._default_executor = None
199 executor.shutdown(wait=False)
200
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700201 def is_running(self):
202 """Returns running status of event loop."""
203 return self._running
204
205 def time(self):
206 """Return the time according to the event loop's clock."""
207 return time.monotonic()
208
209 def call_later(self, delay, callback, *args):
210 """Arrange for a callback to be called at a given time.
211
212 Return a Handle: an opaque object with a cancel() method that
213 can be used to cancel the call.
214
215 The delay can be an int or float, expressed in seconds. It is
216 always a relative time.
217
218 Each callback will be called exactly once. If two callbacks
219 are scheduled for exactly the same time, it undefined which
220 will be called first.
221
222 Any positional arguments after the callback will be passed to
223 the callback when it is called.
224 """
225 return self.call_at(self.time() + delay, callback, *args)
226
227 def call_at(self, when, callback, *args):
228 """Like call_later(), but uses an absolute time."""
229 timer = events.TimerHandle(when, callback, args)
230 heapq.heappush(self._scheduled, timer)
231 return timer
232
233 def call_soon(self, callback, *args):
234 """Arrange for a callback to be called as soon as possible.
235
236 This operates as a FIFO queue, callbacks are called in the
237 order in which they are registered. Each callback will be
238 called exactly once.
239
240 Any positional arguments after the callback will be passed to
241 the callback when it is called.
242 """
243 handle = events.make_handle(callback, args)
244 self._ready.append(handle)
245 return handle
246
247 def call_soon_threadsafe(self, callback, *args):
248 """XXX"""
249 handle = self.call_soon(callback, *args)
250 self._write_to_self()
251 return handle
252
253 def run_in_executor(self, executor, callback, *args):
254 if isinstance(callback, events.Handle):
255 assert not args
256 assert not isinstance(callback, events.TimerHandle)
257 if callback._cancelled:
258 f = futures.Future(loop=self)
259 f.set_result(None)
260 return f
261 callback, args = callback._callback, callback._args
262 if executor is None:
263 executor = self._default_executor
264 if executor is None:
265 executor = concurrent.futures.ThreadPoolExecutor(_MAX_WORKERS)
266 self._default_executor = executor
267 return futures.wrap_future(executor.submit(callback, *args), loop=self)
268
269 def set_default_executor(self, executor):
270 self._default_executor = executor
271
272 def getaddrinfo(self, host, port, *,
273 family=0, type=0, proto=0, flags=0):
274 return self.run_in_executor(None, socket.getaddrinfo,
275 host, port, family, type, proto, flags)
276
277 def getnameinfo(self, sockaddr, flags=0):
278 return self.run_in_executor(None, socket.getnameinfo, sockaddr, flags)
279
280 @tasks.coroutine
281 def create_connection(self, protocol_factory, host=None, port=None, *,
282 ssl=None, family=0, proto=0, flags=0, sock=None,
Guido van Rossum21c85a72013-11-01 14:16:54 -0700283 local_addr=None, server_hostname=None):
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700284 """XXX"""
Guido van Rossum21c85a72013-11-01 14:16:54 -0700285 if server_hostname is not None and not ssl:
286 raise ValueError('server_hostname is only meaningful with ssl')
287
288 if server_hostname is None and ssl:
289 # Use host as default for server_hostname. It is an error
290 # if host is empty or not set, e.g. when an
291 # already-connected socket was passed or when only a port
292 # is given. To avoid this error, you can pass
293 # server_hostname='' -- this will bypass the hostname
294 # check. (This also means that if host is a numeric
295 # IP/IPv6 address, we will attempt to verify that exact
296 # address; this will probably fail, but it is possible to
297 # create a certificate for a specific IP address, so we
298 # don't judge it here.)
299 if not host:
300 raise ValueError('You must set server_hostname '
301 'when using ssl without a host')
302 server_hostname = host
Guido van Rossuma8d630a2013-11-01 14:20:55 -0700303
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700304 if host is not None or port is not None:
305 if sock is not None:
306 raise ValueError(
307 'host/port and sock can not be specified at the same time')
308
309 f1 = self.getaddrinfo(
310 host, port, family=family,
311 type=socket.SOCK_STREAM, proto=proto, flags=flags)
312 fs = [f1]
313 if local_addr is not None:
314 f2 = self.getaddrinfo(
315 *local_addr, family=family,
316 type=socket.SOCK_STREAM, proto=proto, flags=flags)
317 fs.append(f2)
318 else:
319 f2 = None
320
321 yield from tasks.wait(fs, loop=self)
322
323 infos = f1.result()
324 if not infos:
325 raise OSError('getaddrinfo() returned empty list')
326 if f2 is not None:
327 laddr_infos = f2.result()
328 if not laddr_infos:
329 raise OSError('getaddrinfo() returned empty list')
330
331 exceptions = []
332 for family, type, proto, cname, address in infos:
333 try:
334 sock = socket.socket(family=family, type=type, proto=proto)
335 sock.setblocking(False)
336 if f2 is not None:
337 for _, _, _, _, laddr in laddr_infos:
338 try:
339 sock.bind(laddr)
340 break
341 except OSError as exc:
342 exc = OSError(
343 exc.errno, 'error while '
344 'attempting to bind on address '
345 '{!r}: {}'.format(
346 laddr, exc.strerror.lower()))
347 exceptions.append(exc)
348 else:
349 sock.close()
350 sock = None
351 continue
352 yield from self.sock_connect(sock, address)
353 except OSError as exc:
354 if sock is not None:
355 sock.close()
356 exceptions.append(exc)
357 else:
358 break
359 else:
360 if len(exceptions) == 1:
361 raise exceptions[0]
362 else:
363 # If they all have the same str(), raise one.
364 model = str(exceptions[0])
365 if all(str(exc) == model for exc in exceptions):
366 raise exceptions[0]
367 # Raise a combined exception so the user can see all
368 # the various error messages.
369 raise OSError('Multiple exceptions: {}'.format(
370 ', '.join(str(exc) for exc in exceptions)))
371
372 elif sock is None:
373 raise ValueError(
374 'host and port was not specified and no sock specified')
375
376 sock.setblocking(False)
377
378 protocol = protocol_factory()
379 waiter = futures.Future(loop=self)
380 if ssl:
381 sslcontext = None if isinstance(ssl, bool) else ssl
382 transport = self._make_ssl_transport(
383 sock, protocol, sslcontext, waiter,
Guido van Rossum21c85a72013-11-01 14:16:54 -0700384 server_side=False, server_hostname=server_hostname)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700385 else:
386 transport = self._make_socket_transport(sock, protocol, waiter)
387
388 yield from waiter
389 return transport, protocol
390
391 @tasks.coroutine
392 def create_datagram_endpoint(self, protocol_factory,
393 local_addr=None, remote_addr=None, *,
394 family=0, proto=0, flags=0):
395 """Create datagram connection."""
396 if not (local_addr or remote_addr):
397 if family == 0:
398 raise ValueError('unexpected address family')
399 addr_pairs_info = (((family, proto), (None, None)),)
400 else:
401 # join addresss by (family, protocol)
402 addr_infos = collections.OrderedDict()
403 for idx, addr in ((0, local_addr), (1, remote_addr)):
404 if addr is not None:
405 assert isinstance(addr, tuple) and len(addr) == 2, (
406 '2-tuple is expected')
407
408 infos = yield from self.getaddrinfo(
409 *addr, family=family, type=socket.SOCK_DGRAM,
410 proto=proto, flags=flags)
411 if not infos:
412 raise OSError('getaddrinfo() returned empty list')
413
414 for fam, _, pro, _, address in infos:
415 key = (fam, pro)
416 if key not in addr_infos:
417 addr_infos[key] = [None, None]
418 addr_infos[key][idx] = address
419
420 # each addr has to have info for each (family, proto) pair
421 addr_pairs_info = [
422 (key, addr_pair) for key, addr_pair in addr_infos.items()
423 if not ((local_addr and addr_pair[0] is None) or
424 (remote_addr and addr_pair[1] is None))]
425
426 if not addr_pairs_info:
427 raise ValueError('can not get address information')
428
429 exceptions = []
430
431 for ((family, proto),
432 (local_address, remote_address)) in addr_pairs_info:
433 sock = None
434 r_addr = None
435 try:
436 sock = socket.socket(
437 family=family, type=socket.SOCK_DGRAM, proto=proto)
438 sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
439 sock.setblocking(False)
440
441 if local_addr:
442 sock.bind(local_address)
443 if remote_addr:
444 yield from self.sock_connect(sock, remote_address)
445 r_addr = remote_address
446 except OSError as exc:
447 if sock is not None:
448 sock.close()
449 exceptions.append(exc)
450 else:
451 break
452 else:
453 raise exceptions[0]
454
455 protocol = protocol_factory()
456 transport = self._make_datagram_transport(sock, protocol, r_addr)
457 return transport, protocol
458
459 @tasks.coroutine
460 def create_server(self, protocol_factory, host=None, port=None,
461 *,
462 family=socket.AF_UNSPEC,
463 flags=socket.AI_PASSIVE,
464 sock=None,
465 backlog=100,
466 ssl=None,
467 reuse_address=None):
468 """XXX"""
Guido van Rossum28dff0d2013-11-01 14:22:30 -0700469 if isinstance(ssl, bool):
470 raise TypeError('ssl argument must be an SSLContext or None')
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700471 if host is not None or port is not None:
472 if sock is not None:
473 raise ValueError(
474 'host/port and sock can not be specified at the same time')
475
476 AF_INET6 = getattr(socket, 'AF_INET6', 0)
477 if reuse_address is None:
478 reuse_address = os.name == 'posix' and sys.platform != 'cygwin'
479 sockets = []
480 if host == '':
481 host = None
482
483 infos = yield from self.getaddrinfo(
484 host, port, family=family,
485 type=socket.SOCK_STREAM, proto=0, flags=flags)
486 if not infos:
487 raise OSError('getaddrinfo() returned empty list')
488
489 completed = False
490 try:
491 for res in infos:
492 af, socktype, proto, canonname, sa = res
Guido van Rossum32e46852013-10-19 17:04:25 -0700493 try:
494 sock = socket.socket(af, socktype, proto)
495 except socket.error:
496 # Assume it's a bad family/type/protocol combination.
497 continue
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700498 sockets.append(sock)
499 if reuse_address:
500 sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR,
501 True)
502 # Disable IPv4/IPv6 dual stack support (enabled by
503 # default on Linux) which makes a single socket
504 # listen on both address families.
505 if af == AF_INET6 and hasattr(socket, 'IPPROTO_IPV6'):
506 sock.setsockopt(socket.IPPROTO_IPV6,
507 socket.IPV6_V6ONLY,
508 True)
509 try:
510 sock.bind(sa)
511 except OSError as err:
512 raise OSError(err.errno, 'error while attempting '
513 'to bind on address %r: %s'
514 % (sa, err.strerror.lower()))
515 completed = True
516 finally:
517 if not completed:
518 for sock in sockets:
519 sock.close()
520 else:
521 if sock is None:
522 raise ValueError(
523 'host and port was not specified and no sock specified')
524 sockets = [sock]
525
526 server = Server(self, sockets)
527 for sock in sockets:
528 sock.listen(backlog)
529 sock.setblocking(False)
530 self._start_serving(protocol_factory, sock, ssl, server)
531 return server
532
533 @tasks.coroutine
534 def connect_read_pipe(self, protocol_factory, pipe):
535 protocol = protocol_factory()
536 waiter = futures.Future(loop=self)
537 transport = self._make_read_pipe_transport(pipe, protocol, waiter)
538 yield from waiter
539 return transport, protocol
540
541 @tasks.coroutine
542 def connect_write_pipe(self, protocol_factory, pipe):
543 protocol = protocol_factory()
544 waiter = futures.Future(loop=self)
545 transport = self._make_write_pipe_transport(pipe, protocol, waiter)
546 yield from waiter
547 return transport, protocol
548
549 @tasks.coroutine
550 def subprocess_shell(self, protocol_factory, cmd, *, stdin=subprocess.PIPE,
551 stdout=subprocess.PIPE, stderr=subprocess.PIPE,
552 universal_newlines=False, shell=True, bufsize=0,
553 **kwargs):
554 assert not universal_newlines, "universal_newlines must be False"
555 assert shell, "shell must be True"
556 assert isinstance(cmd, str), cmd
557 protocol = protocol_factory()
558 transport = yield from self._make_subprocess_transport(
559 protocol, cmd, True, stdin, stdout, stderr, bufsize, **kwargs)
560 return transport, protocol
561
562 @tasks.coroutine
563 def subprocess_exec(self, protocol_factory, *args, stdin=subprocess.PIPE,
564 stdout=subprocess.PIPE, stderr=subprocess.PIPE,
565 universal_newlines=False, shell=False, bufsize=0,
566 **kwargs):
567 assert not universal_newlines, "universal_newlines must be False"
568 assert not shell, "shell must be False"
569 protocol = protocol_factory()
570 transport = yield from self._make_subprocess_transport(
571 protocol, args, False, stdin, stdout, stderr, bufsize, **kwargs)
572 return transport, protocol
573
574 def _add_callback(self, handle):
575 """Add a Handle to ready or scheduled."""
576 assert isinstance(handle, events.Handle), 'A Handle is required here'
577 if handle._cancelled:
578 return
579 if isinstance(handle, events.TimerHandle):
580 heapq.heappush(self._scheduled, handle)
581 else:
582 self._ready.append(handle)
583
584 def _add_callback_signalsafe(self, handle):
585 """Like _add_callback() but called from a signal handler."""
586 self._add_callback(handle)
587 self._write_to_self()
588
589 def _run_once(self):
590 """Run one full iteration of the event loop.
591
592 This calls all currently ready callbacks, polls for I/O,
593 schedules the resulting callbacks, and finally schedules
594 'call_later' callbacks.
595 """
596 # Remove delayed calls that were cancelled from head of queue.
597 while self._scheduled and self._scheduled[0]._cancelled:
598 heapq.heappop(self._scheduled)
599
600 timeout = None
601 if self._ready:
602 timeout = 0
603 elif self._scheduled:
604 # Compute the desired timeout.
605 when = self._scheduled[0]._when
606 deadline = max(0, when - self.time())
607 if timeout is None:
608 timeout = deadline
609 else:
610 timeout = min(timeout, deadline)
611
612 # TODO: Instrumentation only in debug mode?
613 t0 = self.time()
614 event_list = self._selector.select(timeout)
615 t1 = self.time()
616 argstr = '' if timeout is None else '{:.3f}'.format(timeout)
617 if t1-t0 >= 1:
618 level = logging.INFO
619 else:
620 level = logging.DEBUG
Guido van Rossumfc29e0f2013-10-17 15:39:45 -0700621 logger.log(level, 'poll%s took %.3f seconds', argstr, t1-t0)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700622 self._process_events(event_list)
623
624 # Handle 'later' callbacks that are ready.
625 now = self.time()
626 while self._scheduled:
627 handle = self._scheduled[0]
628 if handle._when > now:
629 break
630 handle = heapq.heappop(self._scheduled)
631 self._ready.append(handle)
632
633 # This is the only place where callbacks are actually *called*.
634 # All other places just add them to ready.
635 # Note: We run all currently scheduled callbacks, but not any
636 # callbacks scheduled by callbacks run this time around --
637 # they will be run the next time (after another I/O poll).
638 # Use an idiom that is threadsafe without using locks.
639 ntodo = len(self._ready)
640 for i in range(ntodo):
641 handle = self._ready.popleft()
642 if not handle._cancelled:
643 handle._run()
644 handle = None # Needed to break cycles when an exception occurs.