blob: f18a5565706853e53d4909167c701b9d71d57dc5 [file] [log] [blame]
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07001"""Base implementation of event loop.
2
3The event loop can be broken up into a multiplexer (the part
4responsible for notifying us of IO events) and the event loop proper,
5which wraps a multiplexer with functionality for scheduling callbacks,
6immediately or at a given time in the future.
7
8Whenever a public API takes a callback, subsequent positional
9arguments will be passed to the callback if/when it is called. This
10avoids the proliferation of trivial lambdas implementing closures.
11Keyword arguments for the callback are not supported; this is a
12conscious design decision, leaving the door open for keyword arguments
13to modify the meaning of the API call itself.
14"""
15
16
17import collections
18import concurrent.futures
19import heapq
20import logging
21import socket
22import subprocess
23import time
24import os
25import sys
26
27from . import events
28from . import futures
29from . import tasks
Guido van Rossumfc29e0f2013-10-17 15:39:45 -070030from .log import logger
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070031
32
33__all__ = ['BaseEventLoop', 'Server']
34
35
36# Argument for default thread pool executor creation.
37_MAX_WORKERS = 5
38
39
40class _StopError(BaseException):
41 """Raised to stop the event loop."""
42
43
44def _raise_stop_error(*args):
45 raise _StopError
46
47
48class Server(events.AbstractServer):
49
50 def __init__(self, loop, sockets):
51 self.loop = loop
52 self.sockets = sockets
53 self.active_count = 0
54 self.waiters = []
55
56 def attach(self, transport):
57 assert self.sockets is not None
58 self.active_count += 1
59
60 def detach(self, transport):
61 assert self.active_count > 0
62 self.active_count -= 1
63 if self.active_count == 0 and self.sockets is None:
64 self._wakeup()
65
66 def close(self):
67 sockets = self.sockets
68 if sockets is not None:
69 self.sockets = None
70 for sock in sockets:
71 self.loop._stop_serving(sock)
72 if self.active_count == 0:
73 self._wakeup()
74
75 def _wakeup(self):
76 waiters = self.waiters
77 self.waiters = None
78 for waiter in waiters:
79 if not waiter.done():
80 waiter.set_result(waiter)
81
82 @tasks.coroutine
83 def wait_closed(self):
84 if self.sockets is None or self.waiters is None:
85 return
86 waiter = futures.Future(loop=self.loop)
87 self.waiters.append(waiter)
88 yield from waiter
89
90
91class BaseEventLoop(events.AbstractEventLoop):
92
93 def __init__(self):
94 self._ready = collections.deque()
95 self._scheduled = []
96 self._default_executor = None
97 self._internal_fds = 0
98 self._running = False
99
100 def _make_socket_transport(self, sock, protocol, waiter=None, *,
101 extra=None, server=None):
102 """Create socket transport."""
103 raise NotImplementedError
104
105 def _make_ssl_transport(self, rawsock, protocol, sslcontext, waiter, *,
106 server_side=False, server_hostname=None,
107 extra=None, server=None):
108 """Create SSL transport."""
109 raise NotImplementedError
110
111 def _make_datagram_transport(self, sock, protocol,
112 address=None, extra=None):
113 """Create datagram transport."""
114 raise NotImplementedError
115
116 def _make_read_pipe_transport(self, pipe, protocol, waiter=None,
117 extra=None):
118 """Create read pipe transport."""
119 raise NotImplementedError
120
121 def _make_write_pipe_transport(self, pipe, protocol, waiter=None,
122 extra=None):
123 """Create write pipe transport."""
124 raise NotImplementedError
125
126 @tasks.coroutine
127 def _make_subprocess_transport(self, protocol, args, shell,
128 stdin, stdout, stderr, bufsize,
129 extra=None, **kwargs):
130 """Create subprocess transport."""
131 raise NotImplementedError
132
133 def _read_from_self(self):
134 """XXX"""
135 raise NotImplementedError
136
137 def _write_to_self(self):
138 """XXX"""
139 raise NotImplementedError
140
141 def _process_events(self, event_list):
142 """Process selector events."""
143 raise NotImplementedError
144
145 def run_forever(self):
146 """Run until stop() is called."""
147 if self._running:
148 raise RuntimeError('Event loop is running.')
149 self._running = True
150 try:
151 while True:
152 try:
153 self._run_once()
154 except _StopError:
155 break
156 finally:
157 self._running = False
158
159 def run_until_complete(self, future):
160 """Run until the Future is done.
161
162 If the argument is a coroutine, it is wrapped in a Task.
163
164 XXX TBD: It would be disastrous to call run_until_complete()
165 with the same coroutine twice -- it would wrap it in two
166 different Tasks and that can't be good.
167
168 Return the Future's result, or raise its exception.
169 """
170 future = tasks.async(future, loop=self)
171 future.add_done_callback(_raise_stop_error)
172 self.run_forever()
173 future.remove_done_callback(_raise_stop_error)
174 if not future.done():
175 raise RuntimeError('Event loop stopped before Future completed.')
176
177 return future.result()
178
179 def stop(self):
180 """Stop running the event loop.
181
182 Every callback scheduled before stop() is called will run.
183 Callback scheduled after stop() is called won't. However,
184 those callbacks will run if run() is called again later.
185 """
186 self.call_soon(_raise_stop_error)
187
Antoine Pitrou4ca73552013-10-20 00:54:10 +0200188 def close(self):
189 self._ready.clear()
190 self._scheduled.clear()
191 executor = self._default_executor
192 if executor is not None:
193 self._default_executor = None
194 executor.shutdown(wait=False)
195
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700196 def is_running(self):
197 """Returns running status of event loop."""
198 return self._running
199
200 def time(self):
201 """Return the time according to the event loop's clock."""
202 return time.monotonic()
203
204 def call_later(self, delay, callback, *args):
205 """Arrange for a callback to be called at a given time.
206
207 Return a Handle: an opaque object with a cancel() method that
208 can be used to cancel the call.
209
210 The delay can be an int or float, expressed in seconds. It is
211 always a relative time.
212
213 Each callback will be called exactly once. If two callbacks
214 are scheduled for exactly the same time, it undefined which
215 will be called first.
216
217 Any positional arguments after the callback will be passed to
218 the callback when it is called.
219 """
220 return self.call_at(self.time() + delay, callback, *args)
221
222 def call_at(self, when, callback, *args):
223 """Like call_later(), but uses an absolute time."""
224 timer = events.TimerHandle(when, callback, args)
225 heapq.heappush(self._scheduled, timer)
226 return timer
227
228 def call_soon(self, callback, *args):
229 """Arrange for a callback to be called as soon as possible.
230
231 This operates as a FIFO queue, callbacks are called in the
232 order in which they are registered. Each callback will be
233 called exactly once.
234
235 Any positional arguments after the callback will be passed to
236 the callback when it is called.
237 """
238 handle = events.make_handle(callback, args)
239 self._ready.append(handle)
240 return handle
241
242 def call_soon_threadsafe(self, callback, *args):
243 """XXX"""
244 handle = self.call_soon(callback, *args)
245 self._write_to_self()
246 return handle
247
248 def run_in_executor(self, executor, callback, *args):
249 if isinstance(callback, events.Handle):
250 assert not args
251 assert not isinstance(callback, events.TimerHandle)
252 if callback._cancelled:
253 f = futures.Future(loop=self)
254 f.set_result(None)
255 return f
256 callback, args = callback._callback, callback._args
257 if executor is None:
258 executor = self._default_executor
259 if executor is None:
260 executor = concurrent.futures.ThreadPoolExecutor(_MAX_WORKERS)
261 self._default_executor = executor
262 return futures.wrap_future(executor.submit(callback, *args), loop=self)
263
264 def set_default_executor(self, executor):
265 self._default_executor = executor
266
267 def getaddrinfo(self, host, port, *,
268 family=0, type=0, proto=0, flags=0):
269 return self.run_in_executor(None, socket.getaddrinfo,
270 host, port, family, type, proto, flags)
271
272 def getnameinfo(self, sockaddr, flags=0):
273 return self.run_in_executor(None, socket.getnameinfo, sockaddr, flags)
274
275 @tasks.coroutine
276 def create_connection(self, protocol_factory, host=None, port=None, *,
277 ssl=None, family=0, proto=0, flags=0, sock=None,
Guido van Rossum21c85a72013-11-01 14:16:54 -0700278 local_addr=None, server_hostname=None):
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700279 """XXX"""
Guido van Rossum21c85a72013-11-01 14:16:54 -0700280 if server_hostname is not None and not ssl:
281 raise ValueError('server_hostname is only meaningful with ssl')
282
283 if server_hostname is None and ssl:
284 # Use host as default for server_hostname. It is an error
285 # if host is empty or not set, e.g. when an
286 # already-connected socket was passed or when only a port
287 # is given. To avoid this error, you can pass
288 # server_hostname='' -- this will bypass the hostname
289 # check. (This also means that if host is a numeric
290 # IP/IPv6 address, we will attempt to verify that exact
291 # address; this will probably fail, but it is possible to
292 # create a certificate for a specific IP address, so we
293 # don't judge it here.)
294 if not host:
295 raise ValueError('You must set server_hostname '
296 'when using ssl without a host')
297 server_hostname = host
298
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700299 if host is not None or port is not None:
300 if sock is not None:
301 raise ValueError(
302 'host/port and sock can not be specified at the same time')
303
304 f1 = self.getaddrinfo(
305 host, port, family=family,
306 type=socket.SOCK_STREAM, proto=proto, flags=flags)
307 fs = [f1]
308 if local_addr is not None:
309 f2 = self.getaddrinfo(
310 *local_addr, family=family,
311 type=socket.SOCK_STREAM, proto=proto, flags=flags)
312 fs.append(f2)
313 else:
314 f2 = None
315
316 yield from tasks.wait(fs, loop=self)
317
318 infos = f1.result()
319 if not infos:
320 raise OSError('getaddrinfo() returned empty list')
321 if f2 is not None:
322 laddr_infos = f2.result()
323 if not laddr_infos:
324 raise OSError('getaddrinfo() returned empty list')
325
326 exceptions = []
327 for family, type, proto, cname, address in infos:
328 try:
329 sock = socket.socket(family=family, type=type, proto=proto)
330 sock.setblocking(False)
331 if f2 is not None:
332 for _, _, _, _, laddr in laddr_infos:
333 try:
334 sock.bind(laddr)
335 break
336 except OSError as exc:
337 exc = OSError(
338 exc.errno, 'error while '
339 'attempting to bind on address '
340 '{!r}: {}'.format(
341 laddr, exc.strerror.lower()))
342 exceptions.append(exc)
343 else:
344 sock.close()
345 sock = None
346 continue
347 yield from self.sock_connect(sock, address)
348 except OSError as exc:
349 if sock is not None:
350 sock.close()
351 exceptions.append(exc)
352 else:
353 break
354 else:
355 if len(exceptions) == 1:
356 raise exceptions[0]
357 else:
358 # If they all have the same str(), raise one.
359 model = str(exceptions[0])
360 if all(str(exc) == model for exc in exceptions):
361 raise exceptions[0]
362 # Raise a combined exception so the user can see all
363 # the various error messages.
364 raise OSError('Multiple exceptions: {}'.format(
365 ', '.join(str(exc) for exc in exceptions)))
366
367 elif sock is None:
368 raise ValueError(
369 'host and port was not specified and no sock specified')
370
371 sock.setblocking(False)
372
373 protocol = protocol_factory()
374 waiter = futures.Future(loop=self)
375 if ssl:
376 sslcontext = None if isinstance(ssl, bool) else ssl
377 transport = self._make_ssl_transport(
378 sock, protocol, sslcontext, waiter,
Guido van Rossum21c85a72013-11-01 14:16:54 -0700379 server_side=False, server_hostname=server_hostname)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700380 else:
381 transport = self._make_socket_transport(sock, protocol, waiter)
382
383 yield from waiter
384 return transport, protocol
385
386 @tasks.coroutine
387 def create_datagram_endpoint(self, protocol_factory,
388 local_addr=None, remote_addr=None, *,
389 family=0, proto=0, flags=0):
390 """Create datagram connection."""
391 if not (local_addr or remote_addr):
392 if family == 0:
393 raise ValueError('unexpected address family')
394 addr_pairs_info = (((family, proto), (None, None)),)
395 else:
396 # join addresss by (family, protocol)
397 addr_infos = collections.OrderedDict()
398 for idx, addr in ((0, local_addr), (1, remote_addr)):
399 if addr is not None:
400 assert isinstance(addr, tuple) and len(addr) == 2, (
401 '2-tuple is expected')
402
403 infos = yield from self.getaddrinfo(
404 *addr, family=family, type=socket.SOCK_DGRAM,
405 proto=proto, flags=flags)
406 if not infos:
407 raise OSError('getaddrinfo() returned empty list')
408
409 for fam, _, pro, _, address in infos:
410 key = (fam, pro)
411 if key not in addr_infos:
412 addr_infos[key] = [None, None]
413 addr_infos[key][idx] = address
414
415 # each addr has to have info for each (family, proto) pair
416 addr_pairs_info = [
417 (key, addr_pair) for key, addr_pair in addr_infos.items()
418 if not ((local_addr and addr_pair[0] is None) or
419 (remote_addr and addr_pair[1] is None))]
420
421 if not addr_pairs_info:
422 raise ValueError('can not get address information')
423
424 exceptions = []
425
426 for ((family, proto),
427 (local_address, remote_address)) in addr_pairs_info:
428 sock = None
429 r_addr = None
430 try:
431 sock = socket.socket(
432 family=family, type=socket.SOCK_DGRAM, proto=proto)
433 sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
434 sock.setblocking(False)
435
436 if local_addr:
437 sock.bind(local_address)
438 if remote_addr:
439 yield from self.sock_connect(sock, remote_address)
440 r_addr = remote_address
441 except OSError as exc:
442 if sock is not None:
443 sock.close()
444 exceptions.append(exc)
445 else:
446 break
447 else:
448 raise exceptions[0]
449
450 protocol = protocol_factory()
451 transport = self._make_datagram_transport(sock, protocol, r_addr)
452 return transport, protocol
453
454 @tasks.coroutine
455 def create_server(self, protocol_factory, host=None, port=None,
456 *,
457 family=socket.AF_UNSPEC,
458 flags=socket.AI_PASSIVE,
459 sock=None,
460 backlog=100,
461 ssl=None,
462 reuse_address=None):
463 """XXX"""
464 if host is not None or port is not None:
465 if sock is not None:
466 raise ValueError(
467 'host/port and sock can not be specified at the same time')
468
469 AF_INET6 = getattr(socket, 'AF_INET6', 0)
470 if reuse_address is None:
471 reuse_address = os.name == 'posix' and sys.platform != 'cygwin'
472 sockets = []
473 if host == '':
474 host = None
475
476 infos = yield from self.getaddrinfo(
477 host, port, family=family,
478 type=socket.SOCK_STREAM, proto=0, flags=flags)
479 if not infos:
480 raise OSError('getaddrinfo() returned empty list')
481
482 completed = False
483 try:
484 for res in infos:
485 af, socktype, proto, canonname, sa = res
Guido van Rossum32e46852013-10-19 17:04:25 -0700486 try:
487 sock = socket.socket(af, socktype, proto)
488 except socket.error:
489 # Assume it's a bad family/type/protocol combination.
490 continue
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700491 sockets.append(sock)
492 if reuse_address:
493 sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR,
494 True)
495 # Disable IPv4/IPv6 dual stack support (enabled by
496 # default on Linux) which makes a single socket
497 # listen on both address families.
498 if af == AF_INET6 and hasattr(socket, 'IPPROTO_IPV6'):
499 sock.setsockopt(socket.IPPROTO_IPV6,
500 socket.IPV6_V6ONLY,
501 True)
502 try:
503 sock.bind(sa)
504 except OSError as err:
505 raise OSError(err.errno, 'error while attempting '
506 'to bind on address %r: %s'
507 % (sa, err.strerror.lower()))
508 completed = True
509 finally:
510 if not completed:
511 for sock in sockets:
512 sock.close()
513 else:
514 if sock is None:
515 raise ValueError(
516 'host and port was not specified and no sock specified')
517 sockets = [sock]
518
519 server = Server(self, sockets)
520 for sock in sockets:
521 sock.listen(backlog)
522 sock.setblocking(False)
523 self._start_serving(protocol_factory, sock, ssl, server)
524 return server
525
526 @tasks.coroutine
527 def connect_read_pipe(self, protocol_factory, pipe):
528 protocol = protocol_factory()
529 waiter = futures.Future(loop=self)
530 transport = self._make_read_pipe_transport(pipe, protocol, waiter)
531 yield from waiter
532 return transport, protocol
533
534 @tasks.coroutine
535 def connect_write_pipe(self, protocol_factory, pipe):
536 protocol = protocol_factory()
537 waiter = futures.Future(loop=self)
538 transport = self._make_write_pipe_transport(pipe, protocol, waiter)
539 yield from waiter
540 return transport, protocol
541
542 @tasks.coroutine
543 def subprocess_shell(self, protocol_factory, cmd, *, stdin=subprocess.PIPE,
544 stdout=subprocess.PIPE, stderr=subprocess.PIPE,
545 universal_newlines=False, shell=True, bufsize=0,
546 **kwargs):
547 assert not universal_newlines, "universal_newlines must be False"
548 assert shell, "shell must be True"
549 assert isinstance(cmd, str), cmd
550 protocol = protocol_factory()
551 transport = yield from self._make_subprocess_transport(
552 protocol, cmd, True, stdin, stdout, stderr, bufsize, **kwargs)
553 return transport, protocol
554
555 @tasks.coroutine
556 def subprocess_exec(self, protocol_factory, *args, stdin=subprocess.PIPE,
557 stdout=subprocess.PIPE, stderr=subprocess.PIPE,
558 universal_newlines=False, shell=False, bufsize=0,
559 **kwargs):
560 assert not universal_newlines, "universal_newlines must be False"
561 assert not shell, "shell must be False"
562 protocol = protocol_factory()
563 transport = yield from self._make_subprocess_transport(
564 protocol, args, False, stdin, stdout, stderr, bufsize, **kwargs)
565 return transport, protocol
566
567 def _add_callback(self, handle):
568 """Add a Handle to ready or scheduled."""
569 assert isinstance(handle, events.Handle), 'A Handle is required here'
570 if handle._cancelled:
571 return
572 if isinstance(handle, events.TimerHandle):
573 heapq.heappush(self._scheduled, handle)
574 else:
575 self._ready.append(handle)
576
577 def _add_callback_signalsafe(self, handle):
578 """Like _add_callback() but called from a signal handler."""
579 self._add_callback(handle)
580 self._write_to_self()
581
582 def _run_once(self):
583 """Run one full iteration of the event loop.
584
585 This calls all currently ready callbacks, polls for I/O,
586 schedules the resulting callbacks, and finally schedules
587 'call_later' callbacks.
588 """
589 # Remove delayed calls that were cancelled from head of queue.
590 while self._scheduled and self._scheduled[0]._cancelled:
591 heapq.heappop(self._scheduled)
592
593 timeout = None
594 if self._ready:
595 timeout = 0
596 elif self._scheduled:
597 # Compute the desired timeout.
598 when = self._scheduled[0]._when
599 deadline = max(0, when - self.time())
600 if timeout is None:
601 timeout = deadline
602 else:
603 timeout = min(timeout, deadline)
604
605 # TODO: Instrumentation only in debug mode?
606 t0 = self.time()
607 event_list = self._selector.select(timeout)
608 t1 = self.time()
609 argstr = '' if timeout is None else '{:.3f}'.format(timeout)
610 if t1-t0 >= 1:
611 level = logging.INFO
612 else:
613 level = logging.DEBUG
Guido van Rossumfc29e0f2013-10-17 15:39:45 -0700614 logger.log(level, 'poll%s took %.3f seconds', argstr, t1-t0)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700615 self._process_events(event_list)
616
617 # Handle 'later' callbacks that are ready.
618 now = self.time()
619 while self._scheduled:
620 handle = self._scheduled[0]
621 if handle._when > now:
622 break
623 handle = heapq.heappop(self._scheduled)
624 self._ready.append(handle)
625
626 # This is the only place where callbacks are actually *called*.
627 # All other places just add them to ready.
628 # Note: We run all currently scheduled callbacks, but not any
629 # callbacks scheduled by callbacks run this time around --
630 # they will be run the next time (after another I/O poll).
631 # Use an idiom that is threadsafe without using locks.
632 ntodo = len(self._ready)
633 for i in range(ntodo):
634 handle = self._ready.popleft()
635 if not handle._cancelled:
636 handle._run()
637 handle = None # Needed to break cycles when an exception occurs.