blob: bae9a49397fca6902c6b0be001f90ece88942aa8 [file] [log] [blame]
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07001"""Event loop using a selector and related classes.
2
3A selector is a "notify-when-ready" multiplexer. For a subclass which
4also includes support for signal handling, see the unix_events sub-module.
5"""
6
7import collections
8import socket
9try:
10 import ssl
11except ImportError: # pragma: no cover
12 ssl = None
13
14from . import base_events
15from . import constants
16from . import events
17from . import futures
18from . import selectors
19from . import transports
20from .log import asyncio_log
21
22
class BaseSelectorEventLoop(base_events.BaseEventLoop):
    """Event loop driven by a selector.

    The public API is specified by events.EventLoop.  Subclasses have to
    implement _socketpair(), which _make_self_pipe() uses to obtain the
    two sockets of the self-pipe.
    """

    def __init__(self, selector=None):
        """Build the loop on *selector*, or on a DefaultSelector if None."""
        super().__init__()

        if selector is None:
            selector = selectors.DefaultSelector()
        asyncio_log.debug('Using selector: %s', selector.__class__.__name__)
        self._selector = selector
        self._make_self_pipe()

    def _make_socket_transport(self, sock, protocol, waiter=None, *,
                               extra=None, server=None):
        """Return a plain stream transport wrapping *sock*."""
        return _SelectorSocketTransport(
            self, sock, protocol, waiter, extra, server)

    def _make_ssl_transport(self, rawsock, protocol, sslcontext, waiter, *,
                            server_side=False, server_hostname=None,
                            extra=None, server=None):
        """Return an SSL transport wrapping *rawsock*."""
        return _SelectorSslTransport(
            self, rawsock, protocol, sslcontext, waiter,
            server_side, server_hostname, extra, server)

    def _make_datagram_transport(self, sock, protocol,
                                 address=None, extra=None):
        """Return a datagram transport wrapping *sock*."""
        return _SelectorDatagramTransport(
            self, sock, protocol, address, extra)

    def close(self):
        """Tear down the self-pipe and the selector; a second call is a no-op."""
        if self._selector is None:
            return
        self._close_self_pipe()
        self._selector.close()
        self._selector = None

    def _socketpair(self):
        """Return a pair of connected sockets (subclass responsibility)."""
        raise NotImplementedError

    def _close_self_pipe(self):
        """Unregister and close both ends of the self-pipe."""
        self.remove_reader(self._ssock.fileno())
        self._ssock.close()
        self._ssock = None
        self._csock.close()
        self._csock = None
        self._internal_fds -= 1

    def _make_self_pipe(self):
        """Create the self-pipe and start watching its read end."""
        # A self-socket, really. :-)
        ssock, csock = self._socketpair()
        self._ssock, self._csock = ssock, csock
        ssock.setblocking(False)
        csock.setblocking(False)
        self._internal_fds += 1
        self.add_reader(ssock.fileno(), self._read_from_self)

    def _read_from_self(self):
        """Consume one byte from the read end of the self-pipe."""
        try:
            self._ssock.recv(1)
        except (BlockingIOError, InterruptedError):
            # Nothing to read right now; that is fine.
            pass

    def _write_to_self(self):
        """Push one byte into the write end of the self-pipe."""
        try:
            self._csock.send(b'x')
        except (BlockingIOError, InterruptedError):
            # The byte could not be written right now; ignore.
            pass

    def _start_serving(self, protocol_factory, sock, ssl=None, server=None):
        """Accept connections on listening socket *sock* as it gets readable."""
        self.add_reader(sock.fileno(), self._accept_connection,
                        protocol_factory, sock, ssl, server)

    def _accept_connection(self, protocol_factory, sock, ssl=None,
                           server=None):
        """Accept one connection on *sock* and wrap it in a transport.

        A would-block/interrupted accept is ignored.  Any other failure
        stops serving on *sock*: it is unregistered, closed, and the
        error is logged.
        """
        try:
            conn, addr = sock.accept()
            conn.setblocking(False)
        except (BlockingIOError, InterruptedError):
            return  # False alarm.
        except Exception:
            # Bad error.  Stop serving.
            self.remove_reader(sock.fileno())
            sock.close()
            # There's nowhere to send the error, so just log it.
            # TODO: Someone will want an error handler for this.
            asyncio_log.exception('Accept failed')
            return

        if ssl:
            self._make_ssl_transport(
                conn, protocol_factory(), ssl, None,
                server_side=True, extra={'peername': addr}, server=server)
        else:
            self._make_socket_transport(
                conn, protocol_factory(), extra={'peername': addr},
                server=server)
        # From here on the protocol is in charge of the connection.

    def add_reader(self, fd, callback, *args):
        """Register *callback* to run when *fd* is readable.

        A reader previously registered for *fd* is replaced and cancelled.
        """
        handle = events.make_handle(callback, args)
        try:
            key = self._selector.get_key(fd)
        except KeyError:
            self._selector.register(fd, selectors.EVENT_READ,
                                    (handle, None))
            return

        previous, writer = key.data
        self._selector.modify(fd, key.events | selectors.EVENT_READ,
                              (handle, writer))
        if previous is not None:
            previous.cancel()

    def remove_reader(self, fd):
        """Unregister the reader for *fd*.

        Returns True if a reader handle was registered, False otherwise.
        """
        try:
            key = self._selector.get_key(fd)
        except KeyError:
            return False

        reader, writer = key.data
        remaining = key.events & ~selectors.EVENT_READ
        if remaining:
            self._selector.modify(fd, remaining, (None, writer))
        else:
            self._selector.unregister(fd)

        if reader is None:
            return False
        reader.cancel()
        return True

    def add_writer(self, fd, callback, *args):
        """Register *callback* to run when *fd* is writable.

        A writer previously registered for *fd* is replaced and cancelled.
        """
        handle = events.make_handle(callback, args)
        try:
            key = self._selector.get_key(fd)
        except KeyError:
            self._selector.register(fd, selectors.EVENT_WRITE,
                                    (None, handle))
            return

        reader, previous = key.data
        self._selector.modify(fd, key.events | selectors.EVENT_WRITE,
                              (reader, handle))
        if previous is not None:
            previous.cancel()

    def remove_writer(self, fd):
        """Unregister the writer for *fd*.

        Returns True if a writer handle was registered, False otherwise.
        """
        try:
            key = self._selector.get_key(fd)
        except KeyError:
            return False

        reader, writer = key.data
        # Remove both writer and connector.
        remaining = key.events & ~selectors.EVENT_WRITE
        if remaining:
            self._selector.modify(fd, remaining, (reader, None))
        else:
            self._selector.unregister(fd)

        if writer is None:
            return False
        writer.cancel()
        return True

    def sock_recv(self, sock, n):
        """Return a future for up to *n* bytes received from *sock*."""
        fut = futures.Future(loop=self)
        self._sock_recv(fut, False, sock, n)
        return fut

    def _sock_recv(self, fut, registered, sock, n):
        """Try recv(); retry from a reader callback if it would block.

        *registered* is True when invoked as that reader callback.
        """
        fd = sock.fileno()
        if registered:
            # Drop the callback up front.  The selector reporting the fd
            # as ready while recv() still hits EAGAIN should be rare, and
            # re-registering in that case keeps the common path simple.
            self.remove_reader(fd)
        if fut.cancelled():
            return

        try:
            data = sock.recv(n)
        except (BlockingIOError, InterruptedError):
            self.add_reader(fd, self._sock_recv, fut, True, sock, n)
            return
        except Exception as exc:
            fut.set_exception(exc)
            return
        fut.set_result(data)

    def sock_sendall(self, sock, data):
        """Return a future that completes once all of *data* is sent."""
        fut = futures.Future(loop=self)
        if not data:
            fut.set_result(None)
        else:
            self._sock_sendall(fut, False, sock, data)
        return fut

    def _sock_sendall(self, fut, registered, sock, data):
        """Send as much as possible; reschedule for whatever is left.

        *registered* is True when invoked as a writer callback.
        """
        fd = sock.fileno()

        if registered:
            self.remove_writer(fd)
        if fut.cancelled():
            return

        try:
            sent = sock.send(data)
        except (BlockingIOError, InterruptedError):
            sent = 0
        except Exception as exc:
            fut.set_exception(exc)
            return

        if sent == len(data):
            fut.set_result(None)
            return

        remaining = data[sent:] if sent else data
        self.add_writer(fd, self._sock_sendall, fut, True, sock, remaining)

    def sock_connect(self, sock, address):
        """Return a future that completes when *sock* is connected."""
        # That address better not require a lookup!  We're not calling
        # self.getaddrinfo() for you here.  But verifying this is
        # complicated; the socket module doesn't have a pattern for
        # IPv6 addresses (there are too many forms, apparently).
        fut = futures.Future(loop=self)
        self._sock_connect(fut, False, sock, address)
        return fut

    def _sock_connect(self, fut, registered, sock, address):
        """Start the connect, or check its outcome once writable.

        *registered* is True when invoked as a writer callback.
        """
        # TODO: Use getaddrinfo() to look up the address, to avoid the
        # trap of hanging the entire event loop when the address
        # requires doing a DNS lookup.  (OTOH, the caller should
        # already have done this, so it would be nice if we could
        # easily tell whether the address needs looking up or not.  I
        # know how to do this for IPv4, but IPv6 addresses have many
        # syntaxes.)
        fd = sock.fileno()
        if registered:
            self.remove_writer(fd)
        if fut.cancelled():
            return

        try:
            if registered:
                # Called back after the fd became writable: inspect the
                # pending socket error to learn how the connect went.
                err = sock.getsockopt(socket.SOL_SOCKET, socket.SO_ERROR)
                if err != 0:
                    # Handled by the generic except clause below.
                    raise OSError(err, 'Connect call failed')
            else:
                # First time around.
                sock.connect(address)
        except (BlockingIOError, InterruptedError):
            self.add_writer(fd, self._sock_connect, fut, True, sock, address)
        except Exception as exc:
            fut.set_exception(exc)
        else:
            fut.set_result(None)

    def sock_accept(self, sock):
        """Return a future for the (conn, address) pair accepted on *sock*."""
        fut = futures.Future(loop=self)
        self._sock_accept(fut, False, sock)
        return fut

    def _sock_accept(self, fut, registered, sock):
        """Try accept(); retry from a reader callback if it would block.

        *registered* is True when invoked as that reader callback.
        """
        fd = sock.fileno()
        if registered:
            self.remove_reader(fd)
        if fut.cancelled():
            return

        try:
            conn, address = sock.accept()
            conn.setblocking(False)
        except (BlockingIOError, InterruptedError):
            self.add_reader(fd, self._sock_accept, fut, True, sock)
            return
        except Exception as exc:
            fut.set_exception(exc)
            return
        fut.set_result((conn, address))

    def _process_events(self, event_list):
        """Queue the callbacks of every ready (key, mask) pair.

        Handles found to be cancelled are unregistered instead of queued.
        """
        for key, mask in event_list:
            fileobj = key.fileobj
            reader, writer = key.data

            if reader is not None and mask & selectors.EVENT_READ:
                if reader._cancelled:
                    self.remove_reader(fileobj)
                else:
                    self._add_callback(reader)

            if writer is not None and mask & selectors.EVENT_WRITE:
                if writer._cancelled:
                    self.remove_writer(fileobj)
                else:
                    self._add_callback(writer)

    def _stop_serving(self, sock):
        """Stop accepting on *sock* and close it."""
        self.remove_reader(sock.fileno())
        sock.close()
326
327
class _SelectorTransport(transports.Transport):
    """State and close/abort logic shared by the selector transports.

    Holds the socket, its fd, the protocol, the owning loop and server,
    and the outgoing buffer.  close() and _force_close() make sure the
    protocol's connection_lost() is delivered once, after which the
    socket is closed and the references are dropped.
    """

    max_size = 256 * 1024  # Buffer size passed to recv().

    def __init__(self, loop, sock, protocol, extra, server=None):
        """Record the collaborators and fill in the socket extra info.

        'peername' is looked up on the socket unless *extra* already
        supplies it; it is None when the lookup fails.  When *server* is
        given the transport attaches itself to it.
        """
        super().__init__(extra)
        self._extra['socket'] = sock
        self._extra['sockname'] = sock.getsockname()
        if 'peername' not in self._extra:
            try:
                self._extra['peername'] = sock.getpeername()
            except socket.error:
                self._extra['peername'] = None
        self._loop = loop
        self._sock = sock
        self._sock_fd = sock.fileno()
        self._protocol = protocol
        self._server = server
        self._buffer = collections.deque()
        self._conn_lost = 0
        self._closing = False  # Set when close() called.
        # True once connection_lost() has been queued or has run.  Kept
        # separate from _closing because close() with pending data sets
        # _closing without queueing anything yet.
        self._lost_scheduled = False
        if server is not None:
            server.attach(self)

    def abort(self):
        """Close immediately, discarding any buffered data."""
        self._force_close(None)

    def close(self):
        """Close gracefully: stop reading and let buffered data drain.

        With an empty buffer connection_lost(None) is scheduled right
        away.  Otherwise nothing is scheduled here; the write callbacks
        of the subclasses in this module call _call_connection_lost()
        once the buffer has been flushed.
        """
        if self._closing:
            return
        self._closing = True
        self._conn_lost += 1
        self._loop.remove_reader(self._sock_fd)
        if not self._buffer:
            self._lost_scheduled = True
            self._loop.call_soon(self._call_connection_lost, None)

    def _fatal_error(self, exc):
        """Log the active exception and force-close with *exc*."""
        # should be called from exception handler only
        asyncio_log.exception('Fatal error for %s', self)
        self._force_close(exc)

    def _force_close(self, exc):
        """Drop buffered data and schedule connection_lost(exc).

        Also works after close(): a graceful close that is still waiting
        for its buffer to drain is turned into an immediate one, so the
        protocol is always notified and the socket always released.
        Does nothing more when connection_lost() is already queued or
        has already run.
        """
        self._buffer.clear()
        if self._loop is None:
            # connection_lost() has already run; nothing left to undo.
            return
        # Never leave a write callback behind on a transport that is
        # going away.  remove_writer() is harmless if none is registered.
        self._loop.remove_writer(self._sock_fd)

        if self._lost_scheduled:
            return

        if not self._closing:
            self._closing = True
            self._conn_lost += 1
            self._loop.remove_reader(self._sock_fd)
        self._lost_scheduled = True
        self._loop.call_soon(self._call_connection_lost, exc)

    def _call_connection_lost(self, exc):
        """Notify the protocol, then close the socket and drop references.

        Safe to call more than once: only the first call has any effect.
        """
        if self._loop is None:
            # Already delivered (e.g. called directly by a write callback
            # while a scheduled call was still pending).
            return
        self._lost_scheduled = True
        try:
            self._protocol.connection_lost(exc)
        finally:
            self._sock.close()
            self._sock = None
            self._protocol = None
            self._loop = None
            server = self._server
            if server is not None:
                server.detach(self)
                self._server = None
