blob: a73b3d3977121cdec2263daf3494c3bfc4cab6b0 [file] [log] [blame]
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07001"""Base implementation of event loop.
2
3The event loop can be broken up into a multiplexer (the part
4responsible for notifying us of IO events) and the event loop proper,
5which wraps a multiplexer with functionality for scheduling callbacks,
6immediately or at a given time in the future.
7
8Whenever a public API takes a callback, subsequent positional
9arguments will be passed to the callback if/when it is called. This
10avoids the proliferation of trivial lambdas implementing closures.
11Keyword arguments for the callback are not supported; this is a
12conscious design decision, leaving the door open for keyword arguments
13to modify the meaning of the API call itself.
14"""
15
16
17import collections
18import concurrent.futures
19import heapq
20import logging
21import socket
22import subprocess
23import time
24import os
25import sys
26
27from . import events
28from . import futures
29from . import tasks
Guido van Rossumfc29e0f2013-10-17 15:39:45 -070030from .log import logger
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070031
32
33__all__ = ['BaseEventLoop', 'Server']
34
35
36# Argument for default thread pool executor creation.
37_MAX_WORKERS = 5
38
39
40class _StopError(BaseException):
41 """Raised to stop the event loop."""
42
43
44def _raise_stop_error(*args):
45 raise _StopError
46
47
48class Server(events.AbstractServer):
49
50 def __init__(self, loop, sockets):
51 self.loop = loop
52 self.sockets = sockets
53 self.active_count = 0
54 self.waiters = []
55
56 def attach(self, transport):
57 assert self.sockets is not None
58 self.active_count += 1
59
60 def detach(self, transport):
61 assert self.active_count > 0
62 self.active_count -= 1
63 if self.active_count == 0 and self.sockets is None:
64 self._wakeup()
65
66 def close(self):
67 sockets = self.sockets
68 if sockets is not None:
69 self.sockets = None
70 for sock in sockets:
71 self.loop._stop_serving(sock)
72 if self.active_count == 0:
73 self._wakeup()
74
75 def _wakeup(self):
76 waiters = self.waiters
77 self.waiters = None
78 for waiter in waiters:
79 if not waiter.done():
80 waiter.set_result(waiter)
81
82 @tasks.coroutine
83 def wait_closed(self):
84 if self.sockets is None or self.waiters is None:
85 return
86 waiter = futures.Future(loop=self.loop)
87 self.waiters.append(waiter)
88 yield from waiter
89
90
91class BaseEventLoop(events.AbstractEventLoop):
92
93 def __init__(self):
94 self._ready = collections.deque()
95 self._scheduled = []
96 self._default_executor = None
97 self._internal_fds = 0
98 self._running = False
99
100 def _make_socket_transport(self, sock, protocol, waiter=None, *,
101 extra=None, server=None):
102 """Create socket transport."""
103 raise NotImplementedError
104
105 def _make_ssl_transport(self, rawsock, protocol, sslcontext, waiter, *,
106 server_side=False, server_hostname=None,
107 extra=None, server=None):
108 """Create SSL transport."""
109 raise NotImplementedError
110
111 def _make_datagram_transport(self, sock, protocol,
112 address=None, extra=None):
113 """Create datagram transport."""
114 raise NotImplementedError
115
116 def _make_read_pipe_transport(self, pipe, protocol, waiter=None,
117 extra=None):
118 """Create read pipe transport."""
119 raise NotImplementedError
120
121 def _make_write_pipe_transport(self, pipe, protocol, waiter=None,
122 extra=None):
123 """Create write pipe transport."""
124 raise NotImplementedError
125
126 @tasks.coroutine
127 def _make_subprocess_transport(self, protocol, args, shell,
128 stdin, stdout, stderr, bufsize,
129 extra=None, **kwargs):
130 """Create subprocess transport."""
131 raise NotImplementedError
132
133 def _read_from_self(self):
134 """XXX"""
135 raise NotImplementedError
136
137 def _write_to_self(self):
138 """XXX"""
139 raise NotImplementedError
140
141 def _process_events(self, event_list):
142 """Process selector events."""
143 raise NotImplementedError
144
145 def run_forever(self):
146 """Run until stop() is called."""
147 if self._running:
148 raise RuntimeError('Event loop is running.')
149 self._running = True
150 try:
151 while True:
152 try:
153 self._run_once()
154 except _StopError:
155 break
156 finally:
157 self._running = False
158
159 def run_until_complete(self, future):
160 """Run until the Future is done.
161
162 If the argument is a coroutine, it is wrapped in a Task.
163
164 XXX TBD: It would be disastrous to call run_until_complete()
165 with the same coroutine twice -- it would wrap it in two
166 different Tasks and that can't be good.
167
168 Return the Future's result, or raise its exception.
169 """
170 future = tasks.async(future, loop=self)
171 future.add_done_callback(_raise_stop_error)
172 self.run_forever()
173 future.remove_done_callback(_raise_stop_error)
174 if not future.done():
175 raise RuntimeError('Event loop stopped before Future completed.')
176
177 return future.result()
178
179 def stop(self):
180 """Stop running the event loop.
181
182 Every callback scheduled before stop() is called will run.
183 Callback scheduled after stop() is called won't. However,
184 those callbacks will run if run() is called again later.
185 """
186 self.call_soon(_raise_stop_error)
187
Antoine Pitrou4ca73552013-10-20 00:54:10 +0200188 def close(self):
Guido van Rossume3f52ef2013-11-01 14:19:04 -0700189 """Close the event loop.
190
191 This clears the queues and shuts down the executor,
192 but does not wait for the executor to finish.
193 """
Antoine Pitrou4ca73552013-10-20 00:54:10 +0200194 self._ready.clear()
195 self._scheduled.clear()
196 executor = self._default_executor
197 if executor is not None:
198 self._default_executor = None
199 executor.shutdown(wait=False)
200
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700201 def is_running(self):
202 """Returns running status of event loop."""
203 return self._running
204
205 def time(self):
206 """Return the time according to the event loop's clock."""
207 return time.monotonic()
208
209 def call_later(self, delay, callback, *args):
210 """Arrange for a callback to be called at a given time.
211
212 Return a Handle: an opaque object with a cancel() method that
213 can be used to cancel the call.
214
215 The delay can be an int or float, expressed in seconds. It is
216 always a relative time.
217
218 Each callback will be called exactly once. If two callbacks
219 are scheduled for exactly the same time, it undefined which
220 will be called first.
221
222 Any positional arguments after the callback will be passed to
223 the callback when it is called.
224 """
225 return self.call_at(self.time() + delay, callback, *args)
226
227 def call_at(self, when, callback, *args):
228 """Like call_later(), but uses an absolute time."""
229 timer = events.TimerHandle(when, callback, args)
230 heapq.heappush(self._scheduled, timer)
231 return timer
232
233 def call_soon(self, callback, *args):
234 """Arrange for a callback to be called as soon as possible.
235
236 This operates as a FIFO queue, callbacks are called in the
237 order in which they are registered. Each callback will be
238 called exactly once.
239
240 Any positional arguments after the callback will be passed to
241 the callback when it is called.
242 """
243 handle = events.make_handle(callback, args)
244 self._ready.append(handle)
245 return handle
246
247 def call_soon_threadsafe(self, callback, *args):
248 """XXX"""
249 handle = self.call_soon(callback, *args)
250 self._write_to_self()
251 return handle
252
253 def run_in_executor(self, executor, callback, *args):
254 if isinstance(callback, events.Handle):
255 assert not args
256 assert not isinstance(callback, events.TimerHandle)
257 if callback._cancelled:
258 f = futures.Future(loop=self)
259 f.set_result(None)
260 return f
261 callback, args = callback._callback, callback._args
262 if executor is None:
263 executor = self._default_executor
264 if executor is None:
265 executor = concurrent.futures.ThreadPoolExecutor(_MAX_WORKERS)
266 self._default_executor = executor
267 return futures.wrap_future(executor.submit(callback, *args), loop=self)
268
269 def set_default_executor(self, executor):
270 self._default_executor = executor
271
272 def getaddrinfo(self, host, port, *,
273 family=0, type=0, proto=0, flags=0):
274 return self.run_in_executor(None, socket.getaddrinfo,
275 host, port, family, type, proto, flags)
276
277 def getnameinfo(self, sockaddr, flags=0):
278 return self.run_in_executor(None, socket.getnameinfo, sockaddr, flags)
279
280 @tasks.coroutine
281 def create_connection(self, protocol_factory, host=None, port=None, *,
282 ssl=None, family=0, proto=0, flags=0, sock=None,
Guido van Rossum21c85a72013-11-01 14:16:54 -0700283 local_addr=None, server_hostname=None):
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700284 """XXX"""
Guido van Rossum21c85a72013-11-01 14:16:54 -0700285 if server_hostname is not None and not ssl:
286 raise ValueError('server_hostname is only meaningful with ssl')
287
288 if server_hostname is None and ssl:
289 # Use host as default for server_hostname. It is an error
290 # if host is empty or not set, e.g. when an
291 # already-connected socket was passed or when only a port
292 # is given. To avoid this error, you can pass
293 # server_hostname='' -- this will bypass the hostname
294 # check. (This also means that if host is a numeric
295 # IP/IPv6 address, we will attempt to verify that exact
296 # address; this will probably fail, but it is possible to
297 # create a certificate for a specific IP address, so we
298 # don't judge it here.)
299 if not host:
300 raise ValueError('You must set server_hostname '
301 'when using ssl without a host')
302 server_hostname = host
Guido van Rossuma8d630a2013-11-01 14:20:55 -0700303
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700304 if host is not None or port is not None:
305 if sock is not None:
306 raise ValueError(
307 'host/port and sock can not be specified at the same time')
308
309 f1 = self.getaddrinfo(
310 host, port, family=family,
311 type=socket.SOCK_STREAM, proto=proto, flags=flags)
312 fs = [f1]
313 if local_addr is not None:
314 f2 = self.getaddrinfo(
315 *local_addr, family=family,
316 type=socket.SOCK_STREAM, proto=proto, flags=flags)
317 fs.append(f2)
318 else:
319 f2 = None
320
321 yield from tasks.wait(fs, loop=self)
322
323 infos = f1.result()
324 if not infos:
325 raise OSError('getaddrinfo() returned empty list')
326 if f2 is not None:
327 laddr_infos = f2.result()
328 if not laddr_infos:
329 raise OSError('getaddrinfo() returned empty list')
330
331 exceptions = []
332 for family, type, proto, cname, address in infos:
333 try:
334 sock = socket.socket(family=family, type=type, proto=proto)
335 sock.setblocking(False)
336 if f2 is not None:
337 for _, _, _, _, laddr in laddr_infos:
338 try:
339 sock.bind(laddr)
340 break
341 except OSError as exc:
342 exc = OSError(
343 exc.errno, 'error while '
344 'attempting to bind on address '
345 '{!r}: {}'.format(
346 laddr, exc.strerror.lower()))
347 exceptions.append(exc)
348 else:
349 sock.close()
350 sock = None
351 continue
352 yield from self.sock_connect(sock, address)
353 except OSError as exc:
354 if sock is not None:
355 sock.close()
356 exceptions.append(exc)
357 else:
358 break
359 else:
360 if len(exceptions) == 1:
361 raise exceptions[0]
362 else:
363 # If they all have the same str(), raise one.
364 model = str(exceptions[0])
365 if all(str(exc) == model for exc in exceptions):
366 raise exceptions[0]
367 # Raise a combined exception so the user can see all
368 # the various error messages.
369 raise OSError('Multiple exceptions: {}'.format(
370 ', '.join(str(exc) for exc in exceptions)))
371
372 elif sock is None:
373 raise ValueError(
374 'host and port was not specified and no sock specified')
375
376 sock.setblocking(False)
377
378 protocol = protocol_factory()
379 waiter = futures.Future(loop=self)
380 if ssl:
381 sslcontext = None if isinstance(ssl, bool) else ssl
382 transport = self._make_ssl_transport(
383 sock, protocol, sslcontext, waiter,
Guido van Rossum21c85a72013-11-01 14:16:54 -0700384 server_side=False, server_hostname=server_hostname)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700385 else:
386 transport = self._make_socket_transport(sock, protocol, waiter)
387
388 yield from waiter
389 return transport, protocol
390
391 @tasks.coroutine
392 def create_datagram_endpoint(self, protocol_factory,
393 local_addr=None, remote_addr=None, *,
394 family=0, proto=0, flags=0):
395 """Create datagram connection."""
396 if not (local_addr or remote_addr):
397 if family == 0:
398 raise ValueError('unexpected address family')
399 addr_pairs_info = (((family, proto), (None, None)),)
400 else:
401 # join addresss by (family, protocol)
402 addr_infos = collections.OrderedDict()
403 for idx, addr in ((0, local_addr), (1, remote_addr)):
404 if addr is not None:
405 assert isinstance(addr, tuple) and len(addr) == 2, (
406 '2-tuple is expected')
407
408 infos = yield from self.getaddrinfo(
409 *addr, family=family, type=socket.SOCK_DGRAM,
410 proto=proto, flags=flags)
411 if not infos:
412 raise OSError('getaddrinfo() returned empty list')
413
414 for fam, _, pro, _, address in infos:
415 key = (fam, pro)
416 if key not in addr_infos:
417 addr_infos[key] = [None, None]
418 addr_infos[key][idx] = address
419
420 # each addr has to have info for each (family, proto) pair
421 addr_pairs_info = [
422 (key, addr_pair) for key, addr_pair in addr_infos.items()
423 if not ((local_addr and addr_pair[0] is None) or
424 (remote_addr and addr_pair[1] is None))]
425
426 if not addr_pairs_info:
427 raise ValueError('can not get address information')
428
429 exceptions = []
430
431 for ((family, proto),
432 (local_address, remote_address)) in addr_pairs_info:
433 sock = None
434 r_addr = None
435 try:
436 sock = socket.socket(
437 family=family, type=socket.SOCK_DGRAM, proto=proto)
438 sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
439 sock.setblocking(False)
440
441 if local_addr:
442 sock.bind(local_address)
443 if remote_addr:
444 yield from self.sock_connect(sock, remote_address)
445 r_addr = remote_address
446 except OSError as exc:
447 if sock is not None:
448 sock.close()
449 exceptions.append(exc)
450 else:
451 break
452 else:
453 raise exceptions[0]
454
455 protocol = protocol_factory()
456 transport = self._make_datagram_transport(sock, protocol, r_addr)
457 return transport, protocol
458
459 @tasks.coroutine
460 def create_server(self, protocol_factory, host=None, port=None,
461 *,
462 family=socket.AF_UNSPEC,
463 flags=socket.AI_PASSIVE,
464 sock=None,
465 backlog=100,
466 ssl=None,
467 reuse_address=None):
468 """XXX"""
469 if host is not None or port is not None:
470 if sock is not None:
471 raise ValueError(
472 'host/port and sock can not be specified at the same time')
473
474 AF_INET6 = getattr(socket, 'AF_INET6', 0)
475 if reuse_address is None:
476 reuse_address = os.name == 'posix' and sys.platform != 'cygwin'
477 sockets = []
478 if host == '':
479 host = None
480
481 infos = yield from self.getaddrinfo(
482 host, port, family=family,
483 type=socket.SOCK_STREAM, proto=0, flags=flags)
484 if not infos:
485 raise OSError('getaddrinfo() returned empty list')
486
487 completed = False
488 try:
489 for res in infos:
490 af, socktype, proto, canonname, sa = res
Guido van Rossum32e46852013-10-19 17:04:25 -0700491 try:
492 sock = socket.socket(af, socktype, proto)
493 except socket.error:
494 # Assume it's a bad family/type/protocol combination.
495 continue
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700496 sockets.append(sock)
497 if reuse_address:
498 sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR,
499 True)
500 # Disable IPv4/IPv6 dual stack support (enabled by
501 # default on Linux) which makes a single socket
502 # listen on both address families.
503 if af == AF_INET6 and hasattr(socket, 'IPPROTO_IPV6'):
504 sock.setsockopt(socket.IPPROTO_IPV6,
505 socket.IPV6_V6ONLY,
506 True)
507 try:
508 sock.bind(sa)
509 except OSError as err:
510 raise OSError(err.errno, 'error while attempting '
511 'to bind on address %r: %s'
512 % (sa, err.strerror.lower()))
513 completed = True
514 finally:
515 if not completed:
516 for sock in sockets:
517 sock.close()
518 else:
519 if sock is None:
520 raise ValueError(
521 'host and port was not specified and no sock specified')
522 sockets = [sock]
523
524 server = Server(self, sockets)
525 for sock in sockets:
526 sock.listen(backlog)
527 sock.setblocking(False)
528 self._start_serving(protocol_factory, sock, ssl, server)
529 return server
530
531 @tasks.coroutine
532 def connect_read_pipe(self, protocol_factory, pipe):
533 protocol = protocol_factory()
534 waiter = futures.Future(loop=self)
535 transport = self._make_read_pipe_transport(pipe, protocol, waiter)
536 yield from waiter
537 return transport, protocol
538
539 @tasks.coroutine
540 def connect_write_pipe(self, protocol_factory, pipe):
541 protocol = protocol_factory()
542 waiter = futures.Future(loop=self)
543 transport = self._make_write_pipe_transport(pipe, protocol, waiter)
544 yield from waiter
545 return transport, protocol
546
547 @tasks.coroutine
548 def subprocess_shell(self, protocol_factory, cmd, *, stdin=subprocess.PIPE,
549 stdout=subprocess.PIPE, stderr=subprocess.PIPE,
550 universal_newlines=False, shell=True, bufsize=0,
551 **kwargs):
552 assert not universal_newlines, "universal_newlines must be False"
553 assert shell, "shell must be True"
554 assert isinstance(cmd, str), cmd
555 protocol = protocol_factory()
556 transport = yield from self._make_subprocess_transport(
557 protocol, cmd, True, stdin, stdout, stderr, bufsize, **kwargs)
558 return transport, protocol
559
560 @tasks.coroutine
561 def subprocess_exec(self, protocol_factory, *args, stdin=subprocess.PIPE,
562 stdout=subprocess.PIPE, stderr=subprocess.PIPE,
563 universal_newlines=False, shell=False, bufsize=0,
564 **kwargs):
565 assert not universal_newlines, "universal_newlines must be False"
566 assert not shell, "shell must be False"
567 protocol = protocol_factory()
568 transport = yield from self._make_subprocess_transport(
569 protocol, args, False, stdin, stdout, stderr, bufsize, **kwargs)
570 return transport, protocol
571
572 def _add_callback(self, handle):
573 """Add a Handle to ready or scheduled."""
574 assert isinstance(handle, events.Handle), 'A Handle is required here'
575 if handle._cancelled:
576 return
577 if isinstance(handle, events.TimerHandle):
578 heapq.heappush(self._scheduled, handle)
579 else:
580 self._ready.append(handle)
581
582 def _add_callback_signalsafe(self, handle):
583 """Like _add_callback() but called from a signal handler."""
584 self._add_callback(handle)
585 self._write_to_self()
586
587 def _run_once(self):
588 """Run one full iteration of the event loop.
589
590 This calls all currently ready callbacks, polls for I/O,
591 schedules the resulting callbacks, and finally schedules
592 'call_later' callbacks.
593 """
594 # Remove delayed calls that were cancelled from head of queue.
595 while self._scheduled and self._scheduled[0]._cancelled:
596 heapq.heappop(self._scheduled)
597
598 timeout = None
599 if self._ready:
600 timeout = 0
601 elif self._scheduled:
602 # Compute the desired timeout.
603 when = self._scheduled[0]._when
604 deadline = max(0, when - self.time())
605 if timeout is None:
606 timeout = deadline
607 else:
608 timeout = min(timeout, deadline)
609
610 # TODO: Instrumentation only in debug mode?
611 t0 = self.time()
612 event_list = self._selector.select(timeout)
613 t1 = self.time()
614 argstr = '' if timeout is None else '{:.3f}'.format(timeout)
615 if t1-t0 >= 1:
616 level = logging.INFO
617 else:
618 level = logging.DEBUG
Guido van Rossumfc29e0f2013-10-17 15:39:45 -0700619 logger.log(level, 'poll%s took %.3f seconds', argstr, t1-t0)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700620 self._process_events(event_list)
621
622 # Handle 'later' callbacks that are ready.
623 now = self.time()
624 while self._scheduled:
625 handle = self._scheduled[0]
626 if handle._when > now:
627 break
628 handle = heapq.heappop(self._scheduled)
629 self._ready.append(handle)
630
631 # This is the only place where callbacks are actually *called*.
632 # All other places just add them to ready.
633 # Note: We run all currently scheduled callbacks, but not any
634 # callbacks scheduled by callbacks run this time around --
635 # they will be run the next time (after another I/O poll).
636 # Use an idiom that is threadsafe without using locks.
637 ntodo = len(self._ready)
638 for i in range(ntodo):
639 handle = self._ready.popleft()
640 if not handle._cancelled:
641 handle._run()
642 handle = None # Needed to break cycles when an exception occurs.