"""Base implementation of event loop.

The event loop can be broken up into a multiplexer (the part
responsible for notifying us of IO events) and the event loop proper,
which wraps a multiplexer with functionality for scheduling callbacks,
immediately or at a given time in the future.

Whenever a public API takes a callback, subsequent positional
arguments will be passed to the callback if/when it is called. This
avoids the proliferation of trivial lambdas implementing closures.
Keyword arguments for the callback are not supported; this is a
conscious design decision, leaving the door open for keyword arguments
to modify the meaning of the API call itself.
"""


import collections
import concurrent.futures
import heapq
import logging
import socket
import subprocess
import time
import os
import sys

from . import events
from . import futures
from . import tasks
from .log import logger


__all__ = ['BaseEventLoop', 'Server']


# Argument for default thread pool executor creation.
_MAX_WORKERS = 5


class _StopError(BaseException):
    """Raised to stop the event loop."""


def _raise_stop_error(*args):
    raise _StopError


class Server(events.AbstractServer):
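    """Server object returned by BaseEventLoop.create_server().

    attach() and detach() maintain a count of active transports; once
    the server has been closed and that count drops to zero, _wakeup()
    releases any coroutines blocked in wait_closed().
    """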

    def __init__(self, loop, sockets):
        self.loop = loop
        self.sockets = sockets
        self.active_count = 0
        self.waiters = []

    def attach(self, transport):
        assert self.sockets is not None
        self.active_count += 1

    def detach(self, transport):
        assert self.active_count > 0
        self.active_count -= 1
        if self.active_count == 0 and self.sockets is None:
            self._wakeup()

    def close(self):
        sockets = self.sockets
        if sockets is not None:
            self.sockets = None
            for sock in sockets:
                self.loop._stop_serving(sock)
            if self.active_count == 0:
                self._wakeup()

    def _wakeup(self):
        waiters = self.waiters
        self.waiters = None
        for waiter in waiters:
            if not waiter.done():
                waiter.set_result(waiter)

    @tasks.coroutine
    def wait_closed(self):
        if self.sockets is None or self.waiters is None:
            return
        waiter = futures.Future(loop=self.loop)
        self.waiters.append(waiter)
        yield from waiter


class BaseEventLoop(events.AbstractEventLoop):
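    """Shared machinery for concrete event loops.

    Concrete loops (for example the selector- and proactor-based loops)
    subclass this and supply the _make_*_transport() factories and the
    polling hooks (_process_events() and friends) stubbed out below.
    """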

    def __init__(self):
        self._ready = collections.deque()
        self._scheduled = []
        self._default_executor = None
        self._internal_fds = 0
        self._running = False

    def _make_socket_transport(self, sock, protocol, waiter=None, *,
                               extra=None, server=None):
        """Create socket transport."""
        raise NotImplementedError

    def _make_ssl_transport(self, rawsock, protocol, sslcontext, waiter, *,
                            server_side=False, server_hostname=None,
                            extra=None, server=None):
        """Create SSL transport."""
        raise NotImplementedError

    def _make_datagram_transport(self, sock, protocol,
                                 address=None, extra=None):
        """Create datagram transport."""
        raise NotImplementedError

    def _make_read_pipe_transport(self, pipe, protocol, waiter=None,
                                  extra=None):
        """Create read pipe transport."""
        raise NotImplementedError

    def _make_write_pipe_transport(self, pipe, protocol, waiter=None,
                                   extra=None):
        """Create write pipe transport."""
        raise NotImplementedError

    @tasks.coroutine
    def _make_subprocess_transport(self, protocol, args, shell,
                                   stdin, stdout, stderr, bufsize,
                                   extra=None, **kwargs):
        """Create subprocess transport."""
        raise NotImplementedError

    def _read_from_self(self):
        """Read from the self-pipe used to wake up the event loop."""
        raise NotImplementedError

    def _write_to_self(self):
        """Write to the self-pipe to wake up the event loop."""
        raise NotImplementedError

    def _process_events(self, event_list):
        """Process selector events."""
        raise NotImplementedError

    def run_forever(self):
        """Run until stop() is called."""
        if self._running:
            raise RuntimeError('Event loop is running.')
        self._running = True
        try:
            while True:
                try:
                    self._run_once()
                except _StopError:
                    break
        finally:
            self._running = False

    def run_until_complete(self, future):
        """Run until the Future is done.

        If the argument is a coroutine, it is wrapped in a Task.

        XXX TBD: It would be disastrous to call run_until_complete()
        with the same coroutine twice -- it would wrap it in two
        different Tasks and that can't be good.

        Return the Future's result, or raise its exception.
        """
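        # Implementation sketch: a done-callback on the future raises
        # _StopError, which run_forever() catches to exit its loop.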
        future = tasks.async(future, loop=self)
        future.add_done_callback(_raise_stop_error)
        self.run_forever()
        future.remove_done_callback(_raise_stop_error)
        if not future.done():
            raise RuntimeError('Event loop stopped before Future completed.')

        return future.result()

    def stop(self):
        """Stop running the event loop.

        Every callback scheduled before stop() is called will run.
        Callbacks scheduled after stop() is called won't run. However,
        those callbacks will run if run_forever() is called again later.
        """
        self.call_soon(_raise_stop_error)

    def close(self):
        self._ready.clear()
        self._scheduled.clear()
        executor = self._default_executor
        if executor is not None:
            self._default_executor = None
            executor.shutdown(wait=False)

    def is_running(self):
        """Return whether the event loop is currently running."""
        return self._running

    def time(self):
        """Return the time according to the event loop's clock."""
        return time.monotonic()

    def call_later(self, delay, callback, *args):
        """Arrange for a callback to be called at a given time.

        Return a Handle: an opaque object with a cancel() method that
        can be used to cancel the call.

        The delay can be an int or float, expressed in seconds. It is
        always a relative time.

        Each callback will be called exactly once. If two callbacks
        are scheduled for exactly the same time, it is undefined which
        will be called first.

        Any positional arguments after the callback will be passed to
        the callback when it is called.
        """
        return self.call_at(self.time() + delay, callback, *args)

    def call_at(self, when, callback, *args):
        """Like call_later(), but uses an absolute time."""
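        # Timers are kept in the self._scheduled heap, ordered by their
        # _when timestamp; _run_once() pops them as they come due.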
        timer = events.TimerHandle(when, callback, args)
        heapq.heappush(self._scheduled, timer)
        return timer

    def call_soon(self, callback, *args):
        """Arrange for a callback to be called as soon as possible.

        This operates as a FIFO queue: callbacks are called in the
        order in which they are registered. Each callback will be
        called exactly once.

        Any positional arguments after the callback will be passed to
        the callback when it is called.
        """
        handle = events.make_handle(callback, args)
        self._ready.append(handle)
        return handle

    def call_soon_threadsafe(self, callback, *args):
        """Like call_soon(), but thread-safe."""
        handle = self.call_soon(callback, *args)
        self._write_to_self()
        return handle

    def run_in_executor(self, executor, callback, *args):
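        # A Handle (but not a TimerHandle) may be passed in place of a bare
        # callable; unwrap it, or return an already-completed Future if it
        # was cancelled.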
        if isinstance(callback, events.Handle):
            assert not args
            assert not isinstance(callback, events.TimerHandle)
            if callback._cancelled:
                f = futures.Future(loop=self)
                f.set_result(None)
                return f
            callback, args = callback._callback, callback._args
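        # Fall back to the default executor, lazily creating a thread pool
        # of _MAX_WORKERS threads on first use.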
        if executor is None:
            executor = self._default_executor
            if executor is None:
                executor = concurrent.futures.ThreadPoolExecutor(_MAX_WORKERS)
                self._default_executor = executor
        return futures.wrap_future(executor.submit(callback, *args), loop=self)

    def set_default_executor(self, executor):
        self._default_executor = executor

    def getaddrinfo(self, host, port, *,
                    family=0, type=0, proto=0, flags=0):
        return self.run_in_executor(None, socket.getaddrinfo,
                                    host, port, family, type, proto, flags)

    def getnameinfo(self, sockaddr, flags=0):
        return self.run_in_executor(None, socket.getnameinfo, sockaddr, flags)

    @tasks.coroutine
    def create_connection(self, protocol_factory, host=None, port=None, *,
                          ssl=None, family=0, proto=0, flags=0, sock=None,
                          local_addr=None):
        """Connect to a server and return (transport, protocol).

        The destination is given either as host/port (resolved with
        getaddrinfo()) or as an already-created socket via sock.
        """
        if host is not None or port is not None:
            if sock is not None:
                raise ValueError(
                    'host/port and sock can not be specified at the same time')

            f1 = self.getaddrinfo(
                host, port, family=family,
                type=socket.SOCK_STREAM, proto=proto, flags=flags)
            fs = [f1]
            if local_addr is not None:
                f2 = self.getaddrinfo(
                    *local_addr, family=family,
                    type=socket.SOCK_STREAM, proto=proto, flags=flags)
                fs.append(f2)
            else:
                f2 = None

            yield from tasks.wait(fs, loop=self)

            infos = f1.result()
            if not infos:
                raise OSError('getaddrinfo() returned empty list')
            if f2 is not None:
                laddr_infos = f2.result()
                if not laddr_infos:
                    raise OSError('getaddrinfo() returned empty list')

            exceptions = []
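            # Try each resolved address in turn, remembering every failure
            # so a meaningful error can be raised if they all fail.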
            for family, type, proto, cname, address in infos:
                try:
                    sock = socket.socket(family=family, type=type, proto=proto)
                    sock.setblocking(False)
                    if f2 is not None:
                        for _, _, _, _, laddr in laddr_infos:
                            try:
                                sock.bind(laddr)
                                break
                            except OSError as exc:
                                exc = OSError(
                                    exc.errno, 'error while '
                                    'attempting to bind on address '
                                    '{!r}: {}'.format(
                                        laddr, exc.strerror.lower()))
                                exceptions.append(exc)
                        else:
                            sock.close()
                            sock = None
                            continue
                    yield from self.sock_connect(sock, address)
                except OSError as exc:
                    if sock is not None:
                        sock.close()
                    exceptions.append(exc)
                else:
                    break
            else:
                if len(exceptions) == 1:
                    raise exceptions[0]
                else:
                    # If they all have the same str(), raise one.
                    model = str(exceptions[0])
                    if all(str(exc) == model for exc in exceptions):
                        raise exceptions[0]
                    # Raise a combined exception so the user can see all
                    # the various error messages.
                    raise OSError('Multiple exceptions: {}'.format(
                        ', '.join(str(exc) for exc in exceptions)))

        elif sock is None:
            raise ValueError(
                'host and port was not specified and no sock specified')

        sock.setblocking(False)

        protocol = protocol_factory()
        waiter = futures.Future(loop=self)
        if ssl:
            sslcontext = None if isinstance(ssl, bool) else ssl
            transport = self._make_ssl_transport(
                sock, protocol, sslcontext, waiter,
                server_side=False, server_hostname=host)
        else:
            transport = self._make_socket_transport(sock, protocol, waiter)

        yield from waiter
        return transport, protocol

    @tasks.coroutine
    def create_datagram_endpoint(self, protocol_factory,
                                 local_addr=None, remote_addr=None, *,
                                 family=0, proto=0, flags=0):
        """Create datagram connection."""
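        # With no local or remote address, an explicit address family is
        # required and the socket is left unbound and unconnected.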
        if not (local_addr or remote_addr):
            if family == 0:
                raise ValueError('unexpected address family')
            addr_pairs_info = (((family, proto), (None, None)),)
        else:
            # Join addresses by (family, protocol) pairs.
            addr_infos = collections.OrderedDict()
            for idx, addr in ((0, local_addr), (1, remote_addr)):
                if addr is not None:
                    assert isinstance(addr, tuple) and len(addr) == 2, (
                        '2-tuple is expected')

                    infos = yield from self.getaddrinfo(
                        *addr, family=family, type=socket.SOCK_DGRAM,
                        proto=proto, flags=flags)
                    if not infos:
                        raise OSError('getaddrinfo() returned empty list')

                    for fam, _, pro, _, address in infos:
                        key = (fam, pro)
                        if key not in addr_infos:
                            addr_infos[key] = [None, None]
                        addr_infos[key][idx] = address

            # each addr has to have info for each (family, proto) pair
            addr_pairs_info = [
                (key, addr_pair) for key, addr_pair in addr_infos.items()
                if not ((local_addr and addr_pair[0] is None) or
                        (remote_addr and addr_pair[1] is None))]

            if not addr_pairs_info:
                raise ValueError('can not get address information')

        exceptions = []

        for ((family, proto),
             (local_address, remote_address)) in addr_pairs_info:
            sock = None
            r_addr = None
            try:
                sock = socket.socket(
                    family=family, type=socket.SOCK_DGRAM, proto=proto)
                sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
                sock.setblocking(False)

                if local_addr:
                    sock.bind(local_address)
                if remote_addr:
                    yield from self.sock_connect(sock, remote_address)
                    r_addr = remote_address
            except OSError as exc:
                if sock is not None:
                    sock.close()
                exceptions.append(exc)
            else:
                break
        else:
            raise exceptions[0]

        protocol = protocol_factory()
        transport = self._make_datagram_transport(sock, protocol, r_addr)
        return transport, protocol

    @tasks.coroutine
    def create_server(self, protocol_factory, host=None, port=None,
                      *,
                      family=socket.AF_UNSPEC,
                      flags=socket.AI_PASSIVE,
                      sock=None,
                      backlog=100,
                      ssl=None,
                      reuse_address=None):
        """Create a TCP server bound to host and port (or to sock).

        Return a Server object whose sockets are already listening.
        """
        if host is not None or port is not None:
            if sock is not None:
                raise ValueError(
                    'host/port and sock can not be specified at the same time')

            AF_INET6 = getattr(socket, 'AF_INET6', 0)
            if reuse_address is None:
                reuse_address = os.name == 'posix' and sys.platform != 'cygwin'
            sockets = []
            if host == '':
                host = None

            infos = yield from self.getaddrinfo(
                host, port, family=family,
                type=socket.SOCK_STREAM, proto=0, flags=flags)
            if not infos:
                raise OSError('getaddrinfo() returned empty list')

            completed = False
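            # Bind one listening socket per address family returned by
            # getaddrinfo(); if anything fails, close every socket opened
            # so far before propagating the error.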
            try:
                for res in infos:
                    af, socktype, proto, canonname, sa = res
                    sock = socket.socket(af, socktype, proto)
                    sockets.append(sock)
                    if reuse_address:
                        sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR,
                                        True)
                    # Disable IPv4/IPv6 dual stack support (enabled by
                    # default on Linux) which makes a single socket
                    # listen on both address families.
                    if af == AF_INET6 and hasattr(socket, 'IPPROTO_IPV6'):
                        sock.setsockopt(socket.IPPROTO_IPV6,
                                        socket.IPV6_V6ONLY,
                                        True)
                    try:
                        sock.bind(sa)
                    except OSError as err:
                        raise OSError(err.errno, 'error while attempting '
                                      'to bind on address %r: %s'
                                      % (sa, err.strerror.lower()))
                completed = True
            finally:
                if not completed:
                    for sock in sockets:
                        sock.close()
        else:
            if sock is None:
                raise ValueError(
                    'host and port was not specified and no sock specified')
            sockets = [sock]

        server = Server(self, sockets)
        for sock in sockets:
            sock.listen(backlog)
            sock.setblocking(False)
            self._start_serving(protocol_factory, sock, ssl, server)
        return server

    @tasks.coroutine
    def connect_read_pipe(self, protocol_factory, pipe):
        protocol = protocol_factory()
        waiter = futures.Future(loop=self)
        transport = self._make_read_pipe_transport(pipe, protocol, waiter)
        yield from waiter
        return transport, protocol

    @tasks.coroutine
    def connect_write_pipe(self, protocol_factory, pipe):
        protocol = protocol_factory()
        waiter = futures.Future(loop=self)
        transport = self._make_write_pipe_transport(pipe, protocol, waiter)
        yield from waiter
        return transport, protocol

    @tasks.coroutine
    def subprocess_shell(self, protocol_factory, cmd, *, stdin=subprocess.PIPE,
                         stdout=subprocess.PIPE, stderr=subprocess.PIPE,
                         universal_newlines=False, shell=True, bufsize=0,
                         **kwargs):
        assert not universal_newlines, "universal_newlines must be False"
        assert shell, "shell must be True"
        assert isinstance(cmd, str), cmd
        protocol = protocol_factory()
        transport = yield from self._make_subprocess_transport(
            protocol, cmd, True, stdin, stdout, stderr, bufsize, **kwargs)
        return transport, protocol

    @tasks.coroutine
    def subprocess_exec(self, protocol_factory, *args, stdin=subprocess.PIPE,
                        stdout=subprocess.PIPE, stderr=subprocess.PIPE,
                        universal_newlines=False, shell=False, bufsize=0,
                        **kwargs):
        assert not universal_newlines, "universal_newlines must be False"
        assert not shell, "shell must be False"
        protocol = protocol_factory()
        transport = yield from self._make_subprocess_transport(
            protocol, args, False, stdin, stdout, stderr, bufsize, **kwargs)
        return transport, protocol

    def _add_callback(self, handle):
        """Add a Handle to ready or scheduled."""
        assert isinstance(handle, events.Handle), 'A Handle is required here'
        if handle._cancelled:
            return
        if isinstance(handle, events.TimerHandle):
            heapq.heappush(self._scheduled, handle)
        else:
            self._ready.append(handle)

    def _add_callback_signalsafe(self, handle):
        """Like _add_callback() but called from a signal handler."""
        self._add_callback(handle)
        self._write_to_self()

    def _run_once(self):
        """Run one full iteration of the event loop.

        This calls all currently ready callbacks, polls for I/O,
        schedules the resulting callbacks, and finally schedules
        'call_later' callbacks.
        """
        # Remove delayed calls that were cancelled from head of queue.
        while self._scheduled and self._scheduled[0]._cancelled:
            heapq.heappop(self._scheduled)

        timeout = None
        if self._ready:
            timeout = 0
        elif self._scheduled:
            # Compute the desired timeout.
            when = self._scheduled[0]._when
            deadline = max(0, when - self.time())
            if timeout is None:
                timeout = deadline
            else:
                timeout = min(timeout, deadline)

        # TODO: Instrumentation only in debug mode?
        t0 = self.time()
        event_list = self._selector.select(timeout)
        t1 = self.time()
        argstr = '' if timeout is None else '{:.3f}'.format(timeout)
        if t1-t0 >= 1:
            level = logging.INFO
        else:
            level = logging.DEBUG
        logger.log(level, 'poll%s took %.3f seconds', argstr, t1-t0)
        self._process_events(event_list)

        # Handle 'later' callbacks that are ready.
        now = self.time()
        while self._scheduled:
            handle = self._scheduled[0]
            if handle._when > now:
                break
            handle = heapq.heappop(self._scheduled)
            self._ready.append(handle)

        # This is the only place where callbacks are actually *called*.
        # All other places just add them to ready.
        # Note: We run all currently scheduled callbacks, but not any
        # callbacks scheduled by callbacks run this time around --
        # they will be run the next time (after another I/O poll).
        # Use an idiom that is threadsafe without using locks.
        ntodo = len(self._ready)
        for i in range(ntodo):
            handle = self._ready.popleft()
            if not handle._cancelled:
                handle._run()
            handle = None  # Needed to break cycles when an exception occurs.