394
395
class _SelectorSocketTransport(_SelectorTransport):
    """Stream transport over a non-blocking socket."""

    def __init__(self, loop, sock, protocol, waiter=None,
                 extra=None, server=None):
        """Start reading and schedule connection_made().

        If *waiter* is given its result is set to None right after
        connection_made() has been scheduled.
        """
        super().__init__(loop, sock, protocol, extra, server)
        self._eof = False
        self._paused = False

        self._loop.add_reader(self._sock_fd, self._read_ready)
        self._loop.call_soon(self._protocol.connection_made, self)
        if waiter is not None:
            self._loop.call_soon(waiter.set_result, None)

    def pause(self):
        """Stop delivering data_received() until resume() is called."""
        assert not self._closing, 'Cannot pause() when closing'
        assert not self._paused, 'Already paused'
        self._paused = True
        self._loop.remove_reader(self._sock_fd)

    def resume(self):
        """Undo pause(); reading only restarts if not closing."""
        assert self._paused, 'Not paused'
        self._paused = False
        if self._closing:
            return
        self._loop.add_reader(self._sock_fd, self._read_ready)

    def _read_ready(self):
        """Reader callback: receive once and hand the data to the protocol.

        An empty read means EOF: eof_received() is called and, unless it
        returns a true value, the transport is closed.
        """
        try:
            data = self._sock.recv(self.max_size)
        except (BlockingIOError, InterruptedError):
            pass
        except ConnectionResetError as exc:
            self._force_close(exc)
        except Exception as exc:
            self._fatal_error(exc)
        else:
            if data:
                self._protocol.data_received(data)
            else:
                keep_open = self._protocol.eof_received()
                if keep_open:
                    # The protocol wants to keep writing, but there is
                    # nothing more to receive.  A socket at EOF stays
                    # readable forever, so leaving the reader registered
                    # would call eof_received() in a tight loop.
                    self._loop.remove_reader(self._sock_fd)
                else:
                    self.close()

    def write(self, data):
        """Send *data*, buffering whatever cannot be sent immediately.

        Writes after the connection is lost are dropped; once enough of
        them have happened a warning is logged for each.
        """
        assert isinstance(data, bytes), repr(type(data))
        assert not self._eof, 'Cannot call write() after write_eof()'
        if not data:
            return

        if self._conn_lost:
            if self._conn_lost >= constants.LOG_THRESHOLD_FOR_CONNLOST_WRITES:
                asyncio_log.warning('socket.send() raised exception.')
            self._conn_lost += 1
            return

        if not self._buffer:
            # Attempt to send it right away first.
            try:
                n = self._sock.send(data)
            except (BlockingIOError, InterruptedError):
                n = 0
            except (BrokenPipeError, ConnectionResetError) as exc:
                self._force_close(exc)
                return
            except OSError as exc:
                self._fatal_error(exc)
                return
            else:
                data = data[n:]
                if not data:
                    return
            # Start async I/O.
            self._loop.add_writer(self._sock_fd, self._write_ready)

        self._buffer.append(data)

    def _write_ready(self):
        """Writer callback: flush as much of the buffer as possible.

        When the buffer empties the writer is removed; a pending close()
        is then completed, or a pending write_eof() carried out.
        """
        data = b''.join(self._buffer)
        assert data, 'Data should not be empty'

        self._buffer.clear()
        try:
            n = self._sock.send(data)
        except (BlockingIOError, InterruptedError):
            self._buffer.append(data)
        except (BrokenPipeError, ConnectionResetError) as exc:
            self._loop.remove_writer(self._sock_fd)
            self._force_close(exc)
        except Exception as exc:
            self._loop.remove_writer(self._sock_fd)
            self._fatal_error(exc)
        else:
            data = data[n:]
            if not data:
                self._loop.remove_writer(self._sock_fd)
                if self._closing:
                    self._call_connection_lost(None)
                elif self._eof:
                    self._sock.shutdown(socket.SHUT_WR)
                return

            self._buffer.append(data)  # Try again later.

    def write_eof(self):
        """Shut down the write side once buffered data has been sent."""
        if self._eof:
            return
        self._eof = True
        if not self._buffer:
            self._sock.shutdown(socket.SHUT_WR)

    def can_write_eof(self):
        """Return True: write_eof() is supported."""
        return True
