blob: 9dbe550b01774ad9960ebd263893b4a9f65a864e [file] [log] [blame]
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07001"""Event loop using a selector and related classes.
2
3A selector is a "notify-when-ready" multiplexer. For a subclass which
4also includes support for signal handling, see the unix_events sub-module.
5"""
6
Victor Stinner8dffc452014-01-25 15:32:06 +01007__all__ = ['BaseSelectorEventLoop']
8
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07009import collections
Guido van Rossum3317a132013-11-01 14:12:50 -070010import errno
Victor Stinnerd5aeccf92014-08-31 15:07:57 +020011import functools
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070012import socket
Victor Stinner978a9af2015-01-29 17:50:58 +010013import warnings
Yury Selivanov5b8d4f92016-10-05 17:48:59 -040014import weakref
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070015try:
16 import ssl
17except ImportError: # pragma: no cover
18 ssl = None
19
20from . import base_events
Yury Selivanov2a8911c2015-08-04 15:56:33 -040021from . import compat
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070022from . import constants
23from . import events
24from . import futures
25from . import selectors
26from . import transports
Victor Stinner231b4042015-01-14 00:19:09 +010027from . import sslproto
Victor Stinner29342622015-01-29 14:15:19 +010028from .coroutines import coroutine
Guido van Rossumfc29e0f2013-10-17 15:39:45 -070029from .log import logger
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070030
31
Victor Stinnere912e652014-07-12 03:11:53 +020032def _test_selector_event(selector, fd, event):
33 # Test if the selector is monitoring 'event' events
34 # for the file descriptor 'fd'.
35 try:
36 key = selector.get_key(fd)
37 except KeyError:
38 return False
39 else:
40 return bool(key.events & event)
41
42
Yury Selivanov44c19ec2016-09-11 21:39:31 -040043if hasattr(socket, 'TCP_NODELAY'):
44 def _set_nodelay(sock):
45 if (sock.family in {socket.AF_INET, socket.AF_INET6} and
46 sock.type == socket.SOCK_STREAM and
47 sock.proto == socket.IPPROTO_TCP):
48 sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
49else:
50 def _set_nodelay(sock):
51 pass
52
53
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070054class BaseSelectorEventLoop(base_events.BaseEventLoop):
55 """Selector event loop.
56
57 See events.EventLoop for API specification.
58 """
59
60 def __init__(self, selector=None):
61 super().__init__()
62
63 if selector is None:
64 selector = selectors.DefaultSelector()
Guido van Rossumfc29e0f2013-10-17 15:39:45 -070065 logger.debug('Using selector: %s', selector.__class__.__name__)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070066 self._selector = selector
67 self._make_self_pipe()
Yury Selivanov5b8d4f92016-10-05 17:48:59 -040068 self._transports = weakref.WeakValueDictionary()
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070069
70 def _make_socket_transport(self, sock, protocol, waiter=None, *,
71 extra=None, server=None):
72 return _SelectorSocketTransport(self, sock, protocol, waiter,
73 extra, server)
74
Victor Stinner15cc6782015-01-09 00:09:10 +010075 def _make_ssl_transport(self, rawsock, protocol, sslcontext, waiter=None,
76 *, server_side=False, server_hostname=None,
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070077 extra=None, server=None):
Victor Stinner231b4042015-01-14 00:19:09 +010078 if not sslproto._is_sslproto_available():
79 return self._make_legacy_ssl_transport(
80 rawsock, protocol, sslcontext, waiter,
81 server_side=server_side, server_hostname=server_hostname,
82 extra=extra, server=server)
83
84 ssl_protocol = sslproto.SSLProtocol(self, protocol, sslcontext, waiter,
85 server_side, server_hostname)
86 _SelectorSocketTransport(self, rawsock, ssl_protocol,
87 extra=extra, server=server)
88 return ssl_protocol._app_transport
89
90 def _make_legacy_ssl_transport(self, rawsock, protocol, sslcontext,
91 waiter, *,
92 server_side=False, server_hostname=None,
93 extra=None, server=None):
94 # Use the legacy API: SSL_write, SSL_read, etc. The legacy API is used
95 # on Python 3.4 and older, when ssl.MemoryBIO is not available.
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070096 return _SelectorSslTransport(
97 self, rawsock, protocol, sslcontext, waiter,
98 server_side, server_hostname, extra, server)
99
100 def _make_datagram_transport(self, sock, protocol,
Victor Stinnerbfff45d2014-07-08 23:57:31 +0200101 address=None, waiter=None, extra=None):
102 return _SelectorDatagramTransport(self, sock, protocol,
103 address, waiter, extra)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700104
105 def close(self):
Victor Stinner956de692014-12-26 21:07:52 +0100106 if self.is_running():
Victor Stinner5e631202014-11-21 00:23:27 +0100107 raise RuntimeError("Cannot close a running event loop")
Victor Stinnerbb2fc5b2014-06-10 10:23:10 +0200108 if self.is_closed():
109 return
110 self._close_self_pipe()
Victor Stinner5e631202014-11-21 00:23:27 +0100111 super().close()
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700112 if self._selector is not None:
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700113 self._selector.close()
114 self._selector = None
115
116 def _socketpair(self):
117 raise NotImplementedError
118
119 def _close_self_pipe(self):
Yury Selivanov5b8d4f92016-10-05 17:48:59 -0400120 self._remove_reader(self._ssock.fileno())
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700121 self._ssock.close()
122 self._ssock = None
123 self._csock.close()
124 self._csock = None
125 self._internal_fds -= 1
126
127 def _make_self_pipe(self):
128 # A self-socket, really. :-)
129 self._ssock, self._csock = self._socketpair()
130 self._ssock.setblocking(False)
131 self._csock.setblocking(False)
132 self._internal_fds += 1
Yury Selivanov5b8d4f92016-10-05 17:48:59 -0400133 self._add_reader(self._ssock.fileno(), self._read_from_self)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700134
Victor Stinnerfe5649c2014-07-17 22:43:40 +0200135 def _process_self_data(self, data):
136 pass
137
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700138 def _read_from_self(self):
Victor Stinner54c4b8e2014-06-19 12:59:15 +0200139 while True:
140 try:
141 data = self._ssock.recv(4096)
142 if not data:
143 break
Victor Stinnerfe5649c2014-07-17 22:43:40 +0200144 self._process_self_data(data)
Victor Stinner54c4b8e2014-06-19 12:59:15 +0200145 except InterruptedError:
146 continue
147 except BlockingIOError:
148 break
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700149
150 def _write_to_self(self):
Guido van Rossum3d139d82014-05-06 14:42:40 -0700151 # This may be called from a different thread, possibly after
152 # _close_self_pipe() has been called or even while it is
153 # running. Guard for self._csock being None or closed. When
154 # a socket is closed, send() raises OSError (with errno set to
155 # EBADF, but let's not rely on the exact error code).
156 csock = self._csock
157 if csock is not None:
158 try:
Victor Stinnerfe5649c2014-07-17 22:43:40 +0200159 csock.send(b'\0')
Guido van Rossum3d139d82014-05-06 14:42:40 -0700160 except OSError:
Victor Stinner65dd69a2014-07-25 22:36:05 +0200161 if self._debug:
162 logger.debug("Fail to write a null byte into the "
163 "self-pipe socket",
164 exc_info=True)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700165
Guido van Rossum28dff0d2013-11-01 14:22:30 -0700166 def _start_serving(self, protocol_factory, sock,
Yury Selivanova1b0e7d2016-09-15 14:13:15 -0400167 sslcontext=None, server=None, backlog=100):
Yury Selivanov5b8d4f92016-10-05 17:48:59 -0400168 self._add_reader(sock.fileno(), self._accept_connection,
169 protocol_factory, sock, sslcontext, server, backlog)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700170
Guido van Rossum28dff0d2013-11-01 14:22:30 -0700171 def _accept_connection(self, protocol_factory, sock,
Yury Selivanova1b0e7d2016-09-15 14:13:15 -0400172 sslcontext=None, server=None, backlog=100):
173 # This method is only called once for each event loop tick where the
174 # listening socket has triggered an EVENT_READ. There may be multiple
175 # connections waiting for an .accept() so it is called in a loop.
176 # See https://bugs.python.org/issue27906 for more details.
177 for _ in range(backlog):
178 try:
179 conn, addr = sock.accept()
180 if self._debug:
181 logger.debug("%r got a new connection from %r: %r",
182 server, addr, conn)
183 conn.setblocking(False)
184 except (BlockingIOError, InterruptedError, ConnectionAbortedError):
185 # Early exit because the socket accept buffer is empty.
186 return None
187 except OSError as exc:
188 # There's nowhere to send the error, so just log it.
189 if exc.errno in (errno.EMFILE, errno.ENFILE,
190 errno.ENOBUFS, errno.ENOMEM):
191 # Some platforms (e.g. Linux keep reporting the FD as
192 # ready, so we remove the read handler temporarily.
193 # We'll try again in a while.
194 self.call_exception_handler({
195 'message': 'socket.accept() out of system resource',
196 'exception': exc,
197 'socket': sock,
198 })
Yury Selivanov5b8d4f92016-10-05 17:48:59 -0400199 self._remove_reader(sock.fileno())
Yury Selivanova1b0e7d2016-09-15 14:13:15 -0400200 self.call_later(constants.ACCEPT_RETRY_DELAY,
201 self._start_serving,
202 protocol_factory, sock, sslcontext, server,
203 backlog)
204 else:
205 raise # The event loop will catch, log and ignore it.
Guido van Rossum3317a132013-11-01 14:12:50 -0700206 else:
Yury Selivanova1b0e7d2016-09-15 14:13:15 -0400207 extra = {'peername': addr}
208 accept = self._accept_connection2(protocol_factory, conn, extra,
209 sslcontext, server)
210 self.create_task(accept)
Victor Stinner29342622015-01-29 14:15:19 +0100211
212 @coroutine
213 def _accept_connection2(self, protocol_factory, conn, extra,
214 sslcontext=None, server=None):
215 protocol = None
216 transport = None
217 try:
Victor Stinner29ad0112015-01-15 00:04:21 +0100218 protocol = protocol_factory()
Yury Selivanov7661db62016-05-16 15:38:39 -0400219 waiter = self.create_future()
Guido van Rossum28dff0d2013-11-01 14:22:30 -0700220 if sslcontext:
Victor Stinner29342622015-01-29 14:15:19 +0100221 transport = self._make_ssl_transport(
222 conn, protocol, sslcontext, waiter=waiter,
223 server_side=True, extra=extra, server=server)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700224 else:
Victor Stinner29342622015-01-29 14:15:19 +0100225 transport = self._make_socket_transport(
226 conn, protocol, waiter=waiter, extra=extra,
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700227 server=server)
Victor Stinner29342622015-01-29 14:15:19 +0100228
229 try:
230 yield from waiter
231 except:
232 transport.close()
233 raise
234
235 # It's now up to the protocol to handle the connection.
236 except Exception as exc:
Victor Stinneraa41b9b2015-02-04 14:50:59 +0100237 if self._debug:
Victor Stinner29342622015-01-29 14:15:19 +0100238 context = {
239 'message': ('Error on transport creation '
240 'for incoming connection'),
241 'exception': exc,
242 }
243 if protocol is not None:
244 context['protocol'] = protocol
245 if transport is not None:
246 context['transport'] = transport
247 self.call_exception_handler(context)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700248
Yury Selivanov5b8d4f92016-10-05 17:48:59 -0400249 def _ensure_fd_no_transport(self, fd):
250 try:
251 transport = self._transports[fd]
252 except KeyError:
253 pass
254 else:
255 if not transport.is_closing():
256 raise RuntimeError(
257 'File descriptor {!r} is used by transport {!r}'.format(
258 fd, transport))
259
260 def _add_reader(self, fd, callback, *args):
Victor Stinnerbb2fc5b2014-06-10 10:23:10 +0200261 self._check_closed()
Yury Selivanovff827f02014-02-18 18:02:19 -0500262 handle = events.Handle(callback, args, self)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700263 try:
264 key = self._selector.get_key(fd)
265 except KeyError:
266 self._selector.register(fd, selectors.EVENT_READ,
267 (handle, None))
268 else:
269 mask, (reader, writer) = key.events, key.data
270 self._selector.modify(fd, mask | selectors.EVENT_READ,
271 (handle, writer))
272 if reader is not None:
273 reader.cancel()
274
Yury Selivanov5b8d4f92016-10-05 17:48:59 -0400275 def _remove_reader(self, fd):
Victor Stinnerbb2fc5b2014-06-10 10:23:10 +0200276 if self.is_closed():
Victor Stinnereeeebcd2014-03-06 00:52:53 +0100277 return False
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700278 try:
279 key = self._selector.get_key(fd)
280 except KeyError:
281 return False
282 else:
283 mask, (reader, writer) = key.events, key.data
284 mask &= ~selectors.EVENT_READ
285 if not mask:
286 self._selector.unregister(fd)
287 else:
288 self._selector.modify(fd, mask, (None, writer))
289
290 if reader is not None:
291 reader.cancel()
292 return True
293 else:
294 return False
295
Yury Selivanov5b8d4f92016-10-05 17:48:59 -0400296 def _add_writer(self, fd, callback, *args):
Victor Stinnerbb2fc5b2014-06-10 10:23:10 +0200297 self._check_closed()
Yury Selivanovff827f02014-02-18 18:02:19 -0500298 handle = events.Handle(callback, args, self)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700299 try:
300 key = self._selector.get_key(fd)
301 except KeyError:
302 self._selector.register(fd, selectors.EVENT_WRITE,
303 (None, handle))
304 else:
305 mask, (reader, writer) = key.events, key.data
306 self._selector.modify(fd, mask | selectors.EVENT_WRITE,
307 (reader, handle))
308 if writer is not None:
309 writer.cancel()
310
Yury Selivanov5b8d4f92016-10-05 17:48:59 -0400311 def _remove_writer(self, fd):
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700312 """Remove a writer callback."""
Victor Stinnerbb2fc5b2014-06-10 10:23:10 +0200313 if self.is_closed():
Victor Stinnereeeebcd2014-03-06 00:52:53 +0100314 return False
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700315 try:
316 key = self._selector.get_key(fd)
317 except KeyError:
318 return False
319 else:
320 mask, (reader, writer) = key.events, key.data
321 # Remove both writer and connector.
322 mask &= ~selectors.EVENT_WRITE
323 if not mask:
324 self._selector.unregister(fd)
325 else:
326 self._selector.modify(fd, mask, (reader, None))
327
328 if writer is not None:
329 writer.cancel()
330 return True
331 else:
332 return False
333
Yury Selivanov5b8d4f92016-10-05 17:48:59 -0400334 def add_reader(self, fd, callback, *args):
335 """Add a reader callback."""
336 self._ensure_fd_no_transport(fd)
337 return self._add_reader(fd, callback, *args)
338
339 def remove_reader(self, fd):
340 """Remove a reader callback."""
341 self._ensure_fd_no_transport(fd)
342 return self._remove_reader(fd)
343
344 def add_writer(self, fd, callback, *args):
345 """Add a writer callback.."""
346 self._ensure_fd_no_transport(fd)
347 return self._add_writer(fd, callback, *args)
348
349 def remove_writer(self, fd):
350 """Remove a writer callback."""
351 self._ensure_fd_no_transport(fd)
352 return self._remove_writer(fd)
353
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700354 def sock_recv(self, sock, n):
Victor Stinnerd1432092014-06-19 17:11:49 +0200355 """Receive data from the socket.
356
357 The return value is a bytes object representing the data received.
358 The maximum amount of data to be received at once is specified by
359 nbytes.
360
361 This method is a coroutine.
362 """
Victor Stinneraa41b9b2015-02-04 14:50:59 +0100363 if self._debug and sock.gettimeout() != 0:
Victor Stinner9c9f1f12014-07-29 23:08:17 +0200364 raise ValueError("the socket must be non-blocking")
Yury Selivanov7661db62016-05-16 15:38:39 -0400365 fut = self.create_future()
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700366 self._sock_recv(fut, False, sock, n)
367 return fut
368
369 def _sock_recv(self, fut, registered, sock, n):
Victor Stinner28773462014-02-13 09:24:37 +0100370 # _sock_recv() can add itself as an I/O callback if the operation can't
Yury Selivanovb0b0e622014-02-18 22:27:48 -0500371 # be done immediately. Don't use it directly, call sock_recv().
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700372 fd = sock.fileno()
373 if registered:
374 # Remove the callback early. It should be rare that the
375 # selector says the fd is ready but the call still returns
376 # EAGAIN, and I am willing to take a hit in that case in
377 # order to simplify the common case.
378 self.remove_reader(fd)
379 if fut.cancelled():
380 return
381 try:
382 data = sock.recv(n)
383 except (BlockingIOError, InterruptedError):
384 self.add_reader(fd, self._sock_recv, fut, True, sock, n)
385 except Exception as exc:
386 fut.set_exception(exc)
387 else:
388 fut.set_result(data)
389
390 def sock_sendall(self, sock, data):
Victor Stinnerd1432092014-06-19 17:11:49 +0200391 """Send data to the socket.
392
393 The socket must be connected to a remote socket. This method continues
394 to send data from data until either all data has been sent or an
395 error occurs. None is returned on success. On error, an exception is
396 raised, and there is no way to determine how much data, if any, was
397 successfully processed by the receiving end of the connection.
398
399 This method is a coroutine.
400 """
Victor Stinneraa41b9b2015-02-04 14:50:59 +0100401 if self._debug and sock.gettimeout() != 0:
Victor Stinner9c9f1f12014-07-29 23:08:17 +0200402 raise ValueError("the socket must be non-blocking")
Yury Selivanov7661db62016-05-16 15:38:39 -0400403 fut = self.create_future()
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700404 if data:
405 self._sock_sendall(fut, False, sock, data)
406 else:
407 fut.set_result(None)
408 return fut
409
410 def _sock_sendall(self, fut, registered, sock, data):
411 fd = sock.fileno()
412
413 if registered:
414 self.remove_writer(fd)
415 if fut.cancelled():
416 return
417
418 try:
419 n = sock.send(data)
420 except (BlockingIOError, InterruptedError):
421 n = 0
422 except Exception as exc:
423 fut.set_exception(exc)
424 return
425
426 if n == len(data):
427 fut.set_result(None)
428 else:
429 if n:
430 data = data[n:]
431 self.add_writer(fd, self._sock_sendall, fut, True, sock, data)
432
Yury Selivanovd6c67712016-09-15 17:56:36 -0400433 @coroutine
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700434 def sock_connect(self, sock, address):
Victor Stinnerd1432092014-06-19 17:11:49 +0200435 """Connect to a remote socket at address.
436
Victor Stinnerd1432092014-06-19 17:11:49 +0200437 This method is a coroutine.
438 """
Victor Stinneraa41b9b2015-02-04 14:50:59 +0100439 if self._debug and sock.gettimeout() != 0:
Victor Stinner9c9f1f12014-07-29 23:08:17 +0200440 raise ValueError("the socket must be non-blocking")
Yury Selivanovf1c6fa92016-06-08 12:33:31 -0400441
Yury Selivanovd6c67712016-09-15 17:56:36 -0400442 if not hasattr(socket, 'AF_UNIX') or sock.family != socket.AF_UNIX:
Yury Selivanov63bf4872016-06-28 11:00:22 -0400443 resolved = base_events._ensure_resolved(
444 address, family=sock.family, proto=sock.proto, loop=self)
Yury Selivanovd6c67712016-09-15 17:56:36 -0400445 if not resolved.done():
446 yield from resolved
Yury Selivanovf1c6fa92016-06-08 12:33:31 -0400447 _, _, _, _, address = resolved.result()[0]
Yury Selivanovd6c67712016-09-15 17:56:36 -0400448
449 fut = self.create_future()
450 self._sock_connect(fut, sock, address)
451 return (yield from fut)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700452
Victor Stinnerd5aeccf92014-08-31 15:07:57 +0200453 def _sock_connect(self, fut, sock, address):
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700454 fd = sock.fileno()
Victor Stinnerd5aeccf92014-08-31 15:07:57 +0200455 try:
Victor Stinnerc9d11c32015-04-07 21:38:04 +0200456 sock.connect(address)
457 except (BlockingIOError, InterruptedError):
458 # Issue #23618: When the C function connect() fails with EINTR, the
459 # connection runs in background. We have to wait until the socket
460 # becomes writable to be notified when the connection succeed or
461 # fails.
Yury Selivanovd6c67712016-09-15 17:56:36 -0400462 fut.add_done_callback(
463 functools.partial(self._sock_connect_done, fd))
Victor Stinnerd5aeccf92014-08-31 15:07:57 +0200464 self.add_writer(fd, self._sock_connect_cb, fut, sock, address)
465 except Exception as exc:
466 fut.set_exception(exc)
467 else:
468 fut.set_result(None)
469
Victor Stinner3531d902015-01-09 01:42:52 +0100470 def _sock_connect_done(self, fd, fut):
471 self.remove_writer(fd)
Victor Stinnerd5aeccf92014-08-31 15:07:57 +0200472
473 def _sock_connect_cb(self, fut, sock, address):
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700474 if fut.cancelled():
475 return
Victor Stinnerd5aeccf92014-08-31 15:07:57 +0200476
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700477 try:
Victor Stinnerd5aeccf92014-08-31 15:07:57 +0200478 err = sock.getsockopt(socket.SOL_SOCKET, socket.SO_ERROR)
479 if err != 0:
480 # Jump to any except clause below.
481 raise OSError(err, 'Connect call failed %s' % (address,))
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700482 except (BlockingIOError, InterruptedError):
Victor Stinnerd5aeccf92014-08-31 15:07:57 +0200483 # socket is still registered, the callback will be retried later
484 pass
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700485 except Exception as exc:
486 fut.set_exception(exc)
487 else:
488 fut.set_result(None)
489
490 def sock_accept(self, sock):
Victor Stinnerd1432092014-06-19 17:11:49 +0200491 """Accept a connection.
492
493 The socket must be bound to an address and listening for connections.
494 The return value is a pair (conn, address) where conn is a new socket
495 object usable to send and receive data on the connection, and address
496 is the address bound to the socket on the other end of the connection.
497
498 This method is a coroutine.
499 """
Victor Stinneraa41b9b2015-02-04 14:50:59 +0100500 if self._debug and sock.gettimeout() != 0:
Victor Stinner9c9f1f12014-07-29 23:08:17 +0200501 raise ValueError("the socket must be non-blocking")
Yury Selivanov7661db62016-05-16 15:38:39 -0400502 fut = self.create_future()
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700503 self._sock_accept(fut, False, sock)
504 return fut
505
506 def _sock_accept(self, fut, registered, sock):
507 fd = sock.fileno()
508 if registered:
509 self.remove_reader(fd)
510 if fut.cancelled():
511 return
512 try:
513 conn, address = sock.accept()
514 conn.setblocking(False)
515 except (BlockingIOError, InterruptedError):
516 self.add_reader(fd, self._sock_accept, fut, True, sock)
517 except Exception as exc:
518 fut.set_exception(exc)
519 else:
520 fut.set_result((conn, address))
521
522 def _process_events(self, event_list):
523 for key, mask in event_list:
524 fileobj, (reader, writer) = key.fileobj, key.data
525 if mask & selectors.EVENT_READ and reader is not None:
526 if reader._cancelled:
Yury Selivanov5b8d4f92016-10-05 17:48:59 -0400527 self._remove_reader(fileobj)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700528 else:
529 self._add_callback(reader)
530 if mask & selectors.EVENT_WRITE and writer is not None:
531 if writer._cancelled:
Yury Selivanov5b8d4f92016-10-05 17:48:59 -0400532 self._remove_writer(fileobj)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700533 else:
534 self._add_callback(writer)
535
536 def _stop_serving(self, sock):
Yury Selivanov5b8d4f92016-10-05 17:48:59 -0400537 self._remove_reader(sock.fileno())
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700538 sock.close()
539
540
Yury Selivanovc0982412014-02-18 18:41:13 -0500541class _SelectorTransport(transports._FlowControlMixin,
542 transports.Transport):
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700543
544 max_size = 256 * 1024 # Buffer size passed to recv().
545
Guido van Rossuma5062c52013-11-27 14:12:48 -0800546 _buffer_factory = bytearray # Constructs initial value for self._buffer.
547
Victor Stinner978a9af2015-01-29 17:50:58 +0100548 # Attribute used in the destructor: it must be set even if the constructor
549 # is not called (see _SelectorSslTransport which may start by raising an
550 # exception)
551 _sock = None
552
Victor Stinner47bbea72015-01-29 02:56:05 +0100553 def __init__(self, loop, sock, protocol, extra=None, server=None):
Victor Stinner004adb92014-11-05 15:27:41 +0100554 super().__init__(extra, loop)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700555 self._extra['socket'] = sock
556 self._extra['sockname'] = sock.getsockname()
557 if 'peername' not in self._extra:
558 try:
559 self._extra['peername'] = sock.getpeername()
560 except socket.error:
561 self._extra['peername'] = None
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700562 self._sock = sock
563 self._sock_fd = sock.fileno()
564 self._protocol = protocol
Victor Stinner47bbea72015-01-29 02:56:05 +0100565 self._protocol_connected = True
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700566 self._server = server
Guido van Rossuma5062c52013-11-27 14:12:48 -0800567 self._buffer = self._buffer_factory()
Guido van Rossum2546a172013-10-18 10:10:36 -0700568 self._conn_lost = 0 # Set when call to connection_lost scheduled.
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700569 self._closing = False # Set when close() called.
Guido van Rossum355491d2013-10-18 15:17:11 -0700570 if self._server is not None:
Victor Stinnerb28dbac2014-07-11 22:52:21 +0200571 self._server._attach()
Yury Selivanov5b8d4f92016-10-05 17:48:59 -0400572 loop._transports[self._sock_fd] = self
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700573
Victor Stinnere912e652014-07-12 03:11:53 +0200574 def __repr__(self):
Victor Stinner0e34dc32014-10-12 09:52:11 +0200575 info = [self.__class__.__name__]
576 if self._sock is None:
577 info.append('closed')
578 elif self._closing:
579 info.append('closing')
580 info.append('fd=%s' % self._sock_fd)
Victor Stinnerb2614752014-08-25 23:20:52 +0200581 # test if the transport was closed
Victor Stinner79fd9622015-03-27 15:20:08 +0100582 if self._loop is not None and not self._loop.is_closed():
Victor Stinnerb2614752014-08-25 23:20:52 +0200583 polling = _test_selector_event(self._loop._selector,
584 self._sock_fd, selectors.EVENT_READ)
585 if polling:
586 info.append('read=polling')
587 else:
588 info.append('read=idle')
Victor Stinnere912e652014-07-12 03:11:53 +0200589
Victor Stinnerb2614752014-08-25 23:20:52 +0200590 polling = _test_selector_event(self._loop._selector,
Victor Stinner15cc6782015-01-09 00:09:10 +0100591 self._sock_fd,
592 selectors.EVENT_WRITE)
Victor Stinnerb2614752014-08-25 23:20:52 +0200593 if polling:
594 state = 'polling'
595 else:
596 state = 'idle'
Victor Stinnere912e652014-07-12 03:11:53 +0200597
Victor Stinnerb2614752014-08-25 23:20:52 +0200598 bufsize = self.get_write_buffer_size()
599 info.append('write=<%s, bufsize=%s>' % (state, bufsize))
Victor Stinnere912e652014-07-12 03:11:53 +0200600 return '<%s>' % ' '.join(info)
601
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700602 def abort(self):
603 self._force_close(None)
604
Yury Selivanova05a6ef2016-09-11 21:11:02 -0400605 def set_protocol(self, protocol):
606 self._protocol = protocol
607
608 def get_protocol(self):
609 return self._protocol
610
Yury Selivanov5bb1afb2015-11-16 12:43:21 -0500611 def is_closing(self):
612 return self._closing
613
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700614 def close(self):
615 if self._closing:
616 return
617 self._closing = True
Yury Selivanov5b8d4f92016-10-05 17:48:59 -0400618 self._loop._remove_reader(self._sock_fd)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700619 if not self._buffer:
Guido van Rossum2546a172013-10-18 10:10:36 -0700620 self._conn_lost += 1
Yury Selivanov5b8d4f92016-10-05 17:48:59 -0400621 self._loop._remove_writer(self._sock_fd)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700622 self._loop.call_soon(self._call_connection_lost, None)
623
Victor Stinner978a9af2015-01-29 17:50:58 +0100624 # On Python 3.3 and older, objects with a destructor part of a reference
625 # cycle are never destroyed. It's not more the case on Python 3.4 thanks
626 # to the PEP 442.
Yury Selivanov2a8911c2015-08-04 15:56:33 -0400627 if compat.PY34:
Victor Stinner978a9af2015-01-29 17:50:58 +0100628 def __del__(self):
629 if self._sock is not None:
Victor Stinnere19558a2016-03-23 00:28:08 +0100630 warnings.warn("unclosed transport %r" % self, ResourceWarning,
631 source=self)
Victor Stinner978a9af2015-01-29 17:50:58 +0100632 self._sock.close()
633
Victor Stinner065ca252014-02-19 01:40:41 +0100634 def _fatal_error(self, exc, message='Fatal error on transport'):
Guido van Rossum2546a172013-10-18 10:10:36 -0700635 # Should be called from exception handler only.
Victor Stinnerc94a93a2016-04-01 21:43:39 +0200636 if isinstance(exc, base_events._FATAL_ERROR_IGNORE):
Victor Stinnere912e652014-07-12 03:11:53 +0200637 if self._loop.get_debug():
638 logger.debug("%r: %s", self, message, exc_info=True)
639 else:
Yury Selivanovff827f02014-02-18 18:02:19 -0500640 self._loop.call_exception_handler({
Victor Stinner065ca252014-02-19 01:40:41 +0100641 'message': message,
Yury Selivanovff827f02014-02-18 18:02:19 -0500642 'exception': exc,
643 'transport': self,
644 'protocol': self._protocol,
645 })
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700646 self._force_close(exc)
647
648 def _force_close(self, exc):
Guido van Rossum2546a172013-10-18 10:10:36 -0700649 if self._conn_lost:
650 return
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700651 if self._buffer:
652 self._buffer.clear()
Yury Selivanov5b8d4f92016-10-05 17:48:59 -0400653 self._loop._remove_writer(self._sock_fd)
Guido van Rossum2546a172013-10-18 10:10:36 -0700654 if not self._closing:
655 self._closing = True
Yury Selivanov5b8d4f92016-10-05 17:48:59 -0400656 self._loop._remove_reader(self._sock_fd)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700657 self._conn_lost += 1
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700658 self._loop.call_soon(self._call_connection_lost, exc)
659
660 def _call_connection_lost(self, exc):
661 try:
Victor Stinner47bbea72015-01-29 02:56:05 +0100662 if self._protocol_connected:
663 self._protocol.connection_lost(exc)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700664 finally:
665 self._sock.close()
666 self._sock = None
667 self._protocol = None
668 self._loop = None
669 server = self._server
670 if server is not None:
Victor Stinnerb28dbac2014-07-11 22:52:21 +0200671 server._detach()
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700672 self._server = None
673
Guido van Rossum355491d2013-10-18 15:17:11 -0700674 def get_write_buffer_size(self):
Guido van Rossuma5062c52013-11-27 14:12:48 -0800675 return len(self._buffer)
Guido van Rossum355491d2013-10-18 15:17:11 -0700676
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700677
678class _SelectorSocketTransport(_SelectorTransport):
679
680 def __init__(self, loop, sock, protocol, waiter=None,
681 extra=None, server=None):
682 super().__init__(loop, sock, protocol, extra, server)
683 self._eof = False
684 self._paused = False
685
Yury Selivanov44c19ec2016-09-11 21:39:31 -0400686 # Disable the Nagle algorithm -- small writes will be
687 # sent without waiting for the TCP ACK. This generally
688 # decreases the latency (in some cases significantly.)
689 _set_nodelay(self._sock)
690
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700691 self._loop.call_soon(self._protocol.connection_made, self)
Victor Stinnerfa737792015-01-29 00:36:51 +0100692 # only start reading when connection_made() has been called
Yury Selivanov5b8d4f92016-10-05 17:48:59 -0400693 self._loop.call_soon(self._loop._add_reader,
Victor Stinnerfa737792015-01-29 00:36:51 +0100694 self._sock_fd, self._read_ready)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700695 if waiter is not None:
Victor Stinnerf07801b2015-01-29 00:36:35 +0100696 # only wake up the waiter when connection_made() has been called
Yury Selivanov5d7e3b62015-11-17 12:19:41 -0500697 self._loop.call_soon(futures._set_result_unless_cancelled,
698 waiter, None)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700699
Guido van Rossum57497ad2013-10-18 07:58:20 -0700700 def pause_reading(self):
Guido van Rossuma5062c52013-11-27 14:12:48 -0800701 if self._closing:
702 raise RuntimeError('Cannot pause_reading() when closing')
703 if self._paused:
704 raise RuntimeError('Already paused')
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700705 self._paused = True
Yury Selivanov5b8d4f92016-10-05 17:48:59 -0400706 self._loop._remove_reader(self._sock_fd)
Victor Stinnere912e652014-07-12 03:11:53 +0200707 if self._loop.get_debug():
708 logger.debug("%r pauses reading", self)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700709
Guido van Rossum57497ad2013-10-18 07:58:20 -0700710 def resume_reading(self):
Guido van Rossuma5062c52013-11-27 14:12:48 -0800711 if not self._paused:
712 raise RuntimeError('Not paused')
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700713 self._paused = False
714 if self._closing:
715 return
Yury Selivanov5b8d4f92016-10-05 17:48:59 -0400716 self._loop._add_reader(self._sock_fd, self._read_ready)
Victor Stinnere912e652014-07-12 03:11:53 +0200717 if self._loop.get_debug():
718 logger.debug("%r resumes reading", self)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700719
720 def _read_ready(self):
Yury Selivanovca2e0a42016-06-11 11:19:47 -0400721 if self._conn_lost:
722 return
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700723 try:
724 data = self._sock.recv(self.max_size)
725 except (BlockingIOError, InterruptedError):
726 pass
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700727 except Exception as exc:
Victor Stinner065ca252014-02-19 01:40:41 +0100728 self._fatal_error(exc, 'Fatal read error on socket transport')
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700729 else:
730 if data:
731 self._protocol.data_received(data)
732 else:
Victor Stinnere912e652014-07-12 03:11:53 +0200733 if self._loop.get_debug():
734 logger.debug("%r received EOF", self)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700735 keep_open = self._protocol.eof_received()
Guido van Rossum1f683bb2013-10-30 14:36:58 -0700736 if keep_open:
737 # We're keeping the connection open so the
738 # protocol can write more, but we still can't
739 # receive more, so remove the reader callback.
Yury Selivanov5b8d4f92016-10-05 17:48:59 -0400740 self._loop._remove_reader(self._sock_fd)
Guido van Rossum1f683bb2013-10-30 14:36:58 -0700741 else:
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700742 self.close()
743
744 def write(self, data):
Guido van Rossuma5062c52013-11-27 14:12:48 -0800745 if not isinstance(data, (bytes, bytearray, memoryview)):
Victor Stinner3e723092016-02-01 12:46:38 +0100746 raise TypeError('data argument must be a bytes-like object, '
747 'not %r' % type(data).__name__)
Guido van Rossuma5062c52013-11-27 14:12:48 -0800748 if self._eof:
749 raise RuntimeError('Cannot call write() after write_eof()')
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700750 if not data:
751 return
752
753 if self._conn_lost:
754 if self._conn_lost >= constants.LOG_THRESHOLD_FOR_CONNLOST_WRITES:
Guido van Rossumfc29e0f2013-10-17 15:39:45 -0700755 logger.warning('socket.send() raised exception.')
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700756 self._conn_lost += 1
757 return
758
759 if not self._buffer:
Guido van Rossum355491d2013-10-18 15:17:11 -0700760 # Optimization: try to send now.
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700761 try:
762 n = self._sock.send(data)
763 except (BlockingIOError, InterruptedError):
Guido van Rossum2546a172013-10-18 10:10:36 -0700764 pass
765 except Exception as exc:
Victor Stinner065ca252014-02-19 01:40:41 +0100766 self._fatal_error(exc, 'Fatal write error on socket transport')
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700767 return
768 else:
769 data = data[n:]
770 if not data:
771 return
Guido van Rossum355491d2013-10-18 15:17:11 -0700772 # Not all was written; register write handler.
Yury Selivanov5b8d4f92016-10-05 17:48:59 -0400773 self._loop._add_writer(self._sock_fd, self._write_ready)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700774
Guido van Rossum355491d2013-10-18 15:17:11 -0700775 # Add it to the buffer.
Guido van Rossuma5062c52013-11-27 14:12:48 -0800776 self._buffer.extend(data)
Guido van Rossum355491d2013-10-18 15:17:11 -0700777 self._maybe_pause_protocol()
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700778
779 def _write_ready(self):
Guido van Rossuma5062c52013-11-27 14:12:48 -0800780 assert self._buffer, 'Data should not be empty'
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700781
Yury Selivanovca2e0a42016-06-11 11:19:47 -0400782 if self._conn_lost:
783 return
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700784 try:
Guido van Rossuma5062c52013-11-27 14:12:48 -0800785 n = self._sock.send(self._buffer)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700786 except (BlockingIOError, InterruptedError):
Guido van Rossuma5062c52013-11-27 14:12:48 -0800787 pass
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700788 except Exception as exc:
Yury Selivanov5b8d4f92016-10-05 17:48:59 -0400789 self._loop._remove_writer(self._sock_fd)
Guido van Rossuma5062c52013-11-27 14:12:48 -0800790 self._buffer.clear()
Victor Stinner065ca252014-02-19 01:40:41 +0100791 self._fatal_error(exc, 'Fatal write error on socket transport')
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700792 else:
Guido van Rossuma5062c52013-11-27 14:12:48 -0800793 if n:
794 del self._buffer[:n]
Guido van Rossum355491d2013-10-18 15:17:11 -0700795 self._maybe_resume_protocol() # May append to buffer.
796 if not self._buffer:
Yury Selivanov5b8d4f92016-10-05 17:48:59 -0400797 self._loop._remove_writer(self._sock_fd)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700798 if self._closing:
799 self._call_connection_lost(None)
800 elif self._eof:
801 self._sock.shutdown(socket.SHUT_WR)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700802
803 def write_eof(self):
804 if self._eof:
805 return
806 self._eof = True
807 if not self._buffer:
808 self._sock.shutdown(socket.SHUT_WR)
809
810 def can_write_eof(self):
811 return True
812
813
814class _SelectorSslTransport(_SelectorTransport):
815
Guido van Rossuma5062c52013-11-27 14:12:48 -0800816 _buffer_factory = bytearray
817
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700818 def __init__(self, loop, rawsock, protocol, sslcontext, waiter=None,
819 server_side=False, server_hostname=None,
820 extra=None, server=None):
Guido van Rossum28dff0d2013-11-01 14:22:30 -0700821 if ssl is None:
822 raise RuntimeError('stdlib ssl module not available')
823
Victor Stinner231b4042015-01-14 00:19:09 +0100824 if not sslcontext:
825 sslcontext = sslproto._create_transport_context(server_side, server_hostname)
Guido van Rossum28dff0d2013-11-01 14:22:30 -0700826
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700827 wrap_kwargs = {
828 'server_side': server_side,
829 'do_handshake_on_connect': False,
830 }
Benjamin Peterson7243b572014-11-23 17:04:34 -0600831 if server_hostname and not server_side:
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700832 wrap_kwargs['server_hostname'] = server_hostname
833 sslsock = sslcontext.wrap_socket(rawsock, **wrap_kwargs)
834
835 super().__init__(loop, sslsock, protocol, extra, server)
Victor Stinner47bbea72015-01-29 02:56:05 +0100836 # the protocol connection is only made after the SSL handshake
837 self._protocol_connected = False
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700838
839 self._server_hostname = server_hostname
840 self._waiter = waiter
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700841 self._sslcontext = sslcontext
842 self._paused = False
843
844 # SSL-specific extra info. (peercert is set later)
845 self._extra.update(sslcontext=sslcontext)
846
Victor Stinnere912e652014-07-12 03:11:53 +0200847 if self._loop.get_debug():
848 logger.debug("%r starts SSL handshake", self)
849 start_time = self._loop.time()
850 else:
851 start_time = None
852 self._on_handshake(start_time)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700853
Victor Stinnerf07801b2015-01-29 00:36:35 +0100854 def _wakeup_waiter(self, exc=None):
855 if self._waiter is None:
856 return
857 if not self._waiter.cancelled():
858 if exc is not None:
859 self._waiter.set_exception(exc)
860 else:
861 self._waiter.set_result(None)
862 self._waiter = None
863
Victor Stinnere912e652014-07-12 03:11:53 +0200864 def _on_handshake(self, start_time):
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700865 try:
866 self._sock.do_handshake()
867 except ssl.SSLWantReadError:
Yury Selivanov5b8d4f92016-10-05 17:48:59 -0400868 self._loop._add_reader(self._sock_fd,
869 self._on_handshake, start_time)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700870 return
871 except ssl.SSLWantWriteError:
Yury Selivanov5b8d4f92016-10-05 17:48:59 -0400872 self._loop._add_writer(self._sock_fd,
873 self._on_handshake, start_time)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700874 return
875 except BaseException as exc:
Victor Stinnere912e652014-07-12 03:11:53 +0200876 if self._loop.get_debug():
877 logger.warning("%r: SSL handshake failed",
878 self, exc_info=True)
Yury Selivanov5b8d4f92016-10-05 17:48:59 -0400879 self._loop._remove_reader(self._sock_fd)
880 self._loop._remove_writer(self._sock_fd)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700881 self._sock.close()
Victor Stinnerf07801b2015-01-29 00:36:35 +0100882 self._wakeup_waiter(exc)
Victor Stinnere912e652014-07-12 03:11:53 +0200883 if isinstance(exc, Exception):
884 return
885 else:
886 raise
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700887
Yury Selivanov5b8d4f92016-10-05 17:48:59 -0400888 self._loop._remove_reader(self._sock_fd)
889 self._loop._remove_writer(self._sock_fd)
Guido van Rossum355491d2013-10-18 15:17:11 -0700890
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700891 peercert = self._sock.getpeercert()
Christian Heimes6d8c1ab2013-12-06 00:23:13 +0100892 if not hasattr(self._sslcontext, 'check_hostname'):
893 # Verify hostname if requested, Python 3.4+ uses check_hostname
894 # and checks the hostname in do_handshake()
895 if (self._server_hostname and
896 self._sslcontext.verify_mode != ssl.CERT_NONE):
897 try:
898 ssl.match_hostname(peercert, self._server_hostname)
899 except Exception as exc:
Victor Stinnere912e652014-07-12 03:11:53 +0200900 if self._loop.get_debug():
901 logger.warning("%r: SSL handshake failed "
902 "on matching the hostname",
903 self, exc_info=True)
Christian Heimes6d8c1ab2013-12-06 00:23:13 +0100904 self._sock.close()
Victor Stinnerf07801b2015-01-29 00:36:35 +0100905 self._wakeup_waiter(exc)
Christian Heimes6d8c1ab2013-12-06 00:23:13 +0100906 return
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700907
908 # Add extra info that becomes available after handshake.
909 self._extra.update(peercert=peercert,
910 cipher=self._sock.cipher(),
911 compression=self._sock.compression(),
Victor Stinnerf7dc7fb2015-09-21 18:06:17 +0200912 ssl_object=self._sock,
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700913 )
914
Guido van Rossum2b570162013-11-01 14:18:02 -0700915 self._read_wants_write = False
916 self._write_wants_read = False
Yury Selivanov5b8d4f92016-10-05 17:48:59 -0400917 self._loop._add_reader(self._sock_fd, self._read_ready)
Victor Stinner47bbea72015-01-29 02:56:05 +0100918 self._protocol_connected = True
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700919 self._loop.call_soon(self._protocol.connection_made, self)
Victor Stinnerf07801b2015-01-29 00:36:35 +0100920 # only wake up the waiter when connection_made() has been called
921 self._loop.call_soon(self._wakeup_waiter)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700922
Victor Stinnere912e652014-07-12 03:11:53 +0200923 if self._loop.get_debug():
924 dt = self._loop.time() - start_time
925 logger.debug("%r: SSL handshake took %.1f ms", self, dt * 1e3)
926
Guido van Rossum57497ad2013-10-18 07:58:20 -0700927 def pause_reading(self):
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700928 # XXX This is a bit icky, given the comment at the top of
Guido van Rossum2b570162013-11-01 14:18:02 -0700929 # _read_ready(). Is it possible to evoke a deadlock? I don't
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700930 # know, although it doesn't look like it; write() will still
931 # accept more data for the buffer and eventually the app will
Guido van Rossum57497ad2013-10-18 07:58:20 -0700932 # call resume_reading() again, and things will flow again.
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700933
Guido van Rossuma5062c52013-11-27 14:12:48 -0800934 if self._closing:
935 raise RuntimeError('Cannot pause_reading() when closing')
936 if self._paused:
937 raise RuntimeError('Already paused')
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700938 self._paused = True
Yury Selivanov5b8d4f92016-10-05 17:48:59 -0400939 self._loop._remove_reader(self._sock_fd)
Victor Stinnere912e652014-07-12 03:11:53 +0200940 if self._loop.get_debug():
941 logger.debug("%r pauses reading", self)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700942
Guido van Rossum57497ad2013-10-18 07:58:20 -0700943 def resume_reading(self):
Guido van Rossuma5062c52013-11-27 14:12:48 -0800944 if not self._paused:
Andrew Svetlov3207a032014-05-27 21:24:43 +0300945 raise RuntimeError('Not paused')
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700946 self._paused = False
947 if self._closing:
948 return
Yury Selivanov5b8d4f92016-10-05 17:48:59 -0400949 self._loop._add_reader(self._sock_fd, self._read_ready)
Victor Stinnere912e652014-07-12 03:11:53 +0200950 if self._loop.get_debug():
951 logger.debug("%r resumes reading", self)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700952
Guido van Rossum2b570162013-11-01 14:18:02 -0700953 def _read_ready(self):
Yury Selivanovca2e0a42016-06-11 11:19:47 -0400954 if self._conn_lost:
955 return
Guido van Rossum2b570162013-11-01 14:18:02 -0700956 if self._write_wants_read:
957 self._write_wants_read = False
958 self._write_ready()
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700959
Guido van Rossum2b570162013-11-01 14:18:02 -0700960 if self._buffer:
Yury Selivanov5b8d4f92016-10-05 17:48:59 -0400961 self._loop._add_writer(self._sock_fd, self._write_ready)
Guido van Rossum2b570162013-11-01 14:18:02 -0700962
963 try:
964 data = self._sock.recv(self.max_size)
965 except (BlockingIOError, InterruptedError, ssl.SSLWantReadError):
966 pass
967 except ssl.SSLWantWriteError:
968 self._read_wants_write = True
Yury Selivanov5b8d4f92016-10-05 17:48:59 -0400969 self._loop._remove_reader(self._sock_fd)
970 self._loop._add_writer(self._sock_fd, self._write_ready)
Guido van Rossum2b570162013-11-01 14:18:02 -0700971 except Exception as exc:
Victor Stinner065ca252014-02-19 01:40:41 +0100972 self._fatal_error(exc, 'Fatal read error on SSL transport')
Guido van Rossum2b570162013-11-01 14:18:02 -0700973 else:
974 if data:
975 self._protocol.data_received(data)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700976 else:
Guido van Rossum2b570162013-11-01 14:18:02 -0700977 try:
Victor Stinnere912e652014-07-12 03:11:53 +0200978 if self._loop.get_debug():
979 logger.debug("%r received EOF", self)
Guido van Rossum3a703922013-11-01 14:19:35 -0700980 keep_open = self._protocol.eof_received()
981 if keep_open:
982 logger.warning('returning true from eof_received() '
983 'has no effect when using ssl')
Guido van Rossum2b570162013-11-01 14:18:02 -0700984 finally:
985 self.close()
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700986
Guido van Rossum2b570162013-11-01 14:18:02 -0700987 def _write_ready(self):
Yury Selivanovca2e0a42016-06-11 11:19:47 -0400988 if self._conn_lost:
989 return
Guido van Rossum2b570162013-11-01 14:18:02 -0700990 if self._read_wants_write:
991 self._read_wants_write = False
992 self._read_ready()
993
994 if not (self._paused or self._closing):
Yury Selivanov5b8d4f92016-10-05 17:48:59 -0400995 self._loop._add_reader(self._sock_fd, self._read_ready)
Guido van Rossum2b570162013-11-01 14:18:02 -0700996
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700997 if self._buffer:
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700998 try:
Guido van Rossuma5062c52013-11-27 14:12:48 -0800999 n = self._sock.send(self._buffer)
Victor Stinnerc89c8a72014-02-26 17:35:30 +01001000 except (BlockingIOError, InterruptedError, ssl.SSLWantWriteError):
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07001001 n = 0
Guido van Rossum2b570162013-11-01 14:18:02 -07001002 except ssl.SSLWantReadError:
1003 n = 0
Yury Selivanov5b8d4f92016-10-05 17:48:59 -04001004 self._loop._remove_writer(self._sock_fd)
Guido van Rossum2b570162013-11-01 14:18:02 -07001005 self._write_wants_read = True
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07001006 except Exception as exc:
Yury Selivanov5b8d4f92016-10-05 17:48:59 -04001007 self._loop._remove_writer(self._sock_fd)
Guido van Rossuma5062c52013-11-27 14:12:48 -08001008 self._buffer.clear()
Victor Stinner065ca252014-02-19 01:40:41 +01001009 self._fatal_error(exc, 'Fatal write error on SSL transport')
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07001010 return
1011
Guido van Rossuma5062c52013-11-27 14:12:48 -08001012 if n:
1013 del self._buffer[:n]
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07001014
Guido van Rossum2b570162013-11-01 14:18:02 -07001015 self._maybe_resume_protocol() # May append to buffer.
Guido van Rossum355491d2013-10-18 15:17:11 -07001016
Guido van Rossum2b570162013-11-01 14:18:02 -07001017 if not self._buffer:
Yury Selivanov5b8d4f92016-10-05 17:48:59 -04001018 self._loop._remove_writer(self._sock_fd)
Guido van Rossum2b570162013-11-01 14:18:02 -07001019 if self._closing:
1020 self._call_connection_lost(None)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07001021
1022 def write(self, data):
Guido van Rossuma5062c52013-11-27 14:12:48 -08001023 if not isinstance(data, (bytes, bytearray, memoryview)):
Victor Stinner3e723092016-02-01 12:46:38 +01001024 raise TypeError('data argument must be a bytes-like object, '
1025 'not %r' % type(data).__name__)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07001026 if not data:
1027 return
1028
1029 if self._conn_lost:
1030 if self._conn_lost >= constants.LOG_THRESHOLD_FOR_CONNLOST_WRITES:
Guido van Rossumfc29e0f2013-10-17 15:39:45 -07001031 logger.warning('socket.send() raised exception.')
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07001032 self._conn_lost += 1
1033 return
1034
Guido van Rossum2b570162013-11-01 14:18:02 -07001035 if not self._buffer:
Yury Selivanov5b8d4f92016-10-05 17:48:59 -04001036 self._loop._add_writer(self._sock_fd, self._write_ready)
Guido van Rossum2b570162013-11-01 14:18:02 -07001037
1038 # Add it to the buffer.
Guido van Rossuma5062c52013-11-27 14:12:48 -08001039 self._buffer.extend(data)
Guido van Rossum355491d2013-10-18 15:17:11 -07001040 self._maybe_pause_protocol()
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07001041
1042 def can_write_eof(self):
1043 return False
1044
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07001045
1046class _SelectorDatagramTransport(_SelectorTransport):
1047
Guido van Rossuma5062c52013-11-27 14:12:48 -08001048 _buffer_factory = collections.deque
1049
Victor Stinnerbfff45d2014-07-08 23:57:31 +02001050 def __init__(self, loop, sock, protocol, address=None,
1051 waiter=None, extra=None):
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07001052 super().__init__(loop, sock, protocol, extra)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07001053 self._address = address
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07001054 self._loop.call_soon(self._protocol.connection_made, self)
Victor Stinner47bbea72015-01-29 02:56:05 +01001055 # only start reading when connection_made() has been called
Yury Selivanov5b8d4f92016-10-05 17:48:59 -04001056 self._loop.call_soon(self._loop._add_reader,
Victor Stinner47bbea72015-01-29 02:56:05 +01001057 self._sock_fd, self._read_ready)
Victor Stinnerbfff45d2014-07-08 23:57:31 +02001058 if waiter is not None:
Victor Stinnerf07801b2015-01-29 00:36:35 +01001059 # only wake up the waiter when connection_made() has been called
Yury Selivanov5d7e3b62015-11-17 12:19:41 -05001060 self._loop.call_soon(futures._set_result_unless_cancelled,
1061 waiter, None)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07001062
Guido van Rossum355491d2013-10-18 15:17:11 -07001063 def get_write_buffer_size(self):
1064 return sum(len(data) for data, _ in self._buffer)
1065
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07001066 def _read_ready(self):
Yury Selivanovca2e0a42016-06-11 11:19:47 -04001067 if self._conn_lost:
1068 return
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07001069 try:
1070 data, addr = self._sock.recvfrom(self.max_size)
1071 except (BlockingIOError, InterruptedError):
1072 pass
Guido van Rossum2335de72013-11-15 16:51:48 -08001073 except OSError as exc:
1074 self._protocol.error_received(exc)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07001075 except Exception as exc:
Victor Stinner065ca252014-02-19 01:40:41 +01001076 self._fatal_error(exc, 'Fatal read error on datagram transport')
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07001077 else:
1078 self._protocol.datagram_received(data, addr)
1079
1080 def sendto(self, data, addr=None):
Guido van Rossuma5062c52013-11-27 14:12:48 -08001081 if not isinstance(data, (bytes, bytearray, memoryview)):
Victor Stinner3e723092016-02-01 12:46:38 +01001082 raise TypeError('data argument must be a bytes-like object, '
1083 'not %r' % type(data).__name__)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07001084 if not data:
1085 return
1086
Guido van Rossuma5062c52013-11-27 14:12:48 -08001087 if self._address and addr not in (None, self._address):
1088 raise ValueError('Invalid address: must be None or %s' %
1089 (self._address,))
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07001090
1091 if self._conn_lost and self._address:
1092 if self._conn_lost >= constants.LOG_THRESHOLD_FOR_CONNLOST_WRITES:
Guido van Rossumfc29e0f2013-10-17 15:39:45 -07001093 logger.warning('socket.send() raised exception.')
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07001094 self._conn_lost += 1
1095 return
1096
1097 if not self._buffer:
1098 # Attempt to send it right away first.
1099 try:
1100 if self._address:
1101 self._sock.send(data)
1102 else:
1103 self._sock.sendto(data, addr)
1104 return
Guido van Rossum2546a172013-10-18 10:10:36 -07001105 except (BlockingIOError, InterruptedError):
Yury Selivanov5b8d4f92016-10-05 17:48:59 -04001106 self._loop._add_writer(self._sock_fd, self._sendto_ready)
Guido van Rossum2335de72013-11-15 16:51:48 -08001107 except OSError as exc:
1108 self._protocol.error_received(exc)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07001109 return
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07001110 except Exception as exc:
Victor Stinner065ca252014-02-19 01:40:41 +01001111 self._fatal_error(exc,
1112 'Fatal write error on datagram transport')
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07001113 return
1114
Guido van Rossuma5062c52013-11-27 14:12:48 -08001115 # Ensure that what we buffer is immutable.
1116 self._buffer.append((bytes(data), addr))
Guido van Rossum355491d2013-10-18 15:17:11 -07001117 self._maybe_pause_protocol()
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07001118
1119 def _sendto_ready(self):
1120 while self._buffer:
1121 data, addr = self._buffer.popleft()
1122 try:
1123 if self._address:
1124 self._sock.send(data)
1125 else:
1126 self._sock.sendto(data, addr)
Guido van Rossum2546a172013-10-18 10:10:36 -07001127 except (BlockingIOError, InterruptedError):
1128 self._buffer.appendleft((data, addr)) # Try again later.
1129 break
Guido van Rossum2335de72013-11-15 16:51:48 -08001130 except OSError as exc:
1131 self._protocol.error_received(exc)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07001132 return
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07001133 except Exception as exc:
Victor Stinner065ca252014-02-19 01:40:41 +01001134 self._fatal_error(exc,
1135 'Fatal write error on datagram transport')
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07001136 return
1137
Guido van Rossum355491d2013-10-18 15:17:11 -07001138 self._maybe_resume_protocol() # May append to buffer.
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07001139 if not self._buffer:
Yury Selivanov5b8d4f92016-10-05 17:48:59 -04001140 self._loop._remove_writer(self._sock_fd)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07001141 if self._closing:
1142 self._call_connection_lost(None)