blob: 58b61f1c89ebf7832d155994237c2f95f5aeadde [file] [log] [blame]
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07001"""Event loop using a selector and related classes.
2
3A selector is a "notify-when-ready" multiplexer. For a subclass which
4also includes support for signal handling, see the unix_events sub-module.
5"""
6
Victor Stinner8dffc452014-01-25 15:32:06 +01007__all__ = ['BaseSelectorEventLoop']
8
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07009import collections
Guido van Rossum3317a132013-11-01 14:12:50 -070010import errno
Victor Stinnerd5aeccf92014-08-31 15:07:57 +020011import functools
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070012import socket
13try:
14 import ssl
15except ImportError: # pragma: no cover
16 ssl = None
17
18from . import base_events
19from . import constants
20from . import events
21from . import futures
22from . import selectors
23from . import transports
Guido van Rossumfc29e0f2013-10-17 15:39:45 -070024from .log import logger
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070025
26
Victor Stinnere912e652014-07-12 03:11:53 +020027def _test_selector_event(selector, fd, event):
28 # Test if the selector is monitoring 'event' events
29 # for the file descriptor 'fd'.
30 try:
31 key = selector.get_key(fd)
32 except KeyError:
33 return False
34 else:
35 return bool(key.events & event)
36
37
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070038class BaseSelectorEventLoop(base_events.BaseEventLoop):
39 """Selector event loop.
40
41 See events.EventLoop for API specification.
42 """
43
44 def __init__(self, selector=None):
45 super().__init__()
46
47 if selector is None:
48 selector = selectors.DefaultSelector()
Guido van Rossumfc29e0f2013-10-17 15:39:45 -070049 logger.debug('Using selector: %s', selector.__class__.__name__)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070050 self._selector = selector
51 self._make_self_pipe()
52
53 def _make_socket_transport(self, sock, protocol, waiter=None, *,
54 extra=None, server=None):
55 return _SelectorSocketTransport(self, sock, protocol, waiter,
56 extra, server)
57
Victor Stinner15cc6782015-01-09 00:09:10 +010058 def _make_ssl_transport(self, rawsock, protocol, sslcontext, waiter=None,
59 *, server_side=False, server_hostname=None,
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070060 extra=None, server=None):
61 return _SelectorSslTransport(
62 self, rawsock, protocol, sslcontext, waiter,
63 server_side, server_hostname, extra, server)
64
65 def _make_datagram_transport(self, sock, protocol,
Victor Stinnerbfff45d2014-07-08 23:57:31 +020066 address=None, waiter=None, extra=None):
67 return _SelectorDatagramTransport(self, sock, protocol,
68 address, waiter, extra)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070069
70 def close(self):
Victor Stinner956de692014-12-26 21:07:52 +010071 if self.is_running():
Victor Stinner5e631202014-11-21 00:23:27 +010072 raise RuntimeError("Cannot close a running event loop")
Victor Stinnerbb2fc5b2014-06-10 10:23:10 +020073 if self.is_closed():
74 return
75 self._close_self_pipe()
Victor Stinner5e631202014-11-21 00:23:27 +010076 super().close()
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070077 if self._selector is not None:
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070078 self._selector.close()
79 self._selector = None
80
81 def _socketpair(self):
82 raise NotImplementedError
83
84 def _close_self_pipe(self):
85 self.remove_reader(self._ssock.fileno())
86 self._ssock.close()
87 self._ssock = None
88 self._csock.close()
89 self._csock = None
90 self._internal_fds -= 1
91
92 def _make_self_pipe(self):
93 # A self-socket, really. :-)
94 self._ssock, self._csock = self._socketpair()
95 self._ssock.setblocking(False)
96 self._csock.setblocking(False)
97 self._internal_fds += 1
98 self.add_reader(self._ssock.fileno(), self._read_from_self)
99
Victor Stinnerfe5649c2014-07-17 22:43:40 +0200100 def _process_self_data(self, data):
101 pass
102
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700103 def _read_from_self(self):
Victor Stinner54c4b8e2014-06-19 12:59:15 +0200104 while True:
105 try:
106 data = self._ssock.recv(4096)
107 if not data:
108 break
Victor Stinnerfe5649c2014-07-17 22:43:40 +0200109 self._process_self_data(data)
Victor Stinner54c4b8e2014-06-19 12:59:15 +0200110 except InterruptedError:
111 continue
112 except BlockingIOError:
113 break
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700114
115 def _write_to_self(self):
Guido van Rossum3d139d82014-05-06 14:42:40 -0700116 # This may be called from a different thread, possibly after
117 # _close_self_pipe() has been called or even while it is
118 # running. Guard for self._csock being None or closed. When
119 # a socket is closed, send() raises OSError (with errno set to
120 # EBADF, but let's not rely on the exact error code).
121 csock = self._csock
122 if csock is not None:
123 try:
Victor Stinnerfe5649c2014-07-17 22:43:40 +0200124 csock.send(b'\0')
Guido van Rossum3d139d82014-05-06 14:42:40 -0700125 except OSError:
Victor Stinner65dd69a2014-07-25 22:36:05 +0200126 if self._debug:
127 logger.debug("Fail to write a null byte into the "
128 "self-pipe socket",
129 exc_info=True)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700130
Guido van Rossum28dff0d2013-11-01 14:22:30 -0700131 def _start_serving(self, protocol_factory, sock,
132 sslcontext=None, server=None):
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700133 self.add_reader(sock.fileno(), self._accept_connection,
Guido van Rossum28dff0d2013-11-01 14:22:30 -0700134 protocol_factory, sock, sslcontext, server)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700135
Guido van Rossum28dff0d2013-11-01 14:22:30 -0700136 def _accept_connection(self, protocol_factory, sock,
137 sslcontext=None, server=None):
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700138 try:
139 conn, addr = sock.accept()
Victor Stinnere912e652014-07-12 03:11:53 +0200140 if self._debug:
141 logger.debug("%r got a new connection from %r: %r",
142 server, addr, conn)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700143 conn.setblocking(False)
Guido van Rossum3317a132013-11-01 14:12:50 -0700144 except (BlockingIOError, InterruptedError, ConnectionAbortedError):
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700145 pass # False alarm.
Guido van Rossum3317a132013-11-01 14:12:50 -0700146 except OSError as exc:
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700147 # There's nowhere to send the error, so just log it.
148 # TODO: Someone will want an error handler for this.
Guido van Rossum3317a132013-11-01 14:12:50 -0700149 if exc.errno in (errno.EMFILE, errno.ENFILE,
150 errno.ENOBUFS, errno.ENOMEM):
151 # Some platforms (e.g. Linux keep reporting the FD as
152 # ready, so we remove the read handler temporarily.
153 # We'll try again in a while.
Yury Selivanovff827f02014-02-18 18:02:19 -0500154 self.call_exception_handler({
155 'message': 'socket.accept() out of system resource',
156 'exception': exc,
157 'socket': sock,
158 })
Guido van Rossum3317a132013-11-01 14:12:50 -0700159 self.remove_reader(sock.fileno())
160 self.call_later(constants.ACCEPT_RETRY_DELAY,
161 self._start_serving,
Guido van Rossum28dff0d2013-11-01 14:22:30 -0700162 protocol_factory, sock, sslcontext, server)
Guido van Rossum3317a132013-11-01 14:12:50 -0700163 else:
164 raise # The event loop will catch, log and ignore it.
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700165 else:
Guido van Rossum28dff0d2013-11-01 14:22:30 -0700166 if sslcontext:
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700167 self._make_ssl_transport(
Victor Stinner8d9c1452015-01-08 12:06:36 +0100168 conn, protocol_factory(), sslcontext,
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700169 server_side=True, extra={'peername': addr}, server=server)
170 else:
171 self._make_socket_transport(
172 conn, protocol_factory(), extra={'peername': addr},
173 server=server)
174 # It's now up to the protocol to handle the connection.
175
176 def add_reader(self, fd, callback, *args):
177 """Add a reader callback."""
Victor Stinnerbb2fc5b2014-06-10 10:23:10 +0200178 self._check_closed()
Yury Selivanovff827f02014-02-18 18:02:19 -0500179 handle = events.Handle(callback, args, self)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700180 try:
181 key = self._selector.get_key(fd)
182 except KeyError:
183 self._selector.register(fd, selectors.EVENT_READ,
184 (handle, None))
185 else:
186 mask, (reader, writer) = key.events, key.data
187 self._selector.modify(fd, mask | selectors.EVENT_READ,
188 (handle, writer))
189 if reader is not None:
190 reader.cancel()
191
192 def remove_reader(self, fd):
193 """Remove a reader callback."""
Victor Stinnerbb2fc5b2014-06-10 10:23:10 +0200194 if self.is_closed():
Victor Stinnereeeebcd2014-03-06 00:52:53 +0100195 return False
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700196 try:
197 key = self._selector.get_key(fd)
198 except KeyError:
199 return False
200 else:
201 mask, (reader, writer) = key.events, key.data
202 mask &= ~selectors.EVENT_READ
203 if not mask:
204 self._selector.unregister(fd)
205 else:
206 self._selector.modify(fd, mask, (None, writer))
207
208 if reader is not None:
209 reader.cancel()
210 return True
211 else:
212 return False
213
214 def add_writer(self, fd, callback, *args):
215 """Add a writer callback.."""
Victor Stinnerbb2fc5b2014-06-10 10:23:10 +0200216 self._check_closed()
Yury Selivanovff827f02014-02-18 18:02:19 -0500217 handle = events.Handle(callback, args, self)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700218 try:
219 key = self._selector.get_key(fd)
220 except KeyError:
221 self._selector.register(fd, selectors.EVENT_WRITE,
222 (None, handle))
223 else:
224 mask, (reader, writer) = key.events, key.data
225 self._selector.modify(fd, mask | selectors.EVENT_WRITE,
226 (reader, handle))
227 if writer is not None:
228 writer.cancel()
229
230 def remove_writer(self, fd):
231 """Remove a writer callback."""
Victor Stinnerbb2fc5b2014-06-10 10:23:10 +0200232 if self.is_closed():
Victor Stinnereeeebcd2014-03-06 00:52:53 +0100233 return False
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700234 try:
235 key = self._selector.get_key(fd)
236 except KeyError:
237 return False
238 else:
239 mask, (reader, writer) = key.events, key.data
240 # Remove both writer and connector.
241 mask &= ~selectors.EVENT_WRITE
242 if not mask:
243 self._selector.unregister(fd)
244 else:
245 self._selector.modify(fd, mask, (reader, None))
246
247 if writer is not None:
248 writer.cancel()
249 return True
250 else:
251 return False
252
253 def sock_recv(self, sock, n):
Victor Stinnerd1432092014-06-19 17:11:49 +0200254 """Receive data from the socket.
255
256 The return value is a bytes object representing the data received.
257 The maximum amount of data to be received at once is specified by
258 nbytes.
259
260 This method is a coroutine.
261 """
Victor Stinner9c9f1f12014-07-29 23:08:17 +0200262 if self.get_debug() and sock.gettimeout() != 0:
263 raise ValueError("the socket must be non-blocking")
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700264 fut = futures.Future(loop=self)
265 self._sock_recv(fut, False, sock, n)
266 return fut
267
268 def _sock_recv(self, fut, registered, sock, n):
Victor Stinner28773462014-02-13 09:24:37 +0100269 # _sock_recv() can add itself as an I/O callback if the operation can't
Yury Selivanovb0b0e622014-02-18 22:27:48 -0500270 # be done immediately. Don't use it directly, call sock_recv().
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700271 fd = sock.fileno()
272 if registered:
273 # Remove the callback early. It should be rare that the
274 # selector says the fd is ready but the call still returns
275 # EAGAIN, and I am willing to take a hit in that case in
276 # order to simplify the common case.
277 self.remove_reader(fd)
278 if fut.cancelled():
279 return
280 try:
281 data = sock.recv(n)
282 except (BlockingIOError, InterruptedError):
283 self.add_reader(fd, self._sock_recv, fut, True, sock, n)
284 except Exception as exc:
285 fut.set_exception(exc)
286 else:
287 fut.set_result(data)
288
289 def sock_sendall(self, sock, data):
Victor Stinnerd1432092014-06-19 17:11:49 +0200290 """Send data to the socket.
291
292 The socket must be connected to a remote socket. This method continues
293 to send data from data until either all data has been sent or an
294 error occurs. None is returned on success. On error, an exception is
295 raised, and there is no way to determine how much data, if any, was
296 successfully processed by the receiving end of the connection.
297
298 This method is a coroutine.
299 """
Victor Stinner9c9f1f12014-07-29 23:08:17 +0200300 if self.get_debug() and sock.gettimeout() != 0:
301 raise ValueError("the socket must be non-blocking")
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700302 fut = futures.Future(loop=self)
303 if data:
304 self._sock_sendall(fut, False, sock, data)
305 else:
306 fut.set_result(None)
307 return fut
308
309 def _sock_sendall(self, fut, registered, sock, data):
310 fd = sock.fileno()
311
312 if registered:
313 self.remove_writer(fd)
314 if fut.cancelled():
315 return
316
317 try:
318 n = sock.send(data)
319 except (BlockingIOError, InterruptedError):
320 n = 0
321 except Exception as exc:
322 fut.set_exception(exc)
323 return
324
325 if n == len(data):
326 fut.set_result(None)
327 else:
328 if n:
329 data = data[n:]
330 self.add_writer(fd, self._sock_sendall, fut, True, sock, data)
331
332 def sock_connect(self, sock, address):
Victor Stinnerd1432092014-06-19 17:11:49 +0200333 """Connect to a remote socket at address.
334
335 The address must be already resolved to avoid the trap of hanging the
336 entire event loop when the address requires doing a DNS lookup. For
337 example, it must be an IP address, not an hostname, for AF_INET and
338 AF_INET6 address families. Use getaddrinfo() to resolve the hostname
339 asynchronously.
340
341 This method is a coroutine.
342 """
Victor Stinner9c9f1f12014-07-29 23:08:17 +0200343 if self.get_debug() and sock.gettimeout() != 0:
344 raise ValueError("the socket must be non-blocking")
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700345 fut = futures.Future(loop=self)
Victor Stinner28773462014-02-13 09:24:37 +0100346 try:
347 base_events._check_resolved_address(sock, address)
348 except ValueError as err:
349 fut.set_exception(err)
350 else:
Victor Stinnerd5aeccf92014-08-31 15:07:57 +0200351 self._sock_connect(fut, sock, address)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700352 return fut
353
Victor Stinnerd5aeccf92014-08-31 15:07:57 +0200354 def _sock_connect(self, fut, sock, address):
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700355 fd = sock.fileno()
Victor Stinnerd5aeccf92014-08-31 15:07:57 +0200356 try:
357 while True:
358 try:
359 sock.connect(address)
360 except InterruptedError:
361 continue
362 else:
363 break
364 except BlockingIOError:
365 fut.add_done_callback(functools.partial(self._sock_connect_done,
Victor Stinner3531d902015-01-09 01:42:52 +0100366 fd))
Victor Stinnerd5aeccf92014-08-31 15:07:57 +0200367 self.add_writer(fd, self._sock_connect_cb, fut, sock, address)
368 except Exception as exc:
369 fut.set_exception(exc)
370 else:
371 fut.set_result(None)
372
Victor Stinner3531d902015-01-09 01:42:52 +0100373 def _sock_connect_done(self, fd, fut):
374 self.remove_writer(fd)
Victor Stinnerd5aeccf92014-08-31 15:07:57 +0200375
376 def _sock_connect_cb(self, fut, sock, address):
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700377 if fut.cancelled():
378 return
Victor Stinnerd5aeccf92014-08-31 15:07:57 +0200379
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700380 try:
Victor Stinnerd5aeccf92014-08-31 15:07:57 +0200381 err = sock.getsockopt(socket.SOL_SOCKET, socket.SO_ERROR)
382 if err != 0:
383 # Jump to any except clause below.
384 raise OSError(err, 'Connect call failed %s' % (address,))
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700385 except (BlockingIOError, InterruptedError):
Victor Stinnerd5aeccf92014-08-31 15:07:57 +0200386 # socket is still registered, the callback will be retried later
387 pass
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700388 except Exception as exc:
389 fut.set_exception(exc)
390 else:
391 fut.set_result(None)
392
393 def sock_accept(self, sock):
Victor Stinnerd1432092014-06-19 17:11:49 +0200394 """Accept a connection.
395
396 The socket must be bound to an address and listening for connections.
397 The return value is a pair (conn, address) where conn is a new socket
398 object usable to send and receive data on the connection, and address
399 is the address bound to the socket on the other end of the connection.
400
401 This method is a coroutine.
402 """
Victor Stinner9c9f1f12014-07-29 23:08:17 +0200403 if self.get_debug() and sock.gettimeout() != 0:
404 raise ValueError("the socket must be non-blocking")
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700405 fut = futures.Future(loop=self)
406 self._sock_accept(fut, False, sock)
407 return fut
408
409 def _sock_accept(self, fut, registered, sock):
410 fd = sock.fileno()
411 if registered:
412 self.remove_reader(fd)
413 if fut.cancelled():
414 return
415 try:
416 conn, address = sock.accept()
417 conn.setblocking(False)
418 except (BlockingIOError, InterruptedError):
419 self.add_reader(fd, self._sock_accept, fut, True, sock)
420 except Exception as exc:
421 fut.set_exception(exc)
422 else:
423 fut.set_result((conn, address))
424
425 def _process_events(self, event_list):
426 for key, mask in event_list:
427 fileobj, (reader, writer) = key.fileobj, key.data
428 if mask & selectors.EVENT_READ and reader is not None:
429 if reader._cancelled:
430 self.remove_reader(fileobj)
431 else:
432 self._add_callback(reader)
433 if mask & selectors.EVENT_WRITE and writer is not None:
434 if writer._cancelled:
435 self.remove_writer(fileobj)
436 else:
437 self._add_callback(writer)
438
439 def _stop_serving(self, sock):
440 self.remove_reader(sock.fileno())
441 sock.close()
442
443
Yury Selivanovc0982412014-02-18 18:41:13 -0500444class _SelectorTransport(transports._FlowControlMixin,
445 transports.Transport):
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700446
447 max_size = 256 * 1024 # Buffer size passed to recv().
448
Guido van Rossuma5062c52013-11-27 14:12:48 -0800449 _buffer_factory = bytearray # Constructs initial value for self._buffer.
450
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700451 def __init__(self, loop, sock, protocol, extra, server=None):
Victor Stinner004adb92014-11-05 15:27:41 +0100452 super().__init__(extra, loop)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700453 self._extra['socket'] = sock
454 self._extra['sockname'] = sock.getsockname()
455 if 'peername' not in self._extra:
456 try:
457 self._extra['peername'] = sock.getpeername()
458 except socket.error:
459 self._extra['peername'] = None
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700460 self._sock = sock
461 self._sock_fd = sock.fileno()
462 self._protocol = protocol
463 self._server = server
Guido van Rossuma5062c52013-11-27 14:12:48 -0800464 self._buffer = self._buffer_factory()
Guido van Rossum2546a172013-10-18 10:10:36 -0700465 self._conn_lost = 0 # Set when call to connection_lost scheduled.
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700466 self._closing = False # Set when close() called.
Guido van Rossum355491d2013-10-18 15:17:11 -0700467 if self._server is not None:
Victor Stinnerb28dbac2014-07-11 22:52:21 +0200468 self._server._attach()
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700469
Victor Stinnere912e652014-07-12 03:11:53 +0200470 def __repr__(self):
Victor Stinner0e34dc32014-10-12 09:52:11 +0200471 info = [self.__class__.__name__]
472 if self._sock is None:
473 info.append('closed')
474 elif self._closing:
475 info.append('closing')
476 info.append('fd=%s' % self._sock_fd)
Victor Stinnerb2614752014-08-25 23:20:52 +0200477 # test if the transport was closed
478 if self._loop is not None:
479 polling = _test_selector_event(self._loop._selector,
480 self._sock_fd, selectors.EVENT_READ)
481 if polling:
482 info.append('read=polling')
483 else:
484 info.append('read=idle')
Victor Stinnere912e652014-07-12 03:11:53 +0200485
Victor Stinnerb2614752014-08-25 23:20:52 +0200486 polling = _test_selector_event(self._loop._selector,
Victor Stinner15cc6782015-01-09 00:09:10 +0100487 self._sock_fd,
488 selectors.EVENT_WRITE)
Victor Stinnerb2614752014-08-25 23:20:52 +0200489 if polling:
490 state = 'polling'
491 else:
492 state = 'idle'
Victor Stinnere912e652014-07-12 03:11:53 +0200493
Victor Stinnerb2614752014-08-25 23:20:52 +0200494 bufsize = self.get_write_buffer_size()
495 info.append('write=<%s, bufsize=%s>' % (state, bufsize))
Victor Stinnere912e652014-07-12 03:11:53 +0200496 return '<%s>' % ' '.join(info)
497
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700498 def abort(self):
499 self._force_close(None)
500
501 def close(self):
502 if self._closing:
503 return
504 self._closing = True
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700505 self._loop.remove_reader(self._sock_fd)
506 if not self._buffer:
Guido van Rossum2546a172013-10-18 10:10:36 -0700507 self._conn_lost += 1
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700508 self._loop.call_soon(self._call_connection_lost, None)
509
Victor Stinner065ca252014-02-19 01:40:41 +0100510 def _fatal_error(self, exc, message='Fatal error on transport'):
Guido van Rossum2546a172013-10-18 10:10:36 -0700511 # Should be called from exception handler only.
Victor Stinnere912e652014-07-12 03:11:53 +0200512 if isinstance(exc, (BrokenPipeError, ConnectionResetError)):
513 if self._loop.get_debug():
514 logger.debug("%r: %s", self, message, exc_info=True)
515 else:
Yury Selivanovff827f02014-02-18 18:02:19 -0500516 self._loop.call_exception_handler({
Victor Stinner065ca252014-02-19 01:40:41 +0100517 'message': message,
Yury Selivanovff827f02014-02-18 18:02:19 -0500518 'exception': exc,
519 'transport': self,
520 'protocol': self._protocol,
521 })
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700522 self._force_close(exc)
523
524 def _force_close(self, exc):
Guido van Rossum2546a172013-10-18 10:10:36 -0700525 if self._conn_lost:
526 return
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700527 if self._buffer:
528 self._buffer.clear()
529 self._loop.remove_writer(self._sock_fd)
Guido van Rossum2546a172013-10-18 10:10:36 -0700530 if not self._closing:
531 self._closing = True
532 self._loop.remove_reader(self._sock_fd)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700533 self._conn_lost += 1
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700534 self._loop.call_soon(self._call_connection_lost, exc)
535
536 def _call_connection_lost(self, exc):
537 try:
538 self._protocol.connection_lost(exc)
539 finally:
540 self._sock.close()
541 self._sock = None
542 self._protocol = None
543 self._loop = None
544 server = self._server
545 if server is not None:
Victor Stinnerb28dbac2014-07-11 22:52:21 +0200546 server._detach()
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700547 self._server = None
548
Guido van Rossum355491d2013-10-18 15:17:11 -0700549 def get_write_buffer_size(self):
Guido van Rossuma5062c52013-11-27 14:12:48 -0800550 return len(self._buffer)
Guido van Rossum355491d2013-10-18 15:17:11 -0700551
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700552
553class _SelectorSocketTransport(_SelectorTransport):
554
555 def __init__(self, loop, sock, protocol, waiter=None,
556 extra=None, server=None):
557 super().__init__(loop, sock, protocol, extra, server)
558 self._eof = False
559 self._paused = False
560
561 self._loop.add_reader(self._sock_fd, self._read_ready)
562 self._loop.call_soon(self._protocol.connection_made, self)
563 if waiter is not None:
Victor Stinnerbfff45d2014-07-08 23:57:31 +0200564 # wait until protocol.connection_made() has been called
Victor Stinner799a60c2014-07-07 18:08:22 +0200565 self._loop.call_soon(waiter._set_result_unless_cancelled, None)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700566
Guido van Rossum57497ad2013-10-18 07:58:20 -0700567 def pause_reading(self):
Guido van Rossuma5062c52013-11-27 14:12:48 -0800568 if self._closing:
569 raise RuntimeError('Cannot pause_reading() when closing')
570 if self._paused:
571 raise RuntimeError('Already paused')
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700572 self._paused = True
573 self._loop.remove_reader(self._sock_fd)
Victor Stinnere912e652014-07-12 03:11:53 +0200574 if self._loop.get_debug():
575 logger.debug("%r pauses reading", self)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700576
Guido van Rossum57497ad2013-10-18 07:58:20 -0700577 def resume_reading(self):
Guido van Rossuma5062c52013-11-27 14:12:48 -0800578 if not self._paused:
579 raise RuntimeError('Not paused')
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700580 self._paused = False
581 if self._closing:
582 return
583 self._loop.add_reader(self._sock_fd, self._read_ready)
Victor Stinnere912e652014-07-12 03:11:53 +0200584 if self._loop.get_debug():
585 logger.debug("%r resumes reading", self)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700586
587 def _read_ready(self):
588 try:
589 data = self._sock.recv(self.max_size)
590 except (BlockingIOError, InterruptedError):
591 pass
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700592 except Exception as exc:
Victor Stinner065ca252014-02-19 01:40:41 +0100593 self._fatal_error(exc, 'Fatal read error on socket transport')
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700594 else:
595 if data:
596 self._protocol.data_received(data)
597 else:
Victor Stinnere912e652014-07-12 03:11:53 +0200598 if self._loop.get_debug():
599 logger.debug("%r received EOF", self)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700600 keep_open = self._protocol.eof_received()
Guido van Rossum1f683bb2013-10-30 14:36:58 -0700601 if keep_open:
602 # We're keeping the connection open so the
603 # protocol can write more, but we still can't
604 # receive more, so remove the reader callback.
605 self._loop.remove_reader(self._sock_fd)
606 else:
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700607 self.close()
608
609 def write(self, data):
Guido van Rossuma5062c52013-11-27 14:12:48 -0800610 if not isinstance(data, (bytes, bytearray, memoryview)):
611 raise TypeError('data argument must be byte-ish (%r)',
612 type(data))
613 if self._eof:
614 raise RuntimeError('Cannot call write() after write_eof()')
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700615 if not data:
616 return
617
618 if self._conn_lost:
619 if self._conn_lost >= constants.LOG_THRESHOLD_FOR_CONNLOST_WRITES:
Guido van Rossumfc29e0f2013-10-17 15:39:45 -0700620 logger.warning('socket.send() raised exception.')
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700621 self._conn_lost += 1
622 return
623
624 if not self._buffer:
Guido van Rossum355491d2013-10-18 15:17:11 -0700625 # Optimization: try to send now.
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700626 try:
627 n = self._sock.send(data)
628 except (BlockingIOError, InterruptedError):
Guido van Rossum2546a172013-10-18 10:10:36 -0700629 pass
630 except Exception as exc:
Victor Stinner065ca252014-02-19 01:40:41 +0100631 self._fatal_error(exc, 'Fatal write error on socket transport')
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700632 return
633 else:
634 data = data[n:]
635 if not data:
636 return
Guido van Rossum355491d2013-10-18 15:17:11 -0700637 # Not all was written; register write handler.
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700638 self._loop.add_writer(self._sock_fd, self._write_ready)
639
Guido van Rossum355491d2013-10-18 15:17:11 -0700640 # Add it to the buffer.
Guido van Rossuma5062c52013-11-27 14:12:48 -0800641 self._buffer.extend(data)
Guido van Rossum355491d2013-10-18 15:17:11 -0700642 self._maybe_pause_protocol()
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700643
644 def _write_ready(self):
Guido van Rossuma5062c52013-11-27 14:12:48 -0800645 assert self._buffer, 'Data should not be empty'
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700646
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700647 try:
Guido van Rossuma5062c52013-11-27 14:12:48 -0800648 n = self._sock.send(self._buffer)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700649 except (BlockingIOError, InterruptedError):
Guido van Rossuma5062c52013-11-27 14:12:48 -0800650 pass
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700651 except Exception as exc:
652 self._loop.remove_writer(self._sock_fd)
Guido van Rossuma5062c52013-11-27 14:12:48 -0800653 self._buffer.clear()
Victor Stinner065ca252014-02-19 01:40:41 +0100654 self._fatal_error(exc, 'Fatal write error on socket transport')
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700655 else:
Guido van Rossuma5062c52013-11-27 14:12:48 -0800656 if n:
657 del self._buffer[:n]
Guido van Rossum355491d2013-10-18 15:17:11 -0700658 self._maybe_resume_protocol() # May append to buffer.
659 if not self._buffer:
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700660 self._loop.remove_writer(self._sock_fd)
661 if self._closing:
662 self._call_connection_lost(None)
663 elif self._eof:
664 self._sock.shutdown(socket.SHUT_WR)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700665
666 def write_eof(self):
667 if self._eof:
668 return
669 self._eof = True
670 if not self._buffer:
671 self._sock.shutdown(socket.SHUT_WR)
672
673 def can_write_eof(self):
674 return True
675
676
677class _SelectorSslTransport(_SelectorTransport):
678
Guido van Rossuma5062c52013-11-27 14:12:48 -0800679 _buffer_factory = bytearray
680
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700681 def __init__(self, loop, rawsock, protocol, sslcontext, waiter=None,
682 server_side=False, server_hostname=None,
683 extra=None, server=None):
Guido van Rossum28dff0d2013-11-01 14:22:30 -0700684 if ssl is None:
685 raise RuntimeError('stdlib ssl module not available')
686
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700687 if server_side:
Guido van Rossum28dff0d2013-11-01 14:22:30 -0700688 if not sslcontext:
689 raise ValueError('Server side ssl needs a valid SSLContext')
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700690 else:
Guido van Rossum28dff0d2013-11-01 14:22:30 -0700691 if not sslcontext:
692 # Client side may pass ssl=True to use a default
693 # context; in that case the sslcontext passed is None.
Antoine Pitroufd39a892014-10-15 16:58:21 +0200694 # The default is secure for client connections.
695 if hasattr(ssl, 'create_default_context'):
696 # Python 3.4+: use up-to-date strong settings.
697 sslcontext = ssl.create_default_context()
698 if not server_hostname:
699 sslcontext.check_hostname = False
Guido van Rossum7fa6e1a2013-11-23 15:36:43 -0800700 else:
701 # Fallback for Python 3.3.
702 sslcontext = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
703 sslcontext.options |= ssl.OP_NO_SSLv2
Antoine Pitroufd39a892014-10-15 16:58:21 +0200704 sslcontext.options |= ssl.OP_NO_SSLv3
Guido van Rossum7fa6e1a2013-11-23 15:36:43 -0800705 sslcontext.set_default_verify_paths()
706 sslcontext.verify_mode = ssl.CERT_REQUIRED
Guido van Rossum28dff0d2013-11-01 14:22:30 -0700707
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700708 wrap_kwargs = {
709 'server_side': server_side,
710 'do_handshake_on_connect': False,
711 }
Benjamin Peterson7243b572014-11-23 17:04:34 -0600712 if server_hostname and not server_side:
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700713 wrap_kwargs['server_hostname'] = server_hostname
714 sslsock = sslcontext.wrap_socket(rawsock, **wrap_kwargs)
715
716 super().__init__(loop, sslsock, protocol, extra, server)
717
718 self._server_hostname = server_hostname
719 self._waiter = waiter
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700720 self._sslcontext = sslcontext
721 self._paused = False
722
723 # SSL-specific extra info. (peercert is set later)
724 self._extra.update(sslcontext=sslcontext)
725
Victor Stinnere912e652014-07-12 03:11:53 +0200726 if self._loop.get_debug():
727 logger.debug("%r starts SSL handshake", self)
728 start_time = self._loop.time()
729 else:
730 start_time = None
731 self._on_handshake(start_time)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700732
Victor Stinnere912e652014-07-12 03:11:53 +0200733 def _on_handshake(self, start_time):
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700734 try:
735 self._sock.do_handshake()
736 except ssl.SSLWantReadError:
Victor Stinnere912e652014-07-12 03:11:53 +0200737 self._loop.add_reader(self._sock_fd,
738 self._on_handshake, start_time)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700739 return
740 except ssl.SSLWantWriteError:
Victor Stinnere912e652014-07-12 03:11:53 +0200741 self._loop.add_writer(self._sock_fd,
742 self._on_handshake, start_time)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700743 return
744 except BaseException as exc:
Victor Stinnere912e652014-07-12 03:11:53 +0200745 if self._loop.get_debug():
746 logger.warning("%r: SSL handshake failed",
747 self, exc_info=True)
Guido van Rossum355491d2013-10-18 15:17:11 -0700748 self._loop.remove_reader(self._sock_fd)
749 self._loop.remove_writer(self._sock_fd)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700750 self._sock.close()
751 if self._waiter is not None:
752 self._waiter.set_exception(exc)
Victor Stinnere912e652014-07-12 03:11:53 +0200753 if isinstance(exc, Exception):
754 return
755 else:
756 raise
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700757
Guido van Rossum355491d2013-10-18 15:17:11 -0700758 self._loop.remove_reader(self._sock_fd)
759 self._loop.remove_writer(self._sock_fd)
760
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700761 peercert = self._sock.getpeercert()
Christian Heimes6d8c1ab2013-12-06 00:23:13 +0100762 if not hasattr(self._sslcontext, 'check_hostname'):
763 # Verify hostname if requested, Python 3.4+ uses check_hostname
764 # and checks the hostname in do_handshake()
765 if (self._server_hostname and
766 self._sslcontext.verify_mode != ssl.CERT_NONE):
767 try:
768 ssl.match_hostname(peercert, self._server_hostname)
769 except Exception as exc:
Victor Stinnere912e652014-07-12 03:11:53 +0200770 if self._loop.get_debug():
771 logger.warning("%r: SSL handshake failed "
772 "on matching the hostname",
773 self, exc_info=True)
Christian Heimes6d8c1ab2013-12-06 00:23:13 +0100774 self._sock.close()
775 if self._waiter is not None:
776 self._waiter.set_exception(exc)
777 return
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700778
779 # Add extra info that becomes available after handshake.
780 self._extra.update(peercert=peercert,
781 cipher=self._sock.cipher(),
782 compression=self._sock.compression(),
783 )
784
Guido van Rossum2b570162013-11-01 14:18:02 -0700785 self._read_wants_write = False
786 self._write_wants_read = False
787 self._loop.add_reader(self._sock_fd, self._read_ready)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700788 self._loop.call_soon(self._protocol.connection_made, self)
789 if self._waiter is not None:
Victor Stinnerbfff45d2014-07-08 23:57:31 +0200790 # wait until protocol.connection_made() has been called
Victor Stinnera9acbe82014-07-05 15:29:41 +0200791 self._loop.call_soon(self._waiter._set_result_unless_cancelled,
792 None)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700793
Victor Stinnere912e652014-07-12 03:11:53 +0200794 if self._loop.get_debug():
795 dt = self._loop.time() - start_time
796 logger.debug("%r: SSL handshake took %.1f ms", self, dt * 1e3)
797
Guido van Rossum57497ad2013-10-18 07:58:20 -0700798 def pause_reading(self):
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700799 # XXX This is a bit icky, given the comment at the top of
Guido van Rossum2b570162013-11-01 14:18:02 -0700800 # _read_ready(). Is it possible to evoke a deadlock? I don't
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700801 # know, although it doesn't look like it; write() will still
802 # accept more data for the buffer and eventually the app will
Guido van Rossum57497ad2013-10-18 07:58:20 -0700803 # call resume_reading() again, and things will flow again.
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700804
Guido van Rossuma5062c52013-11-27 14:12:48 -0800805 if self._closing:
806 raise RuntimeError('Cannot pause_reading() when closing')
807 if self._paused:
808 raise RuntimeError('Already paused')
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700809 self._paused = True
810 self._loop.remove_reader(self._sock_fd)
Victor Stinnere912e652014-07-12 03:11:53 +0200811 if self._loop.get_debug():
812 logger.debug("%r pauses reading", self)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700813
Guido van Rossum57497ad2013-10-18 07:58:20 -0700814 def resume_reading(self):
Guido van Rossuma5062c52013-11-27 14:12:48 -0800815 if not self._paused:
Andrew Svetlov3207a032014-05-27 21:24:43 +0300816 raise RuntimeError('Not paused')
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700817 self._paused = False
818 if self._closing:
819 return
Guido van Rossum2b570162013-11-01 14:18:02 -0700820 self._loop.add_reader(self._sock_fd, self._read_ready)
Victor Stinnere912e652014-07-12 03:11:53 +0200821 if self._loop.get_debug():
822 logger.debug("%r resumes reading", self)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700823
Guido van Rossum2b570162013-11-01 14:18:02 -0700824 def _read_ready(self):
825 if self._write_wants_read:
826 self._write_wants_read = False
827 self._write_ready()
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700828
Guido van Rossum2b570162013-11-01 14:18:02 -0700829 if self._buffer:
830 self._loop.add_writer(self._sock_fd, self._write_ready)
831
832 try:
833 data = self._sock.recv(self.max_size)
834 except (BlockingIOError, InterruptedError, ssl.SSLWantReadError):
835 pass
836 except ssl.SSLWantWriteError:
837 self._read_wants_write = True
838 self._loop.remove_reader(self._sock_fd)
839 self._loop.add_writer(self._sock_fd, self._write_ready)
840 except Exception as exc:
Victor Stinner065ca252014-02-19 01:40:41 +0100841 self._fatal_error(exc, 'Fatal read error on SSL transport')
Guido van Rossum2b570162013-11-01 14:18:02 -0700842 else:
843 if data:
844 self._protocol.data_received(data)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700845 else:
Guido van Rossum2b570162013-11-01 14:18:02 -0700846 try:
Victor Stinnere912e652014-07-12 03:11:53 +0200847 if self._loop.get_debug():
848 logger.debug("%r received EOF", self)
Guido van Rossum3a703922013-11-01 14:19:35 -0700849 keep_open = self._protocol.eof_received()
850 if keep_open:
851 logger.warning('returning true from eof_received() '
852 'has no effect when using ssl')
Guido van Rossum2b570162013-11-01 14:18:02 -0700853 finally:
854 self.close()
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700855
Guido van Rossum2b570162013-11-01 14:18:02 -0700856 def _write_ready(self):
857 if self._read_wants_write:
858 self._read_wants_write = False
859 self._read_ready()
860
861 if not (self._paused or self._closing):
862 self._loop.add_reader(self._sock_fd, self._read_ready)
863
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700864 if self._buffer:
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700865 try:
Guido van Rossuma5062c52013-11-27 14:12:48 -0800866 n = self._sock.send(self._buffer)
Victor Stinnerc89c8a72014-02-26 17:35:30 +0100867 except (BlockingIOError, InterruptedError, ssl.SSLWantWriteError):
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700868 n = 0
Guido van Rossum2b570162013-11-01 14:18:02 -0700869 except ssl.SSLWantReadError:
870 n = 0
871 self._loop.remove_writer(self._sock_fd)
872 self._write_wants_read = True
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700873 except Exception as exc:
874 self._loop.remove_writer(self._sock_fd)
Guido van Rossuma5062c52013-11-27 14:12:48 -0800875 self._buffer.clear()
Victor Stinner065ca252014-02-19 01:40:41 +0100876 self._fatal_error(exc, 'Fatal write error on SSL transport')
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700877 return
878
Guido van Rossuma5062c52013-11-27 14:12:48 -0800879 if n:
880 del self._buffer[:n]
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700881
Guido van Rossum2b570162013-11-01 14:18:02 -0700882 self._maybe_resume_protocol() # May append to buffer.
Guido van Rossum355491d2013-10-18 15:17:11 -0700883
Guido van Rossum2b570162013-11-01 14:18:02 -0700884 if not self._buffer:
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700885 self._loop.remove_writer(self._sock_fd)
Guido van Rossum2b570162013-11-01 14:18:02 -0700886 if self._closing:
887 self._call_connection_lost(None)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700888
889 def write(self, data):
Guido van Rossuma5062c52013-11-27 14:12:48 -0800890 if not isinstance(data, (bytes, bytearray, memoryview)):
891 raise TypeError('data argument must be byte-ish (%r)',
892 type(data))
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700893 if not data:
894 return
895
896 if self._conn_lost:
897 if self._conn_lost >= constants.LOG_THRESHOLD_FOR_CONNLOST_WRITES:
Guido van Rossumfc29e0f2013-10-17 15:39:45 -0700898 logger.warning('socket.send() raised exception.')
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700899 self._conn_lost += 1
900 return
901
Guido van Rossum2b570162013-11-01 14:18:02 -0700902 if not self._buffer:
903 self._loop.add_writer(self._sock_fd, self._write_ready)
904
905 # Add it to the buffer.
Guido van Rossuma5062c52013-11-27 14:12:48 -0800906 self._buffer.extend(data)
Guido van Rossum355491d2013-10-18 15:17:11 -0700907 self._maybe_pause_protocol()
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700908
909 def can_write_eof(self):
910 return False
911
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700912
913class _SelectorDatagramTransport(_SelectorTransport):
914
Guido van Rossuma5062c52013-11-27 14:12:48 -0800915 _buffer_factory = collections.deque
916
Victor Stinnerbfff45d2014-07-08 23:57:31 +0200917 def __init__(self, loop, sock, protocol, address=None,
918 waiter=None, extra=None):
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700919 super().__init__(loop, sock, protocol, extra)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700920 self._address = address
921 self._loop.add_reader(self._sock_fd, self._read_ready)
922 self._loop.call_soon(self._protocol.connection_made, self)
Victor Stinnerbfff45d2014-07-08 23:57:31 +0200923 if waiter is not None:
924 # wait until protocol.connection_made() has been called
925 self._loop.call_soon(waiter._set_result_unless_cancelled, None)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700926
Guido van Rossum355491d2013-10-18 15:17:11 -0700927 def get_write_buffer_size(self):
928 return sum(len(data) for data, _ in self._buffer)
929
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700930 def _read_ready(self):
931 try:
932 data, addr = self._sock.recvfrom(self.max_size)
933 except (BlockingIOError, InterruptedError):
934 pass
Guido van Rossum2335de72013-11-15 16:51:48 -0800935 except OSError as exc:
936 self._protocol.error_received(exc)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700937 except Exception as exc:
Victor Stinner065ca252014-02-19 01:40:41 +0100938 self._fatal_error(exc, 'Fatal read error on datagram transport')
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700939 else:
940 self._protocol.datagram_received(data, addr)
941
942 def sendto(self, data, addr=None):
Guido van Rossuma5062c52013-11-27 14:12:48 -0800943 if not isinstance(data, (bytes, bytearray, memoryview)):
944 raise TypeError('data argument must be byte-ish (%r)',
945 type(data))
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700946 if not data:
947 return
948
Guido van Rossuma5062c52013-11-27 14:12:48 -0800949 if self._address and addr not in (None, self._address):
950 raise ValueError('Invalid address: must be None or %s' %
951 (self._address,))
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700952
953 if self._conn_lost and self._address:
954 if self._conn_lost >= constants.LOG_THRESHOLD_FOR_CONNLOST_WRITES:
Guido van Rossumfc29e0f2013-10-17 15:39:45 -0700955 logger.warning('socket.send() raised exception.')
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700956 self._conn_lost += 1
957 return
958
959 if not self._buffer:
960 # Attempt to send it right away first.
961 try:
962 if self._address:
963 self._sock.send(data)
964 else:
965 self._sock.sendto(data, addr)
966 return
Guido van Rossum2546a172013-10-18 10:10:36 -0700967 except (BlockingIOError, InterruptedError):
968 self._loop.add_writer(self._sock_fd, self._sendto_ready)
Guido van Rossum2335de72013-11-15 16:51:48 -0800969 except OSError as exc:
970 self._protocol.error_received(exc)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700971 return
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700972 except Exception as exc:
Victor Stinner065ca252014-02-19 01:40:41 +0100973 self._fatal_error(exc,
974 'Fatal write error on datagram transport')
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700975 return
976
Guido van Rossuma5062c52013-11-27 14:12:48 -0800977 # Ensure that what we buffer is immutable.
978 self._buffer.append((bytes(data), addr))
Guido van Rossum355491d2013-10-18 15:17:11 -0700979 self._maybe_pause_protocol()
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700980
981 def _sendto_ready(self):
982 while self._buffer:
983 data, addr = self._buffer.popleft()
984 try:
985 if self._address:
986 self._sock.send(data)
987 else:
988 self._sock.sendto(data, addr)
Guido van Rossum2546a172013-10-18 10:10:36 -0700989 except (BlockingIOError, InterruptedError):
990 self._buffer.appendleft((data, addr)) # Try again later.
991 break
Guido van Rossum2335de72013-11-15 16:51:48 -0800992 except OSError as exc:
993 self._protocol.error_received(exc)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700994 return
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700995 except Exception as exc:
Victor Stinner065ca252014-02-19 01:40:41 +0100996 self._fatal_error(exc,
997 'Fatal write error on datagram transport')
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700998 return
999
Guido van Rossum355491d2013-10-18 15:17:11 -07001000 self._maybe_resume_protocol() # May append to buffer.
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07001001 if not self._buffer:
1002 self._loop.remove_writer(self._sock_fd)
1003 if self._closing:
1004 self._call_connection_lost(None)