508
509
class _SelectorSslTransport(_SelectorTransport):
    """Stream transport over a non-blocking SSL-wrapped socket."""

    def __init__(self, loop, rawsock, protocol, sslcontext, waiter=None,
                 server_side=False, server_hostname=None,
                 extra=None, server=None):
        """Wrap *rawsock* with *sslcontext* and start the handshake.

        On the server side *sslcontext* must be an SSLContext.  On the
        client side None selects a default context (SSLv23 without
        SSLv2, default verify paths, certificate required).  *waiter*,
        if given, is resolved when the handshake succeeds or fails.
        """
        if server_side:
            assert isinstance(
                sslcontext, ssl.SSLContext), 'Must pass an SSLContext'
        else:
            # Client-side may pass ssl=True to use a default context.
            # The default is the same as used by urllib.
            if sslcontext is None:
                sslcontext = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
                sslcontext.options |= ssl.OP_NO_SSLv2
                sslcontext.set_default_verify_paths()
                sslcontext.verify_mode = ssl.CERT_REQUIRED
        wrap_kwargs = {
            'server_side': server_side,
            'do_handshake_on_connect': False,
        }
        if server_hostname is not None and not server_side and ssl.HAS_SNI:
            wrap_kwargs['server_hostname'] = server_hostname
        sslsock = sslcontext.wrap_socket(rawsock, **wrap_kwargs)

        super().__init__(loop, sslsock, protocol, extra, server)

        self._server_hostname = server_hostname
        self._waiter = waiter
        self._rawsock = rawsock
        self._sslcontext = sslcontext
        self._paused = False

        # SSL-specific extra info.  (peercert is set later)
        self._extra.update(sslcontext=sslcontext)

        self._on_handshake()

    def _abort_handshake(self, exc):
        """Give up on the handshake: unregister, close, report *exc*."""
        # The fd must leave the selector before the socket is closed,
        # otherwise _on_handshake stays registered for a dead fd.
        self._loop.remove_reader(self._sock_fd)
        self._loop.remove_writer(self._sock_fd)
        self._sock.close()
        if self._waiter is not None:
            self._waiter.set_exception(exc)

    def _on_handshake(self):
        """Advance the handshake; re-run when the socket is ready again.

        On success the hostname is verified if requested, the extra info
        is completed, _on_ready is installed as reader and writer, and
        connection_made() is scheduled.
        """
        try:
            self._sock.do_handshake()
        except ssl.SSLWantReadError:
            # Only wait for readability.  A writer left over from an
            # earlier want-write would fire continuously meanwhile.
            self._loop.remove_writer(self._sock_fd)
            self._loop.add_reader(self._sock_fd, self._on_handshake)
            return
        except ssl.SSLWantWriteError:
            self._loop.remove_reader(self._sock_fd)
            self._loop.add_writer(self._sock_fd, self._on_handshake)
            return
        except Exception as exc:
            self._abort_handshake(exc)
            return
        except BaseException as exc:
            self._abort_handshake(exc)
            raise

        # Verify hostname if requested.
        peercert = self._sock.getpeercert()
        if (self._server_hostname is not None and
                self._sslcontext.verify_mode == ssl.CERT_REQUIRED):
            try:
                ssl.match_hostname(peercert, self._server_hostname)
            except Exception as exc:
                self._abort_handshake(exc)
                return

        # Add extra info that becomes available after handshake.
        self._extra.update(peercert=peercert,
                           cipher=self._sock.cipher(),
                           compression=self._sock.compression(),
                           )

        self._loop.remove_reader(self._sock_fd)
        self._loop.remove_writer(self._sock_fd)
        self._loop.add_reader(self._sock_fd, self._on_ready)
        self._loop.add_writer(self._sock_fd, self._on_ready)
        self._loop.call_soon(self._protocol.connection_made, self)
        if self._waiter is not None:
            self._loop.call_soon(self._waiter.set_result, None)

    def pause(self):
        """Stop reading until resume() is called."""
        # XXX This is a bit icky, given the comment at the top of
        # _on_ready().  Is it possible to evoke a deadlock?  I don't
        # know, although it doesn't look like it; write() will still
        # accept more data for the buffer and eventually the app will
        # call resume() again, and things will flow again.

        assert not self._closing, 'Cannot pause() when closing'
        assert not self._paused, 'Already paused'
        self._paused = True
        self._loop.remove_reader(self._sock_fd)

    def resume(self):
        """Undo pause(); reading only restarts if not closing."""
        assert self._paused, 'Not paused'
        self._paused = False
        if self._closing:
            return
        self._loop.add_reader(self._sock_fd, self._on_ready)

    def _on_ready(self):
        """Reader *and* writer callback: try a read, then a write.

        Once the transport is closing and the buffer is empty the writer
        is removed and connection_lost(None) is delivered.
        """
        # Because of renegotiations (?), there's no difference between
        # readable and writable.  We just try both.  XXX This may be
        # incorrect; we probably need to keep state about what we
        # should do next.

        # First try reading.
        if not self._closing and not self._paused:
            try:
                data = self._sock.recv(self.max_size)
            except (BlockingIOError, InterruptedError,
                    ssl.SSLWantReadError, ssl.SSLWantWriteError):
                pass
            except ConnectionResetError as exc:
                # The force-close schedules connection_lost(exc).  Stop
                # here (and drop the writer) so the code below does not
                # deliver a second connection_lost(None) on top of it.
                self._loop.remove_writer(self._sock_fd)
                self._force_close(exc)
                return
            except Exception as exc:
                self._loop.remove_writer(self._sock_fd)
                self._fatal_error(exc)
                return
            else:
                if data:
                    self._protocol.data_received(data)
                else:
                    try:
                        self._protocol.eof_received()
                    finally:
                        self.close()

        # Now try writing, if there's anything to write.
        if self._buffer:
            data = b''.join(self._buffer)
            self._buffer.clear()
            try:
                n = self._sock.send(data)
            except (BlockingIOError, InterruptedError,
                    ssl.SSLWantReadError, ssl.SSLWantWriteError):
                n = 0
            except (BrokenPipeError, ConnectionResetError) as exc:
                self._loop.remove_writer(self._sock_fd)
                self._force_close(exc)
                return
            except Exception as exc:
                self._loop.remove_writer(self._sock_fd)
                self._fatal_error(exc)
                return

            if n < len(data):
                self._buffer.append(data[n:])

        if self._closing and not self._buffer:
            self._loop.remove_writer(self._sock_fd)
            self._call_connection_lost(None)

    def write(self, data):
        """Buffer *data*; _on_ready() does the actual sending.

        Writes after the connection is lost are dropped; once enough of
        them have happened a warning is logged for each.
        """
        assert isinstance(data, bytes), repr(type(data))
        if not data:
            return

        if self._conn_lost:
            if self._conn_lost >= constants.LOG_THRESHOLD_FOR_CONNLOST_WRITES:
                asyncio_log.warning('socket.send() raised exception.')
            self._conn_lost += 1
            return

        self._buffer.append(data)
        # We could optimize, but the callback can do this for now.

    def can_write_eof(self):
        """Return False: write_eof() is not supported over SSL."""
        return False

    def close(self):
        """Close gracefully: stop reading, let _on_ready() finish up.

        connection_lost() is not scheduled here; _on_ready(), which stays
        registered as the writer, delivers it once the buffer is empty.
        """
        if self._closing:
            return
        self._closing = True
        self._conn_lost += 1
        self._loop.remove_reader(self._sock_fd)
