"""Base implementation of event loop.

The event loop can be broken up into a multiplexer (the part
responsible for notifying us of IO events) and the event loop proper,
which wraps a multiplexer with functionality for scheduling callbacks,
immediately or at a given time in the future.

Whenever a public API takes a callback, subsequent positional
arguments will be passed to the callback if/when it is called. This
avoids the proliferation of trivial lambdas implementing closures.
Keyword arguments for the callback are not supported; this is a
conscious design decision, leaving the door open for keyword arguments
to modify the meaning of the API call itself.
"""


import collections
import concurrent.futures
import heapq
import logging
import socket
import subprocess
import time
import os
import sys

from . import events
from . import futures
from . import tasks
from .log import logger


__all__ = ['BaseEventLoop', 'Server']


# Argument for default thread pool executor creation.
_MAX_WORKERS = 5


class _StopError(BaseException):
    """Raised to stop the event loop."""


def _raise_stop_error(*args):
    raise _StopError


class Server(events.AbstractServer):

    def __init__(self, loop, sockets):
        self.loop = loop
        self.sockets = sockets
        self.active_count = 0
        self.waiters = []

    def attach(self, transport):
        assert self.sockets is not None
        self.active_count += 1

    def detach(self, transport):
        assert self.active_count > 0
        self.active_count -= 1
        if self.active_count == 0 and self.sockets is None:
            self._wakeup()

    def close(self):
        sockets = self.sockets
        if sockets is not None:
            self.sockets = None
            for sock in sockets:
                self.loop._stop_serving(sock)
            if self.active_count == 0:
                self._wakeup()

    def _wakeup(self):
        waiters = self.waiters
        self.waiters = None
        for waiter in waiters:
            if not waiter.done():
                waiter.set_result(waiter)

    @tasks.coroutine
    def wait_closed(self):
        if self.sockets is None or self.waiters is None:
            return
        waiter = futures.Future(loop=self.loop)
        self.waiters.append(waiter)
        yield from waiter

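# Illustrative sketch (not in the original source): a typical Server life
# cycle inside a coroutine, assuming ``loop`` is an event loop and
# ``MyProtocol`` is a protocol factory defined elsewhere.
#
#     server = yield from loop.create_server(MyProtocol, '127.0.0.1', 8000)
#     ...
#     server.close()                   # stop accepting new connections
#     yield from server.wait_closed()  # wait for active transports to detach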

class BaseEventLoop(events.AbstractEventLoop):

    def __init__(self):
        self._ready = collections.deque()
        self._scheduled = []
        self._default_executor = None
        self._internal_fds = 0
        self._running = False

    def _make_socket_transport(self, sock, protocol, waiter=None, *,
                               extra=None, server=None):
        """Create socket transport."""
        raise NotImplementedError

    def _make_ssl_transport(self, rawsock, protocol, sslcontext, waiter, *,
                            server_side=False, server_hostname=None,
                            extra=None, server=None):
        """Create SSL transport."""
        raise NotImplementedError

    def _make_datagram_transport(self, sock, protocol,
                                 address=None, extra=None):
        """Create datagram transport."""
        raise NotImplementedError

    def _make_read_pipe_transport(self, pipe, protocol, waiter=None,
                                  extra=None):
        """Create read pipe transport."""
        raise NotImplementedError

    def _make_write_pipe_transport(self, pipe, protocol, waiter=None,
                                   extra=None):
        """Create write pipe transport."""
        raise NotImplementedError

    @tasks.coroutine
    def _make_subprocess_transport(self, protocol, args, shell,
                                   stdin, stdout, stderr, bufsize,
                                   extra=None, **kwargs):
        """Create subprocess transport."""
        raise NotImplementedError

    def _read_from_self(self):
        """Read and discard data written to the self-pipe."""
        raise NotImplementedError

    def _write_to_self(self):
        """Write to the self-pipe to wake up the event loop."""
        raise NotImplementedError

    def _process_events(self, event_list):
        """Process selector events."""
        raise NotImplementedError

    def run_forever(self):
        """Run until stop() is called."""
        if self._running:
            raise RuntimeError('Event loop is running.')
        self._running = True
        try:
            while True:
                try:
                    self._run_once()
                except _StopError:
                    break
        finally:
            self._running = False

    def run_until_complete(self, future):
        """Run until the Future is done.

        If the argument is a coroutine, it is wrapped in a Task.

        XXX TBD: It would be disastrous to call run_until_complete()
        with the same coroutine twice -- it would wrap it in two
        different Tasks and that can't be good.

        Return the Future's result, or raise its exception.
        """
        future = tasks.async(future, loop=self)
        future.add_done_callback(_raise_stop_error)
        self.run_forever()
        future.remove_done_callback(_raise_stop_error)
        if not future.done():
            raise RuntimeError('Event loop stopped before Future completed.')

        return future.result()

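    # Illustrative sketch (not in the original source): driving a coroutine to
    # completion. ``loop`` is an event loop instance; ``fetch`` is a made-up
    # example coroutine.
    #
    #     @tasks.coroutine
    #     def fetch():
    #         return 42
    #
    #     result = loop.run_until_complete(fetch())   # result == 42
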
    def stop(self):
        """Stop running the event loop.

        Every callback scheduled before stop() is called will run.
        Callbacks scheduled after stop() is called won't run. However,
        those callbacks will run if run_forever() is called again later.
        """
        self.call_soon(_raise_stop_error)

    def is_running(self):
        """Return whether the event loop is currently running."""
        return self._running

    def time(self):
        """Return the time according to the event loop's clock."""
        return time.monotonic()

    def call_later(self, delay, callback, *args):
        """Arrange for a callback to be called at a given time.

        Return a Handle: an opaque object with a cancel() method that
        can be used to cancel the call.

        The delay can be an int or float, expressed in seconds. It is
        always a relative time.

        Each callback will be called exactly once. If two callbacks
        are scheduled for exactly the same time, it is undefined which
        will be called first.

        Any positional arguments after the callback will be passed to
        the callback when it is called.
        """
        return self.call_at(self.time() + delay, callback, *args)

    def call_at(self, when, callback, *args):
        """Like call_later(), but uses an absolute time."""
        timer = events.TimerHandle(when, callback, args)
        heapq.heappush(self._scheduled, timer)
        return timer

    def call_soon(self, callback, *args):
        """Arrange for a callback to be called as soon as possible.

        This operates as a FIFO queue: callbacks are called in the
        order in which they are registered. Each callback will be
        called exactly once.

        Any positional arguments after the callback will be passed to
        the callback when it is called.
        """
        handle = events.make_handle(callback, args)
        self._ready.append(handle)
        return handle

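    # Illustrative sketch (not in the original source): the three scheduling
    # primitives above. ``loop`` is an event loop instance.
    #
    #     loop.call_soon(print, 'runs on the next loop iteration')
    #     handle = loop.call_later(5.0, print, 'runs ~5 seconds from now')
    #     loop.call_at(loop.time() + 5.0, print, 'same as call_later(5.0, ...)')
    #     handle.cancel()   # a returned Handle can cancel the pending call
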
    def call_soon_threadsafe(self, callback, *args):
        """Like call_soon(), but thread-safe."""
        handle = self.call_soon(callback, *args)
        self._write_to_self()
        return handle

    def run_in_executor(self, executor, callback, *args):
        if isinstance(callback, events.Handle):
            assert not args
            assert not isinstance(callback, events.TimerHandle)
            if callback._cancelled:
                f = futures.Future(loop=self)
                f.set_result(None)
                return f
            callback, args = callback._callback, callback._args
        if executor is None:
            executor = self._default_executor
            if executor is None:
                executor = concurrent.futures.ThreadPoolExecutor(_MAX_WORKERS)
                self._default_executor = executor
        return futures.wrap_future(executor.submit(callback, *args), loop=self)

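    # Illustrative sketch (not in the original source): offloading a blocking
    # call to the (default) thread pool from a coroutine.
    # ``some_blocking_read`` and ``path`` are placeholders for any blocking
    # callable and its positional arguments.
    #
    #     data = yield from loop.run_in_executor(None, some_blocking_read, path)
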
    def set_default_executor(self, executor):
        self._default_executor = executor

    def getaddrinfo(self, host, port, *,
                    family=0, type=0, proto=0, flags=0):
        return self.run_in_executor(None, socket.getaddrinfo,
                                    host, port, family, type, proto, flags)

    def getnameinfo(self, sockaddr, flags=0):
        return self.run_in_executor(None, socket.getnameinfo, sockaddr, flags)

    @tasks.coroutine
    def create_connection(self, protocol_factory, host=None, port=None, *,
                          ssl=None, family=0, proto=0, flags=0, sock=None,
                          local_addr=None):
        """Connect to a TCP server.

        Create a streaming transport connection to the given host and
        port (or to the given sock), wrap it in a protocol created by
        protocol_factory, and return a (transport, protocol) pair.
        """
        if host is not None or port is not None:
            if sock is not None:
                raise ValueError(
                    'host/port and sock can not be specified at the same time')

            f1 = self.getaddrinfo(
                host, port, family=family,
                type=socket.SOCK_STREAM, proto=proto, flags=flags)
            fs = [f1]
            if local_addr is not None:
                f2 = self.getaddrinfo(
                    *local_addr, family=family,
                    type=socket.SOCK_STREAM, proto=proto, flags=flags)
                fs.append(f2)
            else:
                f2 = None

            yield from tasks.wait(fs, loop=self)

            infos = f1.result()
            if not infos:
                raise OSError('getaddrinfo() returned empty list')
            if f2 is not None:
                laddr_infos = f2.result()
                if not laddr_infos:
                    raise OSError('getaddrinfo() returned empty list')

            exceptions = []
            for family, type, proto, cname, address in infos:
                try:
                    sock = socket.socket(family=family, type=type, proto=proto)
                    sock.setblocking(False)
                    if f2 is not None:
                        for _, _, _, _, laddr in laddr_infos:
                            try:
                                sock.bind(laddr)
                                break
                            except OSError as exc:
                                exc = OSError(
                                    exc.errno, 'error while '
                                    'attempting to bind on address '
                                    '{!r}: {}'.format(
                                        laddr, exc.strerror.lower()))
                                exceptions.append(exc)
                        else:
                            sock.close()
                            sock = None
                            continue
                    yield from self.sock_connect(sock, address)
                except OSError as exc:
                    if sock is not None:
                        sock.close()
                    exceptions.append(exc)
                else:
                    break
            else:
                if len(exceptions) == 1:
                    raise exceptions[0]
                else:
                    # If they all have the same str(), raise one.
                    model = str(exceptions[0])
                    if all(str(exc) == model for exc in exceptions):
                        raise exceptions[0]
                    # Raise a combined exception so the user can see all
                    # the various error messages.
                    raise OSError('Multiple exceptions: {}'.format(
                        ', '.join(str(exc) for exc in exceptions)))

        elif sock is None:
            raise ValueError(
                'host and port was not specified and no sock specified')

        sock.setblocking(False)

        protocol = protocol_factory()
        waiter = futures.Future(loop=self)
        if ssl:
            sslcontext = None if isinstance(ssl, bool) else ssl
            transport = self._make_ssl_transport(
                sock, protocol, sslcontext, waiter,
                server_side=False, server_hostname=host)
        else:
            transport = self._make_socket_transport(sock, protocol, waiter)

        yield from waiter
        return transport, protocol

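    # Illustrative sketch (not in the original source): opening a client
    # connection from a coroutine; ``MyProtocol`` is a placeholder factory.
    #
    #     transport, protocol = yield from loop.create_connection(
    #         MyProtocol, 'example.com', 80)
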
    @tasks.coroutine
    def create_datagram_endpoint(self, protocol_factory,
                                 local_addr=None, remote_addr=None, *,
                                 family=0, proto=0, flags=0):
        """Create datagram connection."""
        if not (local_addr or remote_addr):
            if family == 0:
                raise ValueError('unexpected address family')
            addr_pairs_info = (((family, proto), (None, None)),)
        else:
            # Join addresses by (family, protocol) pair.
            addr_infos = collections.OrderedDict()
            for idx, addr in ((0, local_addr), (1, remote_addr)):
                if addr is not None:
                    assert isinstance(addr, tuple) and len(addr) == 2, (
                        '2-tuple is expected')

                    infos = yield from self.getaddrinfo(
                        *addr, family=family, type=socket.SOCK_DGRAM,
                        proto=proto, flags=flags)
                    if not infos:
                        raise OSError('getaddrinfo() returned empty list')

                    for fam, _, pro, _, address in infos:
                        key = (fam, pro)
                        if key not in addr_infos:
                            addr_infos[key] = [None, None]
                        addr_infos[key][idx] = address

            # each addr has to have info for each (family, proto) pair
            addr_pairs_info = [
                (key, addr_pair) for key, addr_pair in addr_infos.items()
                if not ((local_addr and addr_pair[0] is None) or
                        (remote_addr and addr_pair[1] is None))]

            if not addr_pairs_info:
                raise ValueError('can not get address information')

        exceptions = []

        for ((family, proto),
             (local_address, remote_address)) in addr_pairs_info:
            sock = None
            r_addr = None
            try:
                sock = socket.socket(
                    family=family, type=socket.SOCK_DGRAM, proto=proto)
                sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
                sock.setblocking(False)

                if local_addr:
                    sock.bind(local_address)
                if remote_addr:
                    yield from self.sock_connect(sock, remote_address)
                    r_addr = remote_address
            except OSError as exc:
                if sock is not None:
                    sock.close()
                exceptions.append(exc)
            else:
                break
        else:
            raise exceptions[0]

        protocol = protocol_factory()
        transport = self._make_datagram_transport(sock, protocol, r_addr)
        return transport, protocol

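    # Illustrative sketch (not in the original source): a UDP endpoint bound
    # to a local address; ``MyDatagramProtocol`` is a placeholder factory.
    #
    #     transport, protocol = yield from loop.create_datagram_endpoint(
    #         MyDatagramProtocol, local_addr=('127.0.0.1', 9999))
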
    @tasks.coroutine
    def create_server(self, protocol_factory, host=None, port=None,
                      *,
                      family=socket.AF_UNSPEC,
                      flags=socket.AI_PASSIVE,
                      sock=None,
                      backlog=100,
                      ssl=None,
                      reuse_address=None):
        """Create a TCP server bound to host and port.

        Return a Server object which can be used to stop the service.
        """
        if host is not None or port is not None:
            if sock is not None:
                raise ValueError(
                    'host/port and sock can not be specified at the same time')

            AF_INET6 = getattr(socket, 'AF_INET6', 0)
            if reuse_address is None:
                reuse_address = os.name == 'posix' and sys.platform != 'cygwin'
            sockets = []
            if host == '':
                host = None

            infos = yield from self.getaddrinfo(
                host, port, family=family,
                type=socket.SOCK_STREAM, proto=0, flags=flags)
            if not infos:
                raise OSError('getaddrinfo() returned empty list')

            completed = False
            try:
                for res in infos:
                    af, socktype, proto, canonname, sa = res
                    sock = socket.socket(af, socktype, proto)
                    sockets.append(sock)
                    if reuse_address:
                        sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR,
                                        True)
                    # Disable IPv4/IPv6 dual stack support (enabled by
                    # default on Linux) which makes a single socket
                    # listen on both address families.
                    if af == AF_INET6 and hasattr(socket, 'IPPROTO_IPV6'):
                        sock.setsockopt(socket.IPPROTO_IPV6,
                                        socket.IPV6_V6ONLY,
                                        True)
                    try:
                        sock.bind(sa)
                    except OSError as err:
                        raise OSError(err.errno, 'error while attempting '
                                      'to bind on address %r: %s'
                                      % (sa, err.strerror.lower()))
                completed = True
            finally:
                if not completed:
                    for sock in sockets:
                        sock.close()
        else:
            if sock is None:
                raise ValueError(
                    'host and port was not specified and no sock specified')
            sockets = [sock]

        server = Server(self, sockets)
        for sock in sockets:
            sock.listen(backlog)
            sock.setblocking(False)
            self._start_serving(protocol_factory, sock, ssl, server)
        return server

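    # Illustrative sketch (not in the original source): the two ways to call
    # create_server(); ``MyProtocol`` is a placeholder factory and
    # ``listening_sock`` a pre-bound listening socket.
    #
    #     server = yield from loop.create_server(MyProtocol, 'localhost', 8080)
    #     server = yield from loop.create_server(MyProtocol, sock=listening_sock)
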
    @tasks.coroutine
    def connect_read_pipe(self, protocol_factory, pipe):
        protocol = protocol_factory()
        waiter = futures.Future(loop=self)
        transport = self._make_read_pipe_transport(pipe, protocol, waiter)
        yield from waiter
        return transport, protocol

    @tasks.coroutine
    def connect_write_pipe(self, protocol_factory, pipe):
        protocol = protocol_factory()
        waiter = futures.Future(loop=self)
        transport = self._make_write_pipe_transport(pipe, protocol, waiter)
        yield from waiter
        return transport, protocol

    @tasks.coroutine
    def subprocess_shell(self, protocol_factory, cmd, *, stdin=subprocess.PIPE,
                         stdout=subprocess.PIPE, stderr=subprocess.PIPE,
                         universal_newlines=False, shell=True, bufsize=0,
                         **kwargs):
        assert not universal_newlines, "universal_newlines must be False"
        assert shell, "shell must be True"
        assert isinstance(cmd, str), cmd
        protocol = protocol_factory()
        transport = yield from self._make_subprocess_transport(
            protocol, cmd, True, stdin, stdout, stderr, bufsize, **kwargs)
        return transport, protocol

    @tasks.coroutine
    def subprocess_exec(self, protocol_factory, *args, stdin=subprocess.PIPE,
                        stdout=subprocess.PIPE, stderr=subprocess.PIPE,
                        universal_newlines=False, shell=False, bufsize=0,
                        **kwargs):
        assert not universal_newlines, "universal_newlines must be False"
        assert not shell, "shell must be False"
        protocol = protocol_factory()
        transport = yield from self._make_subprocess_transport(
            protocol, args, False, stdin, stdout, stderr, bufsize, **kwargs)
        return transport, protocol

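    # Illustrative sketch (not in the original source): running subprocesses
    # through the two APIs above; ``MySubprocessProtocol`` is a placeholder.
    #
    #     transport, protocol = yield from loop.subprocess_shell(
    #         MySubprocessProtocol, 'ls -l | wc -l')
    #     transport, protocol = yield from loop.subprocess_exec(
    #         MySubprocessProtocol, 'ls', '-l')
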
    def _add_callback(self, handle):
        """Add a Handle to ready or scheduled."""
        assert isinstance(handle, events.Handle), 'A Handle is required here'
        if handle._cancelled:
            return
        if isinstance(handle, events.TimerHandle):
            heapq.heappush(self._scheduled, handle)
        else:
            self._ready.append(handle)

    def _add_callback_signalsafe(self, handle):
        """Like _add_callback() but called from a signal handler."""
        self._add_callback(handle)
        self._write_to_self()

    def _run_once(self):
        """Run one full iteration of the event loop.

        This calls all currently ready callbacks, polls for I/O,
        schedules the resulting callbacks, and finally schedules
        'call_later' callbacks.
        """
        # Remove delayed calls that were cancelled from head of queue.
        while self._scheduled and self._scheduled[0]._cancelled:
            heapq.heappop(self._scheduled)

        timeout = None
        if self._ready:
            timeout = 0
        elif self._scheduled:
            # Compute the desired timeout.
            when = self._scheduled[0]._when
            deadline = max(0, when - self.time())
            if timeout is None:
                timeout = deadline
            else:
                timeout = min(timeout, deadline)

        # TODO: Instrumentation only in debug mode?
        t0 = self.time()
        event_list = self._selector.select(timeout)
        t1 = self.time()
        argstr = '' if timeout is None else '{:.3f}'.format(timeout)
        if t1-t0 >= 1:
            level = logging.INFO
        else:
            level = logging.DEBUG
        logger.log(level, 'poll%s took %.3f seconds', argstr, t1-t0)
        self._process_events(event_list)

        # Handle 'later' callbacks that are ready.
        now = self.time()
        while self._scheduled:
            handle = self._scheduled[0]
            if handle._when > now:
                break
            handle = heapq.heappop(self._scheduled)
            self._ready.append(handle)

        # This is the only place where callbacks are actually *called*.
        # All other places just add them to ready.
        # Note: We run all currently scheduled callbacks, but not any
        # callbacks scheduled by callbacks run this time around --
        # they will be run the next time (after another I/O poll).
        # Use an idiom that is threadsafe without using locks.
        ntodo = len(self._ready)
        for i in range(ntodo):
            handle = self._ready.popleft()
            if not handle._cancelled:
                handle._run()
        handle = None  # Needed to break cycles when an exception occurs.