blob: ed2b4d756feddec5a4b8846ff8f0e9d8a23ccc36 [file] [log] [blame]
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07001"""Event loop using a selector and related classes.
2
3A selector is a "notify-when-ready" multiplexer. For a subclass which
4also includes support for signal handling, see the unix_events sub-module.
5"""
6
Victor Stinner8dffc452014-01-25 15:32:06 +01007__all__ = ['BaseSelectorEventLoop']
8
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07009import collections
Guido van Rossum3317a132013-11-01 14:12:50 -070010import errno
Victor Stinnerd5aeccf92014-08-31 15:07:57 +020011import functools
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070012import socket
Victor Stinner978a9af2015-01-29 17:50:58 +010013import warnings
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070014try:
15 import ssl
16except ImportError: # pragma: no cover
17 ssl = None
18
19from . import base_events
Yury Selivanov2a8911c2015-08-04 15:56:33 -040020from . import compat
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070021from . import constants
22from . import events
23from . import futures
24from . import selectors
25from . import transports
Victor Stinner231b4042015-01-14 00:19:09 +010026from . import sslproto
Victor Stinner29342622015-01-29 14:15:19 +010027from .coroutines import coroutine
Guido van Rossumfc29e0f2013-10-17 15:39:45 -070028from .log import logger
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070029
30
Victor Stinnere912e652014-07-12 03:11:53 +020031def _test_selector_event(selector, fd, event):
32 # Test if the selector is monitoring 'event' events
33 # for the file descriptor 'fd'.
34 try:
35 key = selector.get_key(fd)
36 except KeyError:
37 return False
38 else:
39 return bool(key.events & event)
40
41
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070042class BaseSelectorEventLoop(base_events.BaseEventLoop):
43 """Selector event loop.
44
45 See events.EventLoop for API specification.
46 """
47
48 def __init__(self, selector=None):
49 super().__init__()
50
51 if selector is None:
52 selector = selectors.DefaultSelector()
Guido van Rossumfc29e0f2013-10-17 15:39:45 -070053 logger.debug('Using selector: %s', selector.__class__.__name__)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070054 self._selector = selector
55 self._make_self_pipe()
56
57 def _make_socket_transport(self, sock, protocol, waiter=None, *,
58 extra=None, server=None):
59 return _SelectorSocketTransport(self, sock, protocol, waiter,
60 extra, server)
61
Victor Stinner15cc6782015-01-09 00:09:10 +010062 def _make_ssl_transport(self, rawsock, protocol, sslcontext, waiter=None,
63 *, server_side=False, server_hostname=None,
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070064 extra=None, server=None):
Victor Stinner231b4042015-01-14 00:19:09 +010065 if not sslproto._is_sslproto_available():
66 return self._make_legacy_ssl_transport(
67 rawsock, protocol, sslcontext, waiter,
68 server_side=server_side, server_hostname=server_hostname,
69 extra=extra, server=server)
70
71 ssl_protocol = sslproto.SSLProtocol(self, protocol, sslcontext, waiter,
72 server_side, server_hostname)
73 _SelectorSocketTransport(self, rawsock, ssl_protocol,
74 extra=extra, server=server)
75 return ssl_protocol._app_transport
76
77 def _make_legacy_ssl_transport(self, rawsock, protocol, sslcontext,
78 waiter, *,
79 server_side=False, server_hostname=None,
80 extra=None, server=None):
81 # Use the legacy API: SSL_write, SSL_read, etc. The legacy API is used
82 # on Python 3.4 and older, when ssl.MemoryBIO is not available.
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070083 return _SelectorSslTransport(
84 self, rawsock, protocol, sslcontext, waiter,
85 server_side, server_hostname, extra, server)
86
87 def _make_datagram_transport(self, sock, protocol,
Victor Stinnerbfff45d2014-07-08 23:57:31 +020088 address=None, waiter=None, extra=None):
89 return _SelectorDatagramTransport(self, sock, protocol,
90 address, waiter, extra)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070091
92 def close(self):
Victor Stinner956de692014-12-26 21:07:52 +010093 if self.is_running():
Victor Stinner5e631202014-11-21 00:23:27 +010094 raise RuntimeError("Cannot close a running event loop")
Victor Stinnerbb2fc5b2014-06-10 10:23:10 +020095 if self.is_closed():
96 return
97 self._close_self_pipe()
Victor Stinner5e631202014-11-21 00:23:27 +010098 super().close()
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070099 if self._selector is not None:
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700100 self._selector.close()
101 self._selector = None
102
103 def _socketpair(self):
104 raise NotImplementedError
105
106 def _close_self_pipe(self):
107 self.remove_reader(self._ssock.fileno())
108 self._ssock.close()
109 self._ssock = None
110 self._csock.close()
111 self._csock = None
112 self._internal_fds -= 1
113
114 def _make_self_pipe(self):
115 # A self-socket, really. :-)
116 self._ssock, self._csock = self._socketpair()
117 self._ssock.setblocking(False)
118 self._csock.setblocking(False)
119 self._internal_fds += 1
120 self.add_reader(self._ssock.fileno(), self._read_from_self)
121
Victor Stinnerfe5649c2014-07-17 22:43:40 +0200122 def _process_self_data(self, data):
123 pass
124
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700125 def _read_from_self(self):
Victor Stinner54c4b8e2014-06-19 12:59:15 +0200126 while True:
127 try:
128 data = self._ssock.recv(4096)
129 if not data:
130 break
Victor Stinnerfe5649c2014-07-17 22:43:40 +0200131 self._process_self_data(data)
Victor Stinner54c4b8e2014-06-19 12:59:15 +0200132 except InterruptedError:
133 continue
134 except BlockingIOError:
135 break
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700136
137 def _write_to_self(self):
Guido van Rossum3d139d82014-05-06 14:42:40 -0700138 # This may be called from a different thread, possibly after
139 # _close_self_pipe() has been called or even while it is
140 # running. Guard for self._csock being None or closed. When
141 # a socket is closed, send() raises OSError (with errno set to
142 # EBADF, but let's not rely on the exact error code).
143 csock = self._csock
144 if csock is not None:
145 try:
Victor Stinnerfe5649c2014-07-17 22:43:40 +0200146 csock.send(b'\0')
Guido van Rossum3d139d82014-05-06 14:42:40 -0700147 except OSError:
Victor Stinner65dd69a2014-07-25 22:36:05 +0200148 if self._debug:
149 logger.debug("Fail to write a null byte into the "
150 "self-pipe socket",
151 exc_info=True)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700152
Guido van Rossum28dff0d2013-11-01 14:22:30 -0700153 def _start_serving(self, protocol_factory, sock,
154 sslcontext=None, server=None):
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700155 self.add_reader(sock.fileno(), self._accept_connection,
Guido van Rossum28dff0d2013-11-01 14:22:30 -0700156 protocol_factory, sock, sslcontext, server)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700157
Guido van Rossum28dff0d2013-11-01 14:22:30 -0700158 def _accept_connection(self, protocol_factory, sock,
159 sslcontext=None, server=None):
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700160 try:
161 conn, addr = sock.accept()
Victor Stinnere912e652014-07-12 03:11:53 +0200162 if self._debug:
163 logger.debug("%r got a new connection from %r: %r",
164 server, addr, conn)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700165 conn.setblocking(False)
Guido van Rossum3317a132013-11-01 14:12:50 -0700166 except (BlockingIOError, InterruptedError, ConnectionAbortedError):
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700167 pass # False alarm.
Guido van Rossum3317a132013-11-01 14:12:50 -0700168 except OSError as exc:
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700169 # There's nowhere to send the error, so just log it.
Guido van Rossum3317a132013-11-01 14:12:50 -0700170 if exc.errno in (errno.EMFILE, errno.ENFILE,
171 errno.ENOBUFS, errno.ENOMEM):
172 # Some platforms (e.g. Linux keep reporting the FD as
173 # ready, so we remove the read handler temporarily.
174 # We'll try again in a while.
Yury Selivanovff827f02014-02-18 18:02:19 -0500175 self.call_exception_handler({
176 'message': 'socket.accept() out of system resource',
177 'exception': exc,
178 'socket': sock,
179 })
Guido van Rossum3317a132013-11-01 14:12:50 -0700180 self.remove_reader(sock.fileno())
181 self.call_later(constants.ACCEPT_RETRY_DELAY,
182 self._start_serving,
Guido van Rossum28dff0d2013-11-01 14:22:30 -0700183 protocol_factory, sock, sslcontext, server)
Guido van Rossum3317a132013-11-01 14:12:50 -0700184 else:
185 raise # The event loop will catch, log and ignore it.
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700186 else:
Victor Stinner29342622015-01-29 14:15:19 +0100187 extra = {'peername': addr}
188 accept = self._accept_connection2(protocol_factory, conn, extra,
189 sslcontext, server)
190 self.create_task(accept)
191
192 @coroutine
193 def _accept_connection2(self, protocol_factory, conn, extra,
194 sslcontext=None, server=None):
195 protocol = None
196 transport = None
197 try:
Victor Stinner29ad0112015-01-15 00:04:21 +0100198 protocol = protocol_factory()
Yury Selivanov7661db62016-05-16 15:38:39 -0400199 waiter = self.create_future()
Guido van Rossum28dff0d2013-11-01 14:22:30 -0700200 if sslcontext:
Victor Stinner29342622015-01-29 14:15:19 +0100201 transport = self._make_ssl_transport(
202 conn, protocol, sslcontext, waiter=waiter,
203 server_side=True, extra=extra, server=server)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700204 else:
Victor Stinner29342622015-01-29 14:15:19 +0100205 transport = self._make_socket_transport(
206 conn, protocol, waiter=waiter, extra=extra,
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700207 server=server)
Victor Stinner29342622015-01-29 14:15:19 +0100208
209 try:
210 yield from waiter
211 except:
212 transport.close()
213 raise
214
215 # It's now up to the protocol to handle the connection.
216 except Exception as exc:
Victor Stinneraa41b9b2015-02-04 14:50:59 +0100217 if self._debug:
Victor Stinner29342622015-01-29 14:15:19 +0100218 context = {
219 'message': ('Error on transport creation '
220 'for incoming connection'),
221 'exception': exc,
222 }
223 if protocol is not None:
224 context['protocol'] = protocol
225 if transport is not None:
226 context['transport'] = transport
227 self.call_exception_handler(context)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700228
229 def add_reader(self, fd, callback, *args):
230 """Add a reader callback."""
Victor Stinnerbb2fc5b2014-06-10 10:23:10 +0200231 self._check_closed()
Yury Selivanovff827f02014-02-18 18:02:19 -0500232 handle = events.Handle(callback, args, self)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700233 try:
234 key = self._selector.get_key(fd)
235 except KeyError:
236 self._selector.register(fd, selectors.EVENT_READ,
237 (handle, None))
238 else:
239 mask, (reader, writer) = key.events, key.data
240 self._selector.modify(fd, mask | selectors.EVENT_READ,
241 (handle, writer))
242 if reader is not None:
243 reader.cancel()
244
245 def remove_reader(self, fd):
246 """Remove a reader callback."""
Victor Stinnerbb2fc5b2014-06-10 10:23:10 +0200247 if self.is_closed():
Victor Stinnereeeebcd2014-03-06 00:52:53 +0100248 return False
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700249 try:
250 key = self._selector.get_key(fd)
251 except KeyError:
252 return False
253 else:
254 mask, (reader, writer) = key.events, key.data
255 mask &= ~selectors.EVENT_READ
256 if not mask:
257 self._selector.unregister(fd)
258 else:
259 self._selector.modify(fd, mask, (None, writer))
260
261 if reader is not None:
262 reader.cancel()
263 return True
264 else:
265 return False
266
267 def add_writer(self, fd, callback, *args):
268 """Add a writer callback.."""
Victor Stinnerbb2fc5b2014-06-10 10:23:10 +0200269 self._check_closed()
Yury Selivanovff827f02014-02-18 18:02:19 -0500270 handle = events.Handle(callback, args, self)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700271 try:
272 key = self._selector.get_key(fd)
273 except KeyError:
274 self._selector.register(fd, selectors.EVENT_WRITE,
275 (None, handle))
276 else:
277 mask, (reader, writer) = key.events, key.data
278 self._selector.modify(fd, mask | selectors.EVENT_WRITE,
279 (reader, handle))
280 if writer is not None:
281 writer.cancel()
282
283 def remove_writer(self, fd):
284 """Remove a writer callback."""
Victor Stinnerbb2fc5b2014-06-10 10:23:10 +0200285 if self.is_closed():
Victor Stinnereeeebcd2014-03-06 00:52:53 +0100286 return False
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700287 try:
288 key = self._selector.get_key(fd)
289 except KeyError:
290 return False
291 else:
292 mask, (reader, writer) = key.events, key.data
293 # Remove both writer and connector.
294 mask &= ~selectors.EVENT_WRITE
295 if not mask:
296 self._selector.unregister(fd)
297 else:
298 self._selector.modify(fd, mask, (reader, None))
299
300 if writer is not None:
301 writer.cancel()
302 return True
303 else:
304 return False
305
306 def sock_recv(self, sock, n):
Victor Stinnerd1432092014-06-19 17:11:49 +0200307 """Receive data from the socket.
308
309 The return value is a bytes object representing the data received.
310 The maximum amount of data to be received at once is specified by
311 nbytes.
312
313 This method is a coroutine.
314 """
Victor Stinneraa41b9b2015-02-04 14:50:59 +0100315 if self._debug and sock.gettimeout() != 0:
Victor Stinner9c9f1f12014-07-29 23:08:17 +0200316 raise ValueError("the socket must be non-blocking")
Yury Selivanov7661db62016-05-16 15:38:39 -0400317 fut = self.create_future()
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700318 self._sock_recv(fut, False, sock, n)
319 return fut
320
321 def _sock_recv(self, fut, registered, sock, n):
Victor Stinner28773462014-02-13 09:24:37 +0100322 # _sock_recv() can add itself as an I/O callback if the operation can't
Yury Selivanovb0b0e622014-02-18 22:27:48 -0500323 # be done immediately. Don't use it directly, call sock_recv().
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700324 fd = sock.fileno()
325 if registered:
326 # Remove the callback early. It should be rare that the
327 # selector says the fd is ready but the call still returns
328 # EAGAIN, and I am willing to take a hit in that case in
329 # order to simplify the common case.
330 self.remove_reader(fd)
331 if fut.cancelled():
332 return
333 try:
334 data = sock.recv(n)
335 except (BlockingIOError, InterruptedError):
336 self.add_reader(fd, self._sock_recv, fut, True, sock, n)
337 except Exception as exc:
338 fut.set_exception(exc)
339 else:
340 fut.set_result(data)
341
342 def sock_sendall(self, sock, data):
Victor Stinnerd1432092014-06-19 17:11:49 +0200343 """Send data to the socket.
344
345 The socket must be connected to a remote socket. This method continues
346 to send data from data until either all data has been sent or an
347 error occurs. None is returned on success. On error, an exception is
348 raised, and there is no way to determine how much data, if any, was
349 successfully processed by the receiving end of the connection.
350
351 This method is a coroutine.
352 """
Victor Stinneraa41b9b2015-02-04 14:50:59 +0100353 if self._debug and sock.gettimeout() != 0:
Victor Stinner9c9f1f12014-07-29 23:08:17 +0200354 raise ValueError("the socket must be non-blocking")
Yury Selivanov7661db62016-05-16 15:38:39 -0400355 fut = self.create_future()
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700356 if data:
357 self._sock_sendall(fut, False, sock, data)
358 else:
359 fut.set_result(None)
360 return fut
361
362 def _sock_sendall(self, fut, registered, sock, data):
363 fd = sock.fileno()
364
365 if registered:
366 self.remove_writer(fd)
367 if fut.cancelled():
368 return
369
370 try:
371 n = sock.send(data)
372 except (BlockingIOError, InterruptedError):
373 n = 0
374 except Exception as exc:
375 fut.set_exception(exc)
376 return
377
378 if n == len(data):
379 fut.set_result(None)
380 else:
381 if n:
382 data = data[n:]
383 self.add_writer(fd, self._sock_sendall, fut, True, sock, data)
384
385 def sock_connect(self, sock, address):
Victor Stinnerd1432092014-06-19 17:11:49 +0200386 """Connect to a remote socket at address.
387
Victor Stinnerd1432092014-06-19 17:11:49 +0200388 This method is a coroutine.
389 """
Victor Stinneraa41b9b2015-02-04 14:50:59 +0100390 if self._debug and sock.gettimeout() != 0:
Victor Stinner9c9f1f12014-07-29 23:08:17 +0200391 raise ValueError("the socket must be non-blocking")
Yury Selivanovf1c6fa92016-06-08 12:33:31 -0400392
Yury Selivanov7661db62016-05-16 15:38:39 -0400393 fut = self.create_future()
Yury Selivanovf1c6fa92016-06-08 12:33:31 -0400394 if hasattr(socket, 'AF_UNIX') and sock.family == socket.AF_UNIX:
395 self._sock_connect(fut, sock, address)
396 else:
Yury Selivanov63bf4872016-06-28 11:00:22 -0400397 resolved = base_events._ensure_resolved(
398 address, family=sock.family, proto=sock.proto, loop=self)
Yury Selivanovf1c6fa92016-06-08 12:33:31 -0400399 resolved.add_done_callback(
400 lambda resolved: self._on_resolved(fut, sock, resolved))
401
402 return fut
403
404 def _on_resolved(self, fut, sock, resolved):
Victor Stinner28773462014-02-13 09:24:37 +0100405 try:
Yury Selivanovf1c6fa92016-06-08 12:33:31 -0400406 _, _, _, _, address = resolved.result()[0]
407 except Exception as exc:
408 fut.set_exception(exc)
Victor Stinner28773462014-02-13 09:24:37 +0100409 else:
Victor Stinnerd5aeccf92014-08-31 15:07:57 +0200410 self._sock_connect(fut, sock, address)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700411
Victor Stinnerd5aeccf92014-08-31 15:07:57 +0200412 def _sock_connect(self, fut, sock, address):
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700413 fd = sock.fileno()
Victor Stinnerd5aeccf92014-08-31 15:07:57 +0200414 try:
Victor Stinnerc9d11c32015-04-07 21:38:04 +0200415 sock.connect(address)
416 except (BlockingIOError, InterruptedError):
417 # Issue #23618: When the C function connect() fails with EINTR, the
418 # connection runs in background. We have to wait until the socket
419 # becomes writable to be notified when the connection succeed or
420 # fails.
Victor Stinnerd5aeccf92014-08-31 15:07:57 +0200421 fut.add_done_callback(functools.partial(self._sock_connect_done,
Victor Stinner3531d902015-01-09 01:42:52 +0100422 fd))
Victor Stinnerd5aeccf92014-08-31 15:07:57 +0200423 self.add_writer(fd, self._sock_connect_cb, fut, sock, address)
424 except Exception as exc:
425 fut.set_exception(exc)
426 else:
427 fut.set_result(None)
428
Victor Stinner3531d902015-01-09 01:42:52 +0100429 def _sock_connect_done(self, fd, fut):
430 self.remove_writer(fd)
Victor Stinnerd5aeccf92014-08-31 15:07:57 +0200431
432 def _sock_connect_cb(self, fut, sock, address):
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700433 if fut.cancelled():
434 return
Victor Stinnerd5aeccf92014-08-31 15:07:57 +0200435
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700436 try:
Victor Stinnerd5aeccf92014-08-31 15:07:57 +0200437 err = sock.getsockopt(socket.SOL_SOCKET, socket.SO_ERROR)
438 if err != 0:
439 # Jump to any except clause below.
440 raise OSError(err, 'Connect call failed %s' % (address,))
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700441 except (BlockingIOError, InterruptedError):
Victor Stinnerd5aeccf92014-08-31 15:07:57 +0200442 # socket is still registered, the callback will be retried later
443 pass
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700444 except Exception as exc:
445 fut.set_exception(exc)
446 else:
447 fut.set_result(None)
448
449 def sock_accept(self, sock):
Victor Stinnerd1432092014-06-19 17:11:49 +0200450 """Accept a connection.
451
452 The socket must be bound to an address and listening for connections.
453 The return value is a pair (conn, address) where conn is a new socket
454 object usable to send and receive data on the connection, and address
455 is the address bound to the socket on the other end of the connection.
456
457 This method is a coroutine.
458 """
Victor Stinneraa41b9b2015-02-04 14:50:59 +0100459 if self._debug and sock.gettimeout() != 0:
Victor Stinner9c9f1f12014-07-29 23:08:17 +0200460 raise ValueError("the socket must be non-blocking")
Yury Selivanov7661db62016-05-16 15:38:39 -0400461 fut = self.create_future()
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700462 self._sock_accept(fut, False, sock)
463 return fut
464
465 def _sock_accept(self, fut, registered, sock):
466 fd = sock.fileno()
467 if registered:
468 self.remove_reader(fd)
469 if fut.cancelled():
470 return
471 try:
472 conn, address = sock.accept()
473 conn.setblocking(False)
474 except (BlockingIOError, InterruptedError):
475 self.add_reader(fd, self._sock_accept, fut, True, sock)
476 except Exception as exc:
477 fut.set_exception(exc)
478 else:
479 fut.set_result((conn, address))
480
481 def _process_events(self, event_list):
482 for key, mask in event_list:
483 fileobj, (reader, writer) = key.fileobj, key.data
484 if mask & selectors.EVENT_READ and reader is not None:
485 if reader._cancelled:
486 self.remove_reader(fileobj)
487 else:
488 self._add_callback(reader)
489 if mask & selectors.EVENT_WRITE and writer is not None:
490 if writer._cancelled:
491 self.remove_writer(fileobj)
492 else:
493 self._add_callback(writer)
494
495 def _stop_serving(self, sock):
496 self.remove_reader(sock.fileno())
497 sock.close()
498
499
Yury Selivanovc0982412014-02-18 18:41:13 -0500500class _SelectorTransport(transports._FlowControlMixin,
501 transports.Transport):
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700502
503 max_size = 256 * 1024 # Buffer size passed to recv().
504
Guido van Rossuma5062c52013-11-27 14:12:48 -0800505 _buffer_factory = bytearray # Constructs initial value for self._buffer.
506
Victor Stinner978a9af2015-01-29 17:50:58 +0100507 # Attribute used in the destructor: it must be set even if the constructor
508 # is not called (see _SelectorSslTransport which may start by raising an
509 # exception)
510 _sock = None
511
Victor Stinner47bbea72015-01-29 02:56:05 +0100512 def __init__(self, loop, sock, protocol, extra=None, server=None):
Victor Stinner004adb92014-11-05 15:27:41 +0100513 super().__init__(extra, loop)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700514 self._extra['socket'] = sock
515 self._extra['sockname'] = sock.getsockname()
516 if 'peername' not in self._extra:
517 try:
518 self._extra['peername'] = sock.getpeername()
519 except socket.error:
520 self._extra['peername'] = None
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700521 self._sock = sock
522 self._sock_fd = sock.fileno()
523 self._protocol = protocol
Victor Stinner47bbea72015-01-29 02:56:05 +0100524 self._protocol_connected = True
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700525 self._server = server
Guido van Rossuma5062c52013-11-27 14:12:48 -0800526 self._buffer = self._buffer_factory()
Guido van Rossum2546a172013-10-18 10:10:36 -0700527 self._conn_lost = 0 # Set when call to connection_lost scheduled.
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700528 self._closing = False # Set when close() called.
Guido van Rossum355491d2013-10-18 15:17:11 -0700529 if self._server is not None:
Victor Stinnerb28dbac2014-07-11 22:52:21 +0200530 self._server._attach()
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700531
Victor Stinnere912e652014-07-12 03:11:53 +0200532 def __repr__(self):
Victor Stinner0e34dc32014-10-12 09:52:11 +0200533 info = [self.__class__.__name__]
534 if self._sock is None:
535 info.append('closed')
536 elif self._closing:
537 info.append('closing')
538 info.append('fd=%s' % self._sock_fd)
Victor Stinnerb2614752014-08-25 23:20:52 +0200539 # test if the transport was closed
Victor Stinner79fd9622015-03-27 15:20:08 +0100540 if self._loop is not None and not self._loop.is_closed():
Victor Stinnerb2614752014-08-25 23:20:52 +0200541 polling = _test_selector_event(self._loop._selector,
542 self._sock_fd, selectors.EVENT_READ)
543 if polling:
544 info.append('read=polling')
545 else:
546 info.append('read=idle')
Victor Stinnere912e652014-07-12 03:11:53 +0200547
Victor Stinnerb2614752014-08-25 23:20:52 +0200548 polling = _test_selector_event(self._loop._selector,
Victor Stinner15cc6782015-01-09 00:09:10 +0100549 self._sock_fd,
550 selectors.EVENT_WRITE)
Victor Stinnerb2614752014-08-25 23:20:52 +0200551 if polling:
552 state = 'polling'
553 else:
554 state = 'idle'
Victor Stinnere912e652014-07-12 03:11:53 +0200555
Victor Stinnerb2614752014-08-25 23:20:52 +0200556 bufsize = self.get_write_buffer_size()
557 info.append('write=<%s, bufsize=%s>' % (state, bufsize))
Victor Stinnere912e652014-07-12 03:11:53 +0200558 return '<%s>' % ' '.join(info)
559
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700560 def abort(self):
561 self._force_close(None)
562
Yury Selivanov5bb1afb2015-11-16 12:43:21 -0500563 def is_closing(self):
564 return self._closing
565
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700566 def close(self):
567 if self._closing:
568 return
569 self._closing = True
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700570 self._loop.remove_reader(self._sock_fd)
571 if not self._buffer:
Guido van Rossum2546a172013-10-18 10:10:36 -0700572 self._conn_lost += 1
Yury Selivanovca2e0a42016-06-11 11:19:47 -0400573 self._loop.remove_writer(self._sock_fd)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700574 self._loop.call_soon(self._call_connection_lost, None)
575
Victor Stinner978a9af2015-01-29 17:50:58 +0100576 # On Python 3.3 and older, objects with a destructor part of a reference
577 # cycle are never destroyed. It's not more the case on Python 3.4 thanks
578 # to the PEP 442.
Yury Selivanov2a8911c2015-08-04 15:56:33 -0400579 if compat.PY34:
Victor Stinner978a9af2015-01-29 17:50:58 +0100580 def __del__(self):
581 if self._sock is not None:
582 warnings.warn("unclosed transport %r" % self, ResourceWarning)
583 self._sock.close()
584
Victor Stinner065ca252014-02-19 01:40:41 +0100585 def _fatal_error(self, exc, message='Fatal error on transport'):
Guido van Rossum2546a172013-10-18 10:10:36 -0700586 # Should be called from exception handler only.
Victor Stinnerc94a93a2016-04-01 21:43:39 +0200587 if isinstance(exc, base_events._FATAL_ERROR_IGNORE):
Victor Stinnere912e652014-07-12 03:11:53 +0200588 if self._loop.get_debug():
589 logger.debug("%r: %s", self, message, exc_info=True)
590 else:
Yury Selivanovff827f02014-02-18 18:02:19 -0500591 self._loop.call_exception_handler({
Victor Stinner065ca252014-02-19 01:40:41 +0100592 'message': message,
Yury Selivanovff827f02014-02-18 18:02:19 -0500593 'exception': exc,
594 'transport': self,
595 'protocol': self._protocol,
596 })
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700597 self._force_close(exc)
598
599 def _force_close(self, exc):
Guido van Rossum2546a172013-10-18 10:10:36 -0700600 if self._conn_lost:
601 return
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700602 if self._buffer:
603 self._buffer.clear()
604 self._loop.remove_writer(self._sock_fd)
Guido van Rossum2546a172013-10-18 10:10:36 -0700605 if not self._closing:
606 self._closing = True
607 self._loop.remove_reader(self._sock_fd)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700608 self._conn_lost += 1
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700609 self._loop.call_soon(self._call_connection_lost, exc)
610
611 def _call_connection_lost(self, exc):
612 try:
Victor Stinner47bbea72015-01-29 02:56:05 +0100613 if self._protocol_connected:
614 self._protocol.connection_lost(exc)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700615 finally:
616 self._sock.close()
617 self._sock = None
618 self._protocol = None
619 self._loop = None
620 server = self._server
621 if server is not None:
Victor Stinnerb28dbac2014-07-11 22:52:21 +0200622 server._detach()
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700623 self._server = None
624
Guido van Rossum355491d2013-10-18 15:17:11 -0700625 def get_write_buffer_size(self):
Guido van Rossuma5062c52013-11-27 14:12:48 -0800626 return len(self._buffer)
Guido van Rossum355491d2013-10-18 15:17:11 -0700627
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700628
629class _SelectorSocketTransport(_SelectorTransport):
630
631 def __init__(self, loop, sock, protocol, waiter=None,
632 extra=None, server=None):
633 super().__init__(loop, sock, protocol, extra, server)
634 self._eof = False
635 self._paused = False
636
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700637 self._loop.call_soon(self._protocol.connection_made, self)
Victor Stinnerfa737792015-01-29 00:36:51 +0100638 # only start reading when connection_made() has been called
639 self._loop.call_soon(self._loop.add_reader,
640 self._sock_fd, self._read_ready)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700641 if waiter is not None:
Victor Stinnerf07801b2015-01-29 00:36:35 +0100642 # only wake up the waiter when connection_made() has been called
Yury Selivanov5d7e3b62015-11-17 12:19:41 -0500643 self._loop.call_soon(futures._set_result_unless_cancelled,
644 waiter, None)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700645
Guido van Rossum57497ad2013-10-18 07:58:20 -0700646 def pause_reading(self):
Guido van Rossuma5062c52013-11-27 14:12:48 -0800647 if self._closing:
648 raise RuntimeError('Cannot pause_reading() when closing')
649 if self._paused:
650 raise RuntimeError('Already paused')
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700651 self._paused = True
652 self._loop.remove_reader(self._sock_fd)
Victor Stinnere912e652014-07-12 03:11:53 +0200653 if self._loop.get_debug():
654 logger.debug("%r pauses reading", self)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700655
Guido van Rossum57497ad2013-10-18 07:58:20 -0700656 def resume_reading(self):
Guido van Rossuma5062c52013-11-27 14:12:48 -0800657 if not self._paused:
658 raise RuntimeError('Not paused')
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700659 self._paused = False
660 if self._closing:
661 return
662 self._loop.add_reader(self._sock_fd, self._read_ready)
Victor Stinnere912e652014-07-12 03:11:53 +0200663 if self._loop.get_debug():
664 logger.debug("%r resumes reading", self)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700665
666 def _read_ready(self):
Yury Selivanovca2e0a42016-06-11 11:19:47 -0400667 if self._conn_lost:
668 return
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700669 try:
670 data = self._sock.recv(self.max_size)
671 except (BlockingIOError, InterruptedError):
672 pass
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700673 except Exception as exc:
Victor Stinner065ca252014-02-19 01:40:41 +0100674 self._fatal_error(exc, 'Fatal read error on socket transport')
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700675 else:
676 if data:
677 self._protocol.data_received(data)
678 else:
Victor Stinnere912e652014-07-12 03:11:53 +0200679 if self._loop.get_debug():
680 logger.debug("%r received EOF", self)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700681 keep_open = self._protocol.eof_received()
Guido van Rossum1f683bb2013-10-30 14:36:58 -0700682 if keep_open:
683 # We're keeping the connection open so the
684 # protocol can write more, but we still can't
685 # receive more, so remove the reader callback.
686 self._loop.remove_reader(self._sock_fd)
687 else:
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700688 self.close()
689
690 def write(self, data):
Guido van Rossuma5062c52013-11-27 14:12:48 -0800691 if not isinstance(data, (bytes, bytearray, memoryview)):
Victor Stinner3e723092016-02-01 12:46:38 +0100692 raise TypeError('data argument must be a bytes-like object, '
693 'not %r' % type(data).__name__)
Guido van Rossuma5062c52013-11-27 14:12:48 -0800694 if self._eof:
695 raise RuntimeError('Cannot call write() after write_eof()')
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700696 if not data:
697 return
698
699 if self._conn_lost:
700 if self._conn_lost >= constants.LOG_THRESHOLD_FOR_CONNLOST_WRITES:
Guido van Rossumfc29e0f2013-10-17 15:39:45 -0700701 logger.warning('socket.send() raised exception.')
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700702 self._conn_lost += 1
703 return
704
705 if not self._buffer:
Guido van Rossum355491d2013-10-18 15:17:11 -0700706 # Optimization: try to send now.
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700707 try:
708 n = self._sock.send(data)
709 except (BlockingIOError, InterruptedError):
Guido van Rossum2546a172013-10-18 10:10:36 -0700710 pass
711 except Exception as exc:
Victor Stinner065ca252014-02-19 01:40:41 +0100712 self._fatal_error(exc, 'Fatal write error on socket transport')
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700713 return
714 else:
715 data = data[n:]
716 if not data:
717 return
Guido van Rossum355491d2013-10-18 15:17:11 -0700718 # Not all was written; register write handler.
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700719 self._loop.add_writer(self._sock_fd, self._write_ready)
720
Guido van Rossum355491d2013-10-18 15:17:11 -0700721 # Add it to the buffer.
Guido van Rossuma5062c52013-11-27 14:12:48 -0800722 self._buffer.extend(data)
Guido van Rossum355491d2013-10-18 15:17:11 -0700723 self._maybe_pause_protocol()
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700724
725 def _write_ready(self):
Guido van Rossuma5062c52013-11-27 14:12:48 -0800726 assert self._buffer, 'Data should not be empty'
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700727
Yury Selivanovca2e0a42016-06-11 11:19:47 -0400728 if self._conn_lost:
729 return
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700730 try:
Guido van Rossuma5062c52013-11-27 14:12:48 -0800731 n = self._sock.send(self._buffer)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700732 except (BlockingIOError, InterruptedError):
Guido van Rossuma5062c52013-11-27 14:12:48 -0800733 pass
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700734 except Exception as exc:
735 self._loop.remove_writer(self._sock_fd)
Guido van Rossuma5062c52013-11-27 14:12:48 -0800736 self._buffer.clear()
Victor Stinner065ca252014-02-19 01:40:41 +0100737 self._fatal_error(exc, 'Fatal write error on socket transport')
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700738 else:
Guido van Rossuma5062c52013-11-27 14:12:48 -0800739 if n:
740 del self._buffer[:n]
Guido van Rossum355491d2013-10-18 15:17:11 -0700741 self._maybe_resume_protocol() # May append to buffer.
742 if not self._buffer:
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700743 self._loop.remove_writer(self._sock_fd)
744 if self._closing:
745 self._call_connection_lost(None)
746 elif self._eof:
747 self._sock.shutdown(socket.SHUT_WR)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700748
749 def write_eof(self):
750 if self._eof:
751 return
752 self._eof = True
753 if not self._buffer:
754 self._sock.shutdown(socket.SHUT_WR)
755
756 def can_write_eof(self):
757 return True
758
759
760class _SelectorSslTransport(_SelectorTransport):
761
Guido van Rossuma5062c52013-11-27 14:12:48 -0800762 _buffer_factory = bytearray
763
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700764 def __init__(self, loop, rawsock, protocol, sslcontext, waiter=None,
765 server_side=False, server_hostname=None,
766 extra=None, server=None):
Guido van Rossum28dff0d2013-11-01 14:22:30 -0700767 if ssl is None:
768 raise RuntimeError('stdlib ssl module not available')
769
Victor Stinner231b4042015-01-14 00:19:09 +0100770 if not sslcontext:
771 sslcontext = sslproto._create_transport_context(server_side, server_hostname)
Guido van Rossum28dff0d2013-11-01 14:22:30 -0700772
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700773 wrap_kwargs = {
774 'server_side': server_side,
775 'do_handshake_on_connect': False,
776 }
Benjamin Peterson7243b572014-11-23 17:04:34 -0600777 if server_hostname and not server_side:
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700778 wrap_kwargs['server_hostname'] = server_hostname
779 sslsock = sslcontext.wrap_socket(rawsock, **wrap_kwargs)
780
781 super().__init__(loop, sslsock, protocol, extra, server)
Victor Stinner47bbea72015-01-29 02:56:05 +0100782 # the protocol connection is only made after the SSL handshake
783 self._protocol_connected = False
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700784
785 self._server_hostname = server_hostname
786 self._waiter = waiter
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700787 self._sslcontext = sslcontext
788 self._paused = False
789
790 # SSL-specific extra info. (peercert is set later)
791 self._extra.update(sslcontext=sslcontext)
792
Victor Stinnere912e652014-07-12 03:11:53 +0200793 if self._loop.get_debug():
794 logger.debug("%r starts SSL handshake", self)
795 start_time = self._loop.time()
796 else:
797 start_time = None
798 self._on_handshake(start_time)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700799
Victor Stinnerf07801b2015-01-29 00:36:35 +0100800 def _wakeup_waiter(self, exc=None):
801 if self._waiter is None:
802 return
803 if not self._waiter.cancelled():
804 if exc is not None:
805 self._waiter.set_exception(exc)
806 else:
807 self._waiter.set_result(None)
808 self._waiter = None
809
Victor Stinnere912e652014-07-12 03:11:53 +0200810 def _on_handshake(self, start_time):
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700811 try:
812 self._sock.do_handshake()
813 except ssl.SSLWantReadError:
Victor Stinnere912e652014-07-12 03:11:53 +0200814 self._loop.add_reader(self._sock_fd,
815 self._on_handshake, start_time)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700816 return
817 except ssl.SSLWantWriteError:
Victor Stinnere912e652014-07-12 03:11:53 +0200818 self._loop.add_writer(self._sock_fd,
819 self._on_handshake, start_time)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700820 return
821 except BaseException as exc:
Victor Stinnere912e652014-07-12 03:11:53 +0200822 if self._loop.get_debug():
823 logger.warning("%r: SSL handshake failed",
824 self, exc_info=True)
Guido van Rossum355491d2013-10-18 15:17:11 -0700825 self._loop.remove_reader(self._sock_fd)
826 self._loop.remove_writer(self._sock_fd)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700827 self._sock.close()
Victor Stinnerf07801b2015-01-29 00:36:35 +0100828 self._wakeup_waiter(exc)
Victor Stinnere912e652014-07-12 03:11:53 +0200829 if isinstance(exc, Exception):
830 return
831 else:
832 raise
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700833
Guido van Rossum355491d2013-10-18 15:17:11 -0700834 self._loop.remove_reader(self._sock_fd)
835 self._loop.remove_writer(self._sock_fd)
836
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700837 peercert = self._sock.getpeercert()
Christian Heimes6d8c1ab2013-12-06 00:23:13 +0100838 if not hasattr(self._sslcontext, 'check_hostname'):
839 # Verify hostname if requested, Python 3.4+ uses check_hostname
840 # and checks the hostname in do_handshake()
841 if (self._server_hostname and
842 self._sslcontext.verify_mode != ssl.CERT_NONE):
843 try:
844 ssl.match_hostname(peercert, self._server_hostname)
845 except Exception as exc:
Victor Stinnere912e652014-07-12 03:11:53 +0200846 if self._loop.get_debug():
847 logger.warning("%r: SSL handshake failed "
848 "on matching the hostname",
849 self, exc_info=True)
Christian Heimes6d8c1ab2013-12-06 00:23:13 +0100850 self._sock.close()
Victor Stinnerf07801b2015-01-29 00:36:35 +0100851 self._wakeup_waiter(exc)
Christian Heimes6d8c1ab2013-12-06 00:23:13 +0100852 return
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700853
854 # Add extra info that becomes available after handshake.
855 self._extra.update(peercert=peercert,
856 cipher=self._sock.cipher(),
857 compression=self._sock.compression(),
Victor Stinnerf7dc7fb2015-09-21 18:06:17 +0200858 ssl_object=self._sock,
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700859 )
860
Guido van Rossum2b570162013-11-01 14:18:02 -0700861 self._read_wants_write = False
862 self._write_wants_read = False
863 self._loop.add_reader(self._sock_fd, self._read_ready)
Victor Stinner47bbea72015-01-29 02:56:05 +0100864 self._protocol_connected = True
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700865 self._loop.call_soon(self._protocol.connection_made, self)
Victor Stinnerf07801b2015-01-29 00:36:35 +0100866 # only wake up the waiter when connection_made() has been called
867 self._loop.call_soon(self._wakeup_waiter)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700868
Victor Stinnere912e652014-07-12 03:11:53 +0200869 if self._loop.get_debug():
870 dt = self._loop.time() - start_time
871 logger.debug("%r: SSL handshake took %.1f ms", self, dt * 1e3)
872
Guido van Rossum57497ad2013-10-18 07:58:20 -0700873 def pause_reading(self):
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700874 # XXX This is a bit icky, given the comment at the top of
Guido van Rossum2b570162013-11-01 14:18:02 -0700875 # _read_ready(). Is it possible to evoke a deadlock? I don't
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700876 # know, although it doesn't look like it; write() will still
877 # accept more data for the buffer and eventually the app will
Guido van Rossum57497ad2013-10-18 07:58:20 -0700878 # call resume_reading() again, and things will flow again.
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700879
Guido van Rossuma5062c52013-11-27 14:12:48 -0800880 if self._closing:
881 raise RuntimeError('Cannot pause_reading() when closing')
882 if self._paused:
883 raise RuntimeError('Already paused')
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700884 self._paused = True
885 self._loop.remove_reader(self._sock_fd)
Victor Stinnere912e652014-07-12 03:11:53 +0200886 if self._loop.get_debug():
887 logger.debug("%r pauses reading", self)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700888
Guido van Rossum57497ad2013-10-18 07:58:20 -0700889 def resume_reading(self):
Guido van Rossuma5062c52013-11-27 14:12:48 -0800890 if not self._paused:
Andrew Svetlov3207a032014-05-27 21:24:43 +0300891 raise RuntimeError('Not paused')
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700892 self._paused = False
893 if self._closing:
894 return
Guido van Rossum2b570162013-11-01 14:18:02 -0700895 self._loop.add_reader(self._sock_fd, self._read_ready)
Victor Stinnere912e652014-07-12 03:11:53 +0200896 if self._loop.get_debug():
897 logger.debug("%r resumes reading", self)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700898
Guido van Rossum2b570162013-11-01 14:18:02 -0700899 def _read_ready(self):
Yury Selivanovca2e0a42016-06-11 11:19:47 -0400900 if self._conn_lost:
901 return
Guido van Rossum2b570162013-11-01 14:18:02 -0700902 if self._write_wants_read:
903 self._write_wants_read = False
904 self._write_ready()
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700905
Guido van Rossum2b570162013-11-01 14:18:02 -0700906 if self._buffer:
907 self._loop.add_writer(self._sock_fd, self._write_ready)
908
909 try:
910 data = self._sock.recv(self.max_size)
911 except (BlockingIOError, InterruptedError, ssl.SSLWantReadError):
912 pass
913 except ssl.SSLWantWriteError:
914 self._read_wants_write = True
915 self._loop.remove_reader(self._sock_fd)
916 self._loop.add_writer(self._sock_fd, self._write_ready)
917 except Exception as exc:
Victor Stinner065ca252014-02-19 01:40:41 +0100918 self._fatal_error(exc, 'Fatal read error on SSL transport')
Guido van Rossum2b570162013-11-01 14:18:02 -0700919 else:
920 if data:
921 self._protocol.data_received(data)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700922 else:
Guido van Rossum2b570162013-11-01 14:18:02 -0700923 try:
Victor Stinnere912e652014-07-12 03:11:53 +0200924 if self._loop.get_debug():
925 logger.debug("%r received EOF", self)
Guido van Rossum3a703922013-11-01 14:19:35 -0700926 keep_open = self._protocol.eof_received()
927 if keep_open:
928 logger.warning('returning true from eof_received() '
929 'has no effect when using ssl')
Guido van Rossum2b570162013-11-01 14:18:02 -0700930 finally:
931 self.close()
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700932
Guido van Rossum2b570162013-11-01 14:18:02 -0700933 def _write_ready(self):
Yury Selivanovca2e0a42016-06-11 11:19:47 -0400934 if self._conn_lost:
935 return
Guido van Rossum2b570162013-11-01 14:18:02 -0700936 if self._read_wants_write:
937 self._read_wants_write = False
938 self._read_ready()
939
940 if not (self._paused or self._closing):
941 self._loop.add_reader(self._sock_fd, self._read_ready)
942
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700943 if self._buffer:
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700944 try:
Guido van Rossuma5062c52013-11-27 14:12:48 -0800945 n = self._sock.send(self._buffer)
Victor Stinnerc89c8a72014-02-26 17:35:30 +0100946 except (BlockingIOError, InterruptedError, ssl.SSLWantWriteError):
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700947 n = 0
Guido van Rossum2b570162013-11-01 14:18:02 -0700948 except ssl.SSLWantReadError:
949 n = 0
950 self._loop.remove_writer(self._sock_fd)
951 self._write_wants_read = True
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700952 except Exception as exc:
953 self._loop.remove_writer(self._sock_fd)
Guido van Rossuma5062c52013-11-27 14:12:48 -0800954 self._buffer.clear()
Victor Stinner065ca252014-02-19 01:40:41 +0100955 self._fatal_error(exc, 'Fatal write error on SSL transport')
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700956 return
957
Guido van Rossuma5062c52013-11-27 14:12:48 -0800958 if n:
959 del self._buffer[:n]
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700960
Guido van Rossum2b570162013-11-01 14:18:02 -0700961 self._maybe_resume_protocol() # May append to buffer.
Guido van Rossum355491d2013-10-18 15:17:11 -0700962
Guido van Rossum2b570162013-11-01 14:18:02 -0700963 if not self._buffer:
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700964 self._loop.remove_writer(self._sock_fd)
Guido van Rossum2b570162013-11-01 14:18:02 -0700965 if self._closing:
966 self._call_connection_lost(None)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700967
968 def write(self, data):
Guido van Rossuma5062c52013-11-27 14:12:48 -0800969 if not isinstance(data, (bytes, bytearray, memoryview)):
Victor Stinner3e723092016-02-01 12:46:38 +0100970 raise TypeError('data argument must be a bytes-like object, '
971 'not %r' % type(data).__name__)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700972 if not data:
973 return
974
975 if self._conn_lost:
976 if self._conn_lost >= constants.LOG_THRESHOLD_FOR_CONNLOST_WRITES:
Guido van Rossumfc29e0f2013-10-17 15:39:45 -0700977 logger.warning('socket.send() raised exception.')
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700978 self._conn_lost += 1
979 return
980
Guido van Rossum2b570162013-11-01 14:18:02 -0700981 if not self._buffer:
982 self._loop.add_writer(self._sock_fd, self._write_ready)
983
984 # Add it to the buffer.
Guido van Rossuma5062c52013-11-27 14:12:48 -0800985 self._buffer.extend(data)
Guido van Rossum355491d2013-10-18 15:17:11 -0700986 self._maybe_pause_protocol()
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700987
988 def can_write_eof(self):
989 return False
990
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700991
992class _SelectorDatagramTransport(_SelectorTransport):
993
Guido van Rossuma5062c52013-11-27 14:12:48 -0800994 _buffer_factory = collections.deque
995
Victor Stinnerbfff45d2014-07-08 23:57:31 +0200996 def __init__(self, loop, sock, protocol, address=None,
997 waiter=None, extra=None):
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700998 super().__init__(loop, sock, protocol, extra)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700999 self._address = address
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07001000 self._loop.call_soon(self._protocol.connection_made, self)
Victor Stinner47bbea72015-01-29 02:56:05 +01001001 # only start reading when connection_made() has been called
1002 self._loop.call_soon(self._loop.add_reader,
1003 self._sock_fd, self._read_ready)
Victor Stinnerbfff45d2014-07-08 23:57:31 +02001004 if waiter is not None:
Victor Stinnerf07801b2015-01-29 00:36:35 +01001005 # only wake up the waiter when connection_made() has been called
Yury Selivanov5d7e3b62015-11-17 12:19:41 -05001006 self._loop.call_soon(futures._set_result_unless_cancelled,
1007 waiter, None)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07001008
Guido van Rossum355491d2013-10-18 15:17:11 -07001009 def get_write_buffer_size(self):
1010 return sum(len(data) for data, _ in self._buffer)
1011
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07001012 def _read_ready(self):
Yury Selivanovca2e0a42016-06-11 11:19:47 -04001013 if self._conn_lost:
1014 return
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07001015 try:
1016 data, addr = self._sock.recvfrom(self.max_size)
1017 except (BlockingIOError, InterruptedError):
1018 pass
Guido van Rossum2335de72013-11-15 16:51:48 -08001019 except OSError as exc:
1020 self._protocol.error_received(exc)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07001021 except Exception as exc:
Victor Stinner065ca252014-02-19 01:40:41 +01001022 self._fatal_error(exc, 'Fatal read error on datagram transport')
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07001023 else:
1024 self._protocol.datagram_received(data, addr)
1025
1026 def sendto(self, data, addr=None):
Guido van Rossuma5062c52013-11-27 14:12:48 -08001027 if not isinstance(data, (bytes, bytearray, memoryview)):
Victor Stinner3e723092016-02-01 12:46:38 +01001028 raise TypeError('data argument must be a bytes-like object, '
1029 'not %r' % type(data).__name__)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07001030 if not data:
1031 return
1032
Guido van Rossuma5062c52013-11-27 14:12:48 -08001033 if self._address and addr not in (None, self._address):
1034 raise ValueError('Invalid address: must be None or %s' %
1035 (self._address,))
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07001036
1037 if self._conn_lost and self._address:
1038 if self._conn_lost >= constants.LOG_THRESHOLD_FOR_CONNLOST_WRITES:
Guido van Rossumfc29e0f2013-10-17 15:39:45 -07001039 logger.warning('socket.send() raised exception.')
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07001040 self._conn_lost += 1
1041 return
1042
1043 if not self._buffer:
1044 # Attempt to send it right away first.
1045 try:
1046 if self._address:
1047 self._sock.send(data)
1048 else:
1049 self._sock.sendto(data, addr)
1050 return
Guido van Rossum2546a172013-10-18 10:10:36 -07001051 except (BlockingIOError, InterruptedError):
1052 self._loop.add_writer(self._sock_fd, self._sendto_ready)
Guido van Rossum2335de72013-11-15 16:51:48 -08001053 except OSError as exc:
1054 self._protocol.error_received(exc)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07001055 return
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07001056 except Exception as exc:
Victor Stinner065ca252014-02-19 01:40:41 +01001057 self._fatal_error(exc,
1058 'Fatal write error on datagram transport')
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07001059 return
1060
Guido van Rossuma5062c52013-11-27 14:12:48 -08001061 # Ensure that what we buffer is immutable.
1062 self._buffer.append((bytes(data), addr))
Guido van Rossum355491d2013-10-18 15:17:11 -07001063 self._maybe_pause_protocol()
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07001064
1065 def _sendto_ready(self):
1066 while self._buffer:
1067 data, addr = self._buffer.popleft()
1068 try:
1069 if self._address:
1070 self._sock.send(data)
1071 else:
1072 self._sock.sendto(data, addr)
Guido van Rossum2546a172013-10-18 10:10:36 -07001073 except (BlockingIOError, InterruptedError):
1074 self._buffer.appendleft((data, addr)) # Try again later.
1075 break
Guido van Rossum2335de72013-11-15 16:51:48 -08001076 except OSError as exc:
1077 self._protocol.error_received(exc)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07001078 return
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07001079 except Exception as exc:
Victor Stinner065ca252014-02-19 01:40:41 +01001080 self._fatal_error(exc,
1081 'Fatal write error on datagram transport')
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07001082 return
1083
Guido van Rossum355491d2013-10-18 15:17:11 -07001084 self._maybe_resume_protocol() # May append to buffer.
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07001085 if not self._buffer:
1086 self._loop.remove_writer(self._sock_fd)
1087 if self._closing:
1088 self._call_connection_lost(None)