686
687
class _SelectorDatagramTransport(_SelectorTransport):
    """Datagram transport over a non-blocking socket.

    When *address* is given datagrams are sent with send() and only that
    address (or None) is accepted by sendto(); otherwise each datagram
    is sent with sendto() to the address passed along with it.
    """

    def __init__(self, loop, sock, protocol, address=None, extra=None):
        """Start reading and schedule connection_made()."""
        super().__init__(loop, sock, protocol, extra)

        self._address = address
        self._loop.add_reader(self._sock_fd, self._read_ready)
        self._loop.call_soon(self._protocol.connection_made, self)

    def _read_ready(self):
        """Reader callback: receive one datagram and pass it on."""
        try:
            data, addr = self._sock.recvfrom(self.max_size)
        except (BlockingIOError, InterruptedError):
            pass
        except Exception as exc:
            self._fatal_error(exc)
        else:
            self._protocol.datagram_received(data, addr)

    def sendto(self, data, addr=None):
        """Send *data*, queueing it if the socket is not ready.

        The buffer holds (data, addr) pairs.  A refused connection is
        fatal only when the transport has a fixed address; without one
        the datagram is silently dropped.
        """
        assert isinstance(data, bytes), repr(type(data))
        if not data:
            return

        if self._address:
            assert addr in (None, self._address)

        if self._conn_lost and self._address:
            if self._conn_lost >= constants.LOG_THRESHOLD_FOR_CONNLOST_WRITES:
                asyncio_log.warning('socket.send() raised exception.')
            self._conn_lost += 1
            return

        if not self._buffer:
            # Attempt to send it right away first.
            try:
                if self._address:
                    self._sock.send(data)
                else:
                    self._sock.sendto(data, addr)
                return
            except ConnectionRefusedError as exc:
                if self._address:
                    self._fatal_error(exc)
                return
            except (BlockingIOError, InterruptedError):
                self._loop.add_writer(self._sock_fd, self._sendto_ready)
            except Exception as exc:
                self._fatal_error(exc)
                return

        self._buffer.append((data, addr))

    def _sendto_ready(self):
        """Writer callback: send queued datagrams until one would block.

        When the queue empties the writer is removed and a pending
        close() is completed.
        """
        while self._buffer:
            data, addr = self._buffer.popleft()
            try:
                if self._address:
                    self._sock.send(data)
                else:
                    self._sock.sendto(data, addr)
            except ConnectionRefusedError as exc:
                if self._address:
                    # The failed datagram may have been the last one
                    # queued, so the buffer can be empty here.  Remove
                    # the writer explicitly so it cannot outlive the
                    # socket.
                    self._loop.remove_writer(self._sock_fd)
                    self._fatal_error(exc)
                return
            except (BlockingIOError, InterruptedError):
                self._buffer.appendleft((data, addr))  # Try again later.
                break
            except Exception as exc:
                # Same reasoning as above: unregister before closing.
                self._loop.remove_writer(self._sock_fd)
                self._fatal_error(exc)
                return

        if not self._buffer:
            self._loop.remove_writer(self._sock_fd)
            if self._closing:
                self._call_connection_lost(None)

    def _force_close(self, exc):
        """Report a refused connection to the protocol, then force-close.

        connection_refused() is only called when the transport has a
        fixed address and *exc* is a ConnectionRefusedError.
        """
        if self._address and isinstance(exc, ConnectionRefusedError):
            self._protocol.connection_refused(exc)

        super()._force_close(exc)