blob: 4d3e5d9ede8c694777ef1187127facfcc3daf1ce [file] [log] [blame]
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07001"""Event loop using a selector and related classes.
2
3A selector is a "notify-when-ready" multiplexer. For a subclass which
4also includes support for signal handling, see the unix_events sub-module.
5"""
6
Victor Stinner8dffc452014-01-25 15:32:06 +01007__all__ = ['BaseSelectorEventLoop']
8
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07009import collections
Guido van Rossum3317a132013-11-01 14:12:50 -070010import errno
Victor Stinnerd5aeccf92014-08-31 15:07:57 +020011import functools
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070012import socket
Victor Stinner231b4042015-01-14 00:19:09 +010013import sys
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070014try:
15 import ssl
16except ImportError: # pragma: no cover
17 ssl = None
18
19from . import base_events
20from . import constants
21from . import events
22from . import futures
23from . import selectors
24from . import transports
Victor Stinner231b4042015-01-14 00:19:09 +010025from . import sslproto
Guido van Rossumfc29e0f2013-10-17 15:39:45 -070026from .log import logger
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070027
28
def _test_selector_event(selector, fd, event):
    """Return True if *selector* is monitoring *event* events for *fd*.

    selector -- object providing ``get_key(fd)``; the returned key must
                expose an ``events`` bitmask.  May be None, which is what
                a closed loop stores in place of its selector.
    fd       -- file descriptor (or file object) to look up.
    event    -- event bitmask to test against the registered events.

    Returns False when there is no selector or when *fd* is not registered.
    """
    if selector is None:
        # BaseSelectorEventLoop.close() resets its selector to None, and
        # transport __repr__() may still be called afterwards: nothing is
        # monitored anymore in that case.
        return False
    try:
        key = selector.get_key(fd)
    except KeyError:
        return False
    else:
        return bool(key.events & event)
38
39
class BaseSelectorEventLoop(base_events.BaseEventLoop):
    """Event loop driven by a selector.

    See events.EventLoop for API specification.
    """

    def __init__(self, selector=None):
        super().__init__()

        # Fall back to the platform default when no selector is supplied.
        chosen = selectors.DefaultSelector() if selector is None else selector
        logger.debug('Using selector: %s', chosen.__class__.__name__)
        self._selector = chosen
        self._make_self_pipe()

    def _make_socket_transport(self, sock, protocol, waiter=None, *,
                               extra=None, server=None):
        """Create a plain socket transport attached to this loop."""
        transport = _SelectorSocketTransport(self, sock, protocol, waiter,
                                             extra, server)
        return transport

    def _make_ssl_transport(self, rawsock, protocol, sslcontext, waiter=None,
                            *, server_side=False, server_hostname=None,
                            extra=None, server=None):
        """Create an SSL transport, preferring sslproto when available."""
        if sslproto._is_sslproto_available():
            ssl_protocol = sslproto.SSLProtocol(self, protocol, sslcontext,
                                                waiter, server_side,
                                                server_hostname)
            # The socket transport is not kept here: its constructor
            # registers a reader on the loop and schedules
            # ssl_protocol.connection_made().
            _SelectorSocketTransport(self, rawsock, ssl_protocol,
                                     extra=extra, server=server)
            return ssl_protocol._app_transport

        return self._make_legacy_ssl_transport(
            rawsock, protocol, sslcontext, waiter,
            server_side=server_side, server_hostname=server_hostname,
            extra=extra, server=server)

    def _make_legacy_ssl_transport(self, rawsock, protocol, sslcontext,
                                   waiter, *,
                                   server_side=False, server_hostname=None,
                                   extra=None, server=None):
        # Use the legacy API: SSL_write, SSL_read, etc. The legacy API is used
        # on Python 3.4 and older, when ssl.MemoryBIO is not available.
        transport = _SelectorSslTransport(
            self, rawsock, protocol, sslcontext, waiter,
            server_side, server_hostname, extra, server)
        return transport

    def _make_datagram_transport(self, sock, protocol,
                                 address=None, waiter=None, extra=None):
        """Create a datagram transport attached to this loop."""
        transport = _SelectorDatagramTransport(self, sock, protocol,
                                               address, waiter, extra)
        return transport

    def close(self):
        """Close the loop, its self-pipe and its selector.

        Raises RuntimeError if the loop is running; does nothing if the
        loop is already closed.
        """
        if self.is_running():
            raise RuntimeError("Cannot close a running event loop")
        if self.is_closed():
            return
        self._close_self_pipe()
        super().close()
        selector = self._selector
        if selector is not None:
            selector.close()
            self._selector = None

    def _socketpair(self):
        """Return a pair of connected sockets; provided by subclasses."""
        raise NotImplementedError

    def _close_self_pipe(self):
        """Unregister and close both ends of the self-pipe."""
        self.remove_reader(self._ssock.fileno())
        self._ssock.close()
        self._ssock = None
        self._csock.close()
        self._csock = None
        self._internal_fds -= 1

    def _make_self_pipe(self):
        # A self-socket, really. :-)
        self._ssock, self._csock = self._socketpair()
        for end in (self._ssock, self._csock):
            end.setblocking(False)
        self._internal_fds += 1
        self.add_reader(self._ssock.fileno(), self._read_from_self)

    def _process_self_data(self, data):
        """Hook called with each chunk read from the self-pipe (no-op)."""
        pass

    def _read_from_self(self):
        """Drain the self-pipe until it would block or reports EOF."""
        while True:
            try:
                chunk = self._ssock.recv(4096)
                if chunk:
                    self._process_self_data(chunk)
                else:
                    break
            except InterruptedError:
                continue
            except BlockingIOError:
                break

    def _write_to_self(self):
        # This may be called from a different thread, possibly after
        # _close_self_pipe() has been called or even while it is
        # running.  Guard for self._csock being None or closed.  When
        # a socket is closed, send() raises OSError (with errno set to
        # EBADF, but let's not rely on the exact error code).
        csock = self._csock
        if csock is None:
            return
        try:
            csock.send(b'\0')
        except OSError:
            if self._debug:
                logger.debug("Fail to write a null byte into the "
                             "self-pipe socket",
                             exc_info=True)

    def _start_serving(self, protocol_factory, sock,
                       sslcontext=None, server=None):
        """Start accepting connections on the listening socket *sock*."""
        self.add_reader(sock.fileno(), self._accept_connection,
                        protocol_factory, sock, sslcontext, server)

    def _accept_connection(self, protocol_factory, sock,
                           sslcontext=None, server=None):
        """Accept one pending connection and wrap it in a transport."""
        try:
            conn, addr = sock.accept()
            if self._debug:
                logger.debug("%r got a new connection from %r: %r",
                             server, addr, conn)
            conn.setblocking(False)
        except (BlockingIOError, InterruptedError, ConnectionAbortedError):
            return  # False alarm.
        except OSError as exc:
            # There's nowhere to send the error, so just log it.
            resource_errors = (errno.EMFILE, errno.ENFILE,
                               errno.ENOBUFS, errno.ENOMEM)
            if exc.errno not in resource_errors:
                raise  # The event loop will catch, log and ignore it.
            # Some platforms (e.g. Linux) keep reporting the FD as
            # ready, so we remove the read handler temporarily.
            # We'll try again in a while.
            self.call_exception_handler({
                'message': 'socket.accept() out of system resource',
                'exception': exc,
                'socket': sock,
            })
            self.remove_reader(sock.fileno())
            self.call_later(constants.ACCEPT_RETRY_DELAY,
                            self._start_serving,
                            protocol_factory, sock, sslcontext, server)
            return

        protocol = protocol_factory()
        if sslcontext:
            self._make_ssl_transport(
                conn, protocol, sslcontext,
                server_side=True, extra={'peername': addr}, server=server)
        else:
            self._make_socket_transport(
                conn, protocol, extra={'peername': addr},
                server=server)
        # It's now up to the protocol to handle the connection.

    def add_reader(self, fd, callback, *args):
        """Add a reader callback."""
        self._check_closed()
        new_handle = events.Handle(callback, args, self)
        try:
            key = self._selector.get_key(fd)
        except KeyError:
            # First registration for this fd.
            self._selector.register(fd, selectors.EVENT_READ,
                                    (new_handle, None))
        else:
            old_reader, writer = key.data
            self._selector.modify(fd, key.events | selectors.EVENT_READ,
                                  (new_handle, writer))
            # The previous reader, if any, has been replaced.
            if old_reader is not None:
                old_reader.cancel()

    def remove_reader(self, fd):
        """Remove a reader callback."""
        if self.is_closed():
            return False
        try:
            key = self._selector.get_key(fd)
        except KeyError:
            return False

        reader, writer = key.data
        remaining = key.events & ~selectors.EVENT_READ
        if remaining:
            self._selector.modify(fd, remaining, (None, writer))
        else:
            self._selector.unregister(fd)

        if reader is None:
            return False
        reader.cancel()
        return True

    def add_writer(self, fd, callback, *args):
        """Add a writer callback."""
        self._check_closed()
        new_handle = events.Handle(callback, args, self)
        try:
            key = self._selector.get_key(fd)
        except KeyError:
            # First registration for this fd.
            self._selector.register(fd, selectors.EVENT_WRITE,
                                    (None, new_handle))
        else:
            reader, old_writer = key.data
            self._selector.modify(fd, key.events | selectors.EVENT_WRITE,
                                  (reader, new_handle))
            # The previous writer, if any, has been replaced.
            if old_writer is not None:
                old_writer.cancel()

    def remove_writer(self, fd):
        """Remove a writer callback."""
        if self.is_closed():
            return False
        try:
            key = self._selector.get_key(fd)
        except KeyError:
            return False

        reader, writer = key.data
        # Remove both writer and connector.
        remaining = key.events & ~selectors.EVENT_WRITE
        if remaining:
            self._selector.modify(fd, remaining, (reader, None))
        else:
            self._selector.unregister(fd)

        if writer is None:
            return False
        writer.cancel()
        return True

    def sock_recv(self, sock, n):
        """Receive data from the socket.

        The return value is a bytes object representing the data received.
        The maximum amount of data to be received at once is specified by
        n.

        This method is a coroutine.
        """
        if self.get_debug() and sock.gettimeout() != 0:
            raise ValueError("the socket must be non-blocking")
        result = futures.Future(loop=self)
        self._sock_recv(result, False, sock, n)
        return result

    def _sock_recv(self, fut, registered, sock, n):
        # _sock_recv() can add itself as an I/O callback if the operation can't
        # be done immediately. Don't use it directly, call sock_recv().
        fd = sock.fileno()
        if registered:
            # Remove the callback early.  It should be rare that the
            # selector says the fd is ready but the call still returns
            # EAGAIN, and I am willing to take a hit in that case in
            # order to simplify the common case.
            self.remove_reader(fd)
        if fut.cancelled():
            return
        try:
            received = sock.recv(n)
        except (BlockingIOError, InterruptedError):
            # Not ready yet: retry once the fd becomes readable.
            self.add_reader(fd, self._sock_recv, fut, True, sock, n)
        except Exception as exc:
            fut.set_exception(exc)
        else:
            fut.set_result(received)

    def sock_sendall(self, sock, data):
        """Send data to the socket.

        The socket must be connected to a remote socket. This method continues
        to send data from data until either all data has been sent or an
        error occurs. None is returned on success. On error, an exception is
        raised, and there is no way to determine how much data, if any, was
        successfully processed by the receiving end of the connection.

        This method is a coroutine.
        """
        if self.get_debug() and sock.gettimeout() != 0:
            raise ValueError("the socket must be non-blocking")
        result = futures.Future(loop=self)
        if not data:
            # Nothing to send: complete immediately.
            result.set_result(None)
        else:
            self._sock_sendall(result, False, sock, data)
        return result

    def _sock_sendall(self, fut, registered, sock, data):
        """Send as much of *data* as possible; reschedule for the rest."""
        fd = sock.fileno()

        if registered:
            self.remove_writer(fd)
        if fut.cancelled():
            return

        try:
            sent = sock.send(data)
        except (BlockingIOError, InterruptedError):
            sent = 0
        except Exception as exc:
            fut.set_exception(exc)
            return

        if sent == len(data):
            fut.set_result(None)
            return

        # Partial (or no) write: keep the unsent tail and wait for the fd
        # to become writable again.
        if sent:
            data = data[sent:]
        self.add_writer(fd, self._sock_sendall, fut, True, sock, data)

    def sock_connect(self, sock, address):
        """Connect to a remote socket at address.

        The address must be already resolved to avoid the trap of hanging the
        entire event loop when the address requires doing a DNS lookup. For
        example, it must be an IP address, not an hostname, for AF_INET and
        AF_INET6 address families. Use getaddrinfo() to resolve the hostname
        asynchronously.

        This method is a coroutine.
        """
        if self.get_debug() and sock.gettimeout() != 0:
            raise ValueError("the socket must be non-blocking")
        result = futures.Future(loop=self)
        try:
            base_events._check_resolved_address(sock, address)
        except ValueError as err:
            result.set_exception(err)
        else:
            self._sock_connect(result, sock, address)
        return result

    def _sock_connect(self, fut, sock, address):
        """Start connecting *sock*; finish via a writer callback if needed."""
        fd = sock.fileno()
        try:
            # Retry the connect() call for as long as it gets interrupted.
            while True:
                try:
                    sock.connect(address)
                    break
                except InterruptedError:
                    pass
        except BlockingIOError:
            # Connection in progress: the writer callback checks the
            # outcome, and the done callback unregisters the writer.
            fut.add_done_callback(functools.partial(self._sock_connect_done,
                                                    fd))
            self.add_writer(fd, self._sock_connect_cb, fut, sock, address)
        except Exception as exc:
            fut.set_exception(exc)
        else:
            fut.set_result(None)

    def _sock_connect_done(self, fd, fut):
        """Done callback of the connect future: stop watching *fd*."""
        self.remove_writer(fd)

    def _sock_connect_cb(self, fut, sock, address):
        """Writer callback: report the outcome of a pending connect."""
        if fut.cancelled():
            return

        try:
            error_code = sock.getsockopt(socket.SOL_SOCKET, socket.SO_ERROR)
            if error_code != 0:
                # Jump to any except clause below.
                raise OSError(error_code,
                              'Connect call failed %s' % (address,))
        except (BlockingIOError, InterruptedError):
            # socket is still registered, the callback will be retried later
            pass
        except Exception as exc:
            fut.set_exception(exc)
        else:
            fut.set_result(None)

    def sock_accept(self, sock):
        """Accept a connection.

        The socket must be bound to an address and listening for connections.
        The return value is a pair (conn, address) where conn is a new socket
        object usable to send and receive data on the connection, and address
        is the address bound to the socket on the other end of the connection.

        This method is a coroutine.
        """
        if self.get_debug() and sock.gettimeout() != 0:
            raise ValueError("the socket must be non-blocking")
        result = futures.Future(loop=self)
        self._sock_accept(result, False, sock)
        return result

    def _sock_accept(self, fut, registered, sock):
        """Try to accept now; otherwise retry when *sock* is readable."""
        fd = sock.fileno()
        if registered:
            self.remove_reader(fd)
        if fut.cancelled():
            return
        try:
            conn, peer = sock.accept()
            conn.setblocking(False)
        except (BlockingIOError, InterruptedError):
            self.add_reader(fd, self._sock_accept, fut, True, sock)
        except Exception as exc:
            fut.set_exception(exc)
        else:
            fut.set_result((conn, peer))

    def _process_events(self, event_list):
        """Schedule the callbacks for the (key, mask) pairs in *event_list*.

        Cancelled handles are unregistered instead of being scheduled.
        """
        for key, mask in event_list:
            fileobj = key.fileobj
            reader, writer = key.data
            if reader is not None and mask & selectors.EVENT_READ:
                if reader._cancelled:
                    self.remove_reader(fileobj)
                else:
                    self._add_callback(reader)
            if writer is not None and mask & selectors.EVENT_WRITE:
                if writer._cancelled:
                    self.remove_writer(fileobj)
                else:
                    self._add_callback(writer)

    def _stop_serving(self, sock):
        """Stop accepting on *sock* and close it."""
        self.remove_reader(sock.fileno())
        sock.close()
462
463
class _SelectorTransport(transports._FlowControlMixin,
                         transports.Transport):
    """Shared state and teardown logic for selector-based transports.

    Stores the socket, its file descriptor, the protocol, the optional
    server and the outgoing buffer, and implements close()/abort()
    together with the delivery of connection_lost() to the protocol.
    """

    max_size = 256 * 1024  # Buffer size passed to recv().

    _buffer_factory = bytearray  # Constructs initial value for self._buffer.

    def __init__(self, loop, sock, protocol, extra, server=None):
        super().__init__(extra, loop)
        self._extra['socket'] = sock
        self._extra['sockname'] = sock.getsockname()
        if 'peername' not in self._extra:
            try:
                self._extra['peername'] = sock.getpeername()
            except socket.error:
                self._extra['peername'] = None
        self._sock = sock
        self._sock_fd = sock.fileno()
        self._protocol = protocol
        self._server = server
        self._buffer = self._buffer_factory()
        self._conn_lost = 0  # Set when call to connection_lost scheduled.
        self._closing = False  # Set when close() called.
        if self._server is not None:
            self._server._attach()

    def __repr__(self):
        """Return a debug representation including fd and polling state."""
        info = [self.__class__.__name__]
        if self._sock is None:
            info.append('closed')
        elif self._closing:
            info.append('closing')
        info.append('fd=%s' % self._sock_fd)
        # Only query the selector while the transport is still attached to
        # a loop that has not been closed: a closed loop has reset its
        # selector to None, and looking up the fd would raise.
        if self._loop is not None and not self._loop.is_closed():
            polling = _test_selector_event(self._loop._selector,
                                           self._sock_fd, selectors.EVENT_READ)
            if polling:
                info.append('read=polling')
            else:
                info.append('read=idle')

            polling = _test_selector_event(self._loop._selector,
                                           self._sock_fd,
                                           selectors.EVENT_WRITE)
            if polling:
                state = 'polling'
            else:
                state = 'idle'

            bufsize = self.get_write_buffer_size()
            info.append('write=<%s, bufsize=%s>' % (state, bufsize))
        return '<%s>' % ' '.join(info)

    def abort(self):
        """Close the transport immediately, discarding buffered data."""
        self._force_close(None)

    def close(self):
        """Stop reading and close once the write buffer is empty."""
        if self._closing:
            return
        self._closing = True
        self._loop.remove_reader(self._sock_fd)
        if not self._buffer:
            # Nothing left to flush: schedule connection_lost() right away.
            self._conn_lost += 1
            self._loop.call_soon(self._call_connection_lost, None)

    def _fatal_error(self, exc, message='Fatal error on transport'):
        # Should be called from exception handler only.
        if isinstance(exc, (BrokenPipeError,
                            ConnectionResetError, ConnectionAbortedError)):
            # Peer-side disconnections are only logged in debug mode.
            if self._loop.get_debug():
                logger.debug("%r: %s", self, message, exc_info=True)
        else:
            self._loop.call_exception_handler({
                'message': message,
                'exception': exc,
                'transport': self,
                'protocol': self._protocol,
            })
        self._force_close(exc)

    def _force_close(self, exc):
        """Drop pending data, unregister the fd and schedule teardown."""
        if self._conn_lost:
            return
        if self._buffer:
            self._buffer.clear()
            self._loop.remove_writer(self._sock_fd)
        if not self._closing:
            self._closing = True
            self._loop.remove_reader(self._sock_fd)
        self._conn_lost += 1
        self._loop.call_soon(self._call_connection_lost, exc)

    def _call_connection_lost(self, exc):
        """Notify the protocol, then release the socket and references."""
        try:
            self._protocol.connection_lost(exc)
        finally:
            # Always release resources, even if the protocol raised.
            self._sock.close()
            self._sock = None
            self._protocol = None
            self._loop = None
            server = self._server
            if server is not None:
                server._detach()
                self._server = None

    def get_write_buffer_size(self):
        """Return the number of bytes currently held in the write buffer."""
        return len(self._buffer)
Guido van Rossum355491d2013-10-18 15:17:11 -0700572
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700573
class _SelectorSocketTransport(_SelectorTransport):
    """Stream transport reading from and writing to a non-blocking socket."""

    def __init__(self, loop, sock, protocol, waiter=None,
                 extra=None, server=None):
        super().__init__(loop, sock, protocol, extra, server)
        self._eof = False  # Set when write_eof() called.
        self._paused = False  # Set while reading is paused.

        self._loop.add_reader(self._sock_fd, self._read_ready)
        self._loop.call_soon(self._protocol.connection_made, self)
        if waiter is not None:
            # wait until protocol.connection_made() has been called
            self._loop.call_soon(waiter._set_result_unless_cancelled, None)

    def pause_reading(self):
        """Stop delivering incoming data to the protocol.

        Raises RuntimeError if the transport is closing or already paused.
        """
        if self._closing:
            raise RuntimeError('Cannot pause_reading() when closing')
        if self._paused:
            raise RuntimeError('Already paused')
        self._paused = True
        self._loop.remove_reader(self._sock_fd)
        if self._loop.get_debug():
            logger.debug("%r pauses reading", self)

    def resume_reading(self):
        """Resume delivering incoming data to the protocol.

        Raises RuntimeError if reading is not paused.
        """
        if not self._paused:
            raise RuntimeError('Not paused')
        self._paused = False
        if self._closing:
            return
        self._loop.add_reader(self._sock_fd, self._read_ready)
        if self._loop.get_debug():
            logger.debug("%r resumes reading", self)

    def _read_ready(self):
        """Reader callback: pass received data or EOF to the protocol."""
        try:
            data = self._sock.recv(self.max_size)
        except (BlockingIOError, InterruptedError):
            pass
        except Exception as exc:
            self._fatal_error(exc, 'Fatal read error on socket transport')
        else:
            if data:
                self._protocol.data_received(data)
            else:
                if self._loop.get_debug():
                    logger.debug("%r received EOF", self)
                keep_open = self._protocol.eof_received()
                if keep_open:
                    # We're keeping the connection open so the
                    # protocol can write more, but we still can't
                    # receive more, so remove the reader callback.
                    self._loop.remove_reader(self._sock_fd)
                else:
                    self.close()

    def write(self, data):
        """Send *data*, buffering whatever cannot be sent immediately.

        Raises TypeError if *data* is not bytes, bytearray or memoryview,
        and RuntimeError if write_eof() was already called.
        """
        if not isinstance(data, (bytes, bytearray, memoryview)):
            # Interpolate the type into the message; passing it as a second
            # exception argument would leave the '%r' placeholder unfilled.
            raise TypeError('data argument must be byte-ish (%r)'
                            % (type(data),))
        if self._eof:
            raise RuntimeError('Cannot call write() after write_eof()')
        if not data:
            return

        if self._conn_lost:
            if self._conn_lost >= constants.LOG_THRESHOLD_FOR_CONNLOST_WRITES:
                logger.warning('socket.send() raised exception.')
            self._conn_lost += 1
            return

        if not self._buffer:
            # Optimization: try to send now.
            try:
                n = self._sock.send(data)
            except (BlockingIOError, InterruptedError):
                pass
            except Exception as exc:
                self._fatal_error(exc, 'Fatal write error on socket transport')
                return
            else:
                data = data[n:]
                if not data:
                    return
            # Not all was written; register write handler.
            self._loop.add_writer(self._sock_fd, self._write_ready)

        # Add it to the buffer.
        self._buffer.extend(data)
        self._maybe_pause_protocol()

    def _write_ready(self):
        """Writer callback: flush as much of the buffer as possible."""
        assert self._buffer, 'Data should not be empty'

        try:
            n = self._sock.send(self._buffer)
        except (BlockingIOError, InterruptedError):
            pass
        except Exception as exc:
            self._loop.remove_writer(self._sock_fd)
            self._buffer.clear()
            self._fatal_error(exc, 'Fatal write error on socket transport')
        else:
            if n:
                del self._buffer[:n]
            self._maybe_resume_protocol()  # May append to buffer.
            if not self._buffer:
                # Everything was flushed: stop watching for writability and
                # finish any pending close() or write_eof().
                self._loop.remove_writer(self._sock_fd)
                if self._closing:
                    self._call_connection_lost(None)
                elif self._eof:
                    self._sock.shutdown(socket.SHUT_WR)

    def write_eof(self):
        """Shut down the write side once buffered data has been sent."""
        if self._eof:
            return
        self._eof = True
        if not self._buffer:
            self._sock.shutdown(socket.SHUT_WR)

    def can_write_eof(self):
        """Return True: this transport supports write_eof()."""
        return True
696
697
698class _SelectorSslTransport(_SelectorTransport):
699
Guido van Rossuma5062c52013-11-27 14:12:48 -0800700 _buffer_factory = bytearray
701
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700702 def __init__(self, loop, rawsock, protocol, sslcontext, waiter=None,
703 server_side=False, server_hostname=None,
704 extra=None, server=None):
Guido van Rossum28dff0d2013-11-01 14:22:30 -0700705 if ssl is None:
706 raise RuntimeError('stdlib ssl module not available')
707
Victor Stinner231b4042015-01-14 00:19:09 +0100708 if not sslcontext:
709 sslcontext = sslproto._create_transport_context(server_side, server_hostname)
Guido van Rossum28dff0d2013-11-01 14:22:30 -0700710
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700711 wrap_kwargs = {
712 'server_side': server_side,
713 'do_handshake_on_connect': False,
714 }
Benjamin Peterson7243b572014-11-23 17:04:34 -0600715 if server_hostname and not server_side:
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700716 wrap_kwargs['server_hostname'] = server_hostname
717 sslsock = sslcontext.wrap_socket(rawsock, **wrap_kwargs)
718
719 super().__init__(loop, sslsock, protocol, extra, server)
720
721 self._server_hostname = server_hostname
722 self._waiter = waiter
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700723 self._sslcontext = sslcontext
724 self._paused = False
725
726 # SSL-specific extra info. (peercert is set later)
727 self._extra.update(sslcontext=sslcontext)
728
Victor Stinnere912e652014-07-12 03:11:53 +0200729 if self._loop.get_debug():
730 logger.debug("%r starts SSL handshake", self)
731 start_time = self._loop.time()
732 else:
733 start_time = None
734 self._on_handshake(start_time)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700735
def _on_handshake(self, start_time):
    """Run one step of the SSL handshake; finish setup when it succeeds.

    Calls do_handshake() on the wrapped socket.  When that raises
    SSLWantReadError or SSLWantWriteError, this method registers
    itself as the reader (resp. writer) callback for the socket's
    file descriptor and returns, so it is invoked again later.

    Any other exception removes the reader/writer callbacks, closes
    the socket and is set on the waiter (if there is one and it was
    not cancelled).  Exception subclasses are then swallowed; other
    BaseException subclasses are re-raised.

    On success the peer certificate, cipher and compression are added
    to the extra info, _read_ready() is registered as the reader
    callback, protocol.connection_made() is scheduled, and the waiter
    (if any) is scheduled to receive its result.

    start_time: the loop time at which the handshake started, or None
        when it was not recorded.  Only used for the debug-mode
        message reporting how long the handshake took.
    """
    try:
        self._sock.do_handshake()
    except ssl.SSLWantReadError:
        self._loop.add_reader(self._sock_fd,
                              self._on_handshake, start_time)
        return
    except ssl.SSLWantWriteError:
        self._loop.add_writer(self._sock_fd,
                              self._on_handshake, start_time)
        return
    except BaseException as exc:
        if self._loop.get_debug():
            logger.warning("%r: SSL handshake failed",
                           self, exc_info=True)
        self._loop.remove_reader(self._sock_fd)
        self._loop.remove_writer(self._sock_fd)
        self._sock.close()
        if self._waiter is not None and not self._waiter.cancelled():
            self._waiter.set_exception(exc)
        if isinstance(exc, Exception):
            return
        else:
            raise

    # The handshake is done: drop the callbacks registered while
    # waiting for the socket to become readable/writable.
    self._loop.remove_reader(self._sock_fd)
    self._loop.remove_writer(self._sock_fd)

    peercert = self._sock.getpeercert()
    if not hasattr(self._sslcontext, 'check_hostname'):
        # Verify hostname if requested, Python 3.4+ uses check_hostname
        # and checks the hostname in do_handshake()
        if (self._server_hostname and
                self._sslcontext.verify_mode != ssl.CERT_NONE):
            try:
                ssl.match_hostname(peercert, self._server_hostname)
            except Exception as exc:
                if self._loop.get_debug():
                    logger.warning("%r: SSL handshake failed "
                                   "on matching the hostname",
                                   self, exc_info=True)
                self._sock.close()
                if (self._waiter is not None
                        and not self._waiter.cancelled()):
                    self._waiter.set_exception(exc)
                return

    # Add extra info that becomes available after handshake.
    self._extra.update(peercert=peercert,
                       cipher=self._sock.cipher(),
                       compression=self._sock.compression(),
                       )

    self._read_wants_write = False
    self._write_wants_read = False
    self._loop.add_reader(self._sock_fd, self._read_ready)
    self._loop.call_soon(self._protocol.connection_made, self)
    if self._waiter is not None:
        # wait until protocol.connection_made() has been called
        self._loop.call_soon(self._waiter._set_result_unless_cancelled,
                             None)

    # start_time is None when debug mode was off at the moment the
    # handshake started; if debug mode has been switched on since,
    # skip the timing message rather than fail on None arithmetic.
    if self._loop.get_debug() and start_time is not None:
        dt = self._loop.time() - start_time
        logger.debug("%r: SSL handshake took %.1f ms", self, dt * 1e3)
801
def pause_reading(self):
    """Stop watching the socket for readability.

    Sets the ``_paused`` flag and removes the reader callback for
    the socket's file descriptor.

    Raises RuntimeError when the transport is closing or is already
    paused.
    """
    # XXX Open question kept from the original author: given how
    # _read_ready() works this is a bit icky -- could it deadlock?
    # It does not look like it: write() still accepts data for the
    # buffer, and eventually the app calls resume_reading() and
    # things flow again.
    if self._closing:
        raise RuntimeError('Cannot pause_reading() when closing')
    if self._paused:
        raise RuntimeError('Already paused')

    loop = self._loop
    self._paused = True
    loop.remove_reader(self._sock_fd)
    if loop.get_debug():
        logger.debug("%r pauses reading", self)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700817
def resume_reading(self):
    """Undo pause_reading(): watch the socket for readability again.

    Clears the ``_paused`` flag.  Unless the transport is closing,
    _read_ready() is registered again as the reader callback.

    Raises RuntimeError when the transport is not paused.
    """
    if not self._paused:
        raise RuntimeError('Not paused')
    self._paused = False
    if not self._closing:
        loop = self._loop
        loop.add_reader(self._sock_fd, self._read_ready)
        if loop.get_debug():
            logger.debug("%r resumes reading", self)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700827
def _read_ready(self):
    """Reader callback: receive data from the SSL socket.

    When an earlier write was suspended because SSL wanted to read
    (``_write_wants_read``), that write is retried first and the
    writer callback is registered again if data remains buffered.

    Then up to ``max_size`` bytes are received:
    - BlockingIOError, InterruptedError and SSLWantReadError are
      ignored;
    - SSLWantWriteError sets ``_read_wants_write`` and swaps the
      reader callback for the writer callback;
    - any other Exception goes to _fatal_error();
    - non-empty data is passed to protocol.data_received();
    - empty data means EOF: protocol.eof_received() is called and the
      transport is closed whatever it returns (or raises).
    """
    if self._write_wants_read:
        self._write_wants_read = False
        self._write_ready()

        if self._buffer:
            self._loop.add_writer(self._sock_fd, self._write_ready)

    try:
        data = self._sock.recv(self.max_size)
    except (BlockingIOError, InterruptedError, ssl.SSLWantReadError):
        return
    except ssl.SSLWantWriteError:
        # SSL needs to write before it can read: wait for the socket
        # to become writable instead of readable.
        self._read_wants_write = True
        self._loop.remove_reader(self._sock_fd)
        self._loop.add_writer(self._sock_fd, self._write_ready)
        return
    except Exception as exc:
        self._fatal_error(exc, 'Fatal read error on SSL transport')
        return

    if data:
        self._protocol.data_received(data)
        return

    # An empty read signals EOF.
    try:
        if self._loop.get_debug():
            logger.debug("%r received EOF", self)
        keep_open = self._protocol.eof_received()
        if keep_open:
            logger.warning('returning true from eof_received() '
                           'has no effect when using ssl')
    finally:
        self.close()
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700859
def _write_ready(self):
    """Writer callback: send buffered data over the SSL socket.

    When an earlier read was suspended because SSL wanted to write
    (``_read_wants_write``), that read is retried first and the
    reader callback is registered again unless the transport is
    paused or closing.

    Then, if the buffer is non-empty, one send() is attempted:
    - BlockingIOError, InterruptedError and SSLWantWriteError count
      as "nothing sent";
    - SSLWantReadError also removes the writer callback and sets
      ``_write_wants_read``;
    - any other Exception removes the writer callback, clears the
      buffer, reports through _fatal_error() and returns.
    The bytes that were sent are removed from the buffer.

    Finally _maybe_resume_protocol() is called; if the buffer is
    empty afterwards the writer callback is removed and, when the
    transport is closing, _call_connection_lost(None) is invoked.
    """
    if self._read_wants_write:
        self._read_wants_write = False
        self._read_ready()

        if not self._paused and not self._closing:
            self._loop.add_reader(self._sock_fd, self._read_ready)

    if self._buffer:
        sent = 0
        try:
            sent = self._sock.send(self._buffer)
        except (BlockingIOError, InterruptedError, ssl.SSLWantWriteError):
            pass
        except ssl.SSLWantReadError:
            # SSL needs to read before it can write: stop waiting for
            # writability and let _read_ready() retry the write.
            self._loop.remove_writer(self._sock_fd)
            self._write_wants_read = True
        except Exception as exc:
            self._loop.remove_writer(self._sock_fd)
            self._buffer.clear()
            self._fatal_error(exc, 'Fatal write error on SSL transport')
            return

        if sent:
            del self._buffer[:sent]

    self._maybe_resume_protocol()  # May append to buffer.

    if not self._buffer:
        self._loop.remove_writer(self._sock_fd)
        if self._closing:
            self._call_connection_lost(None)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700892
def write(self, data):
    """Queue *data* for sending over the SSL connection.

    data must be a bytes, bytearray or memoryview object; anything
    else raises TypeError.  Empty data is ignored.

    When ``_conn_lost`` is non-zero the data is dropped and the
    counter is incremented; once the counter has reached
    constants.LOG_THRESHOLD_FOR_CONNLOST_WRITES a warning is logged
    as well.

    Nothing is sent from here: the data is appended to the write
    buffer (registering _write_ready() as the writer callback when
    the buffer was empty) and _maybe_pause_protocol() is called.
    """
    if not isinstance(data, (bytes, bytearray, memoryview)):
        # Interpolate the offending type into the message.  Passing it
        # as a second argument to TypeError would leave the '%r'
        # placeholder unformatted.
        raise TypeError('data argument must be byte-ish (%r)' %
                        (type(data),))
    if not data:
        return

    if self._conn_lost:
        if self._conn_lost >= constants.LOG_THRESHOLD_FOR_CONNLOST_WRITES:
            logger.warning('socket.send() raised exception.')
        self._conn_lost += 1
        return

    if not self._buffer:
        self._loop.add_writer(self._sock_fd, self._write_ready)

    # Add it to the buffer.
    self._buffer.extend(data)
    self._maybe_pause_protocol()
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700912
def can_write_eof(self):
    """Tell callers that write_eof() is not supported: always False."""
    return False
915
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700916
917class _SelectorDatagramTransport(_SelectorTransport):
918
Guido van Rossuma5062c52013-11-27 14:12:48 -0800919 _buffer_factory = collections.deque
920
def __init__(self, loop, sock, protocol, address=None,
             waiter=None, extra=None):
    """Set up the datagram transport and start watching for input.

    loop, sock, protocol and extra are handed to the base class
    initializer.

    address: stored as ``_address``.
    waiter: optional object whose _set_result_unless_cancelled(None)
        is scheduled after protocol.connection_made().
    """
    super().__init__(loop, sock, protocol, extra)
    self._address = address

    event_loop = self._loop
    event_loop.add_reader(self._sock_fd, self._read_ready)
    event_loop.call_soon(self._protocol.connection_made, self)
    if waiter is not None:
        # wait until protocol.connection_made() has been called
        event_loop.call_soon(waiter._set_result_unless_cancelled, None)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700930
def get_write_buffer_size(self):
    """Return the total length of the data held in the write buffer.

    Each buffer entry is a pair whose first item is the data; the
    second item is ignored here.
    """
    total = 0
    for payload, _ in self._buffer:
        total += len(payload)
    return total
933
def _read_ready(self):
    """Reader callback: receive one datagram and hand it to the protocol.

    Up to ``max_size`` bytes are received with recvfrom():
    - BlockingIOError and InterruptedError are ignored;
    - any other OSError is passed to protocol.error_received();
    - any other Exception goes to _fatal_error();
    - on success protocol.datagram_received(data, addr) is called.
    """
    try:
        data, addr = self._sock.recvfrom(self.max_size)
    except (BlockingIOError, InterruptedError):
        # Must be tested before OSError, which is their base class.
        return
    except OSError as exc:
        self._protocol.error_received(exc)
        return
    except Exception as exc:
        self._fatal_error(exc, 'Fatal read error on datagram transport')
        return

    self._protocol.datagram_received(data, addr)
945
def sendto(self, data, addr=None):
    """Send *data* as a datagram, buffering it if the socket would block.

    data must be a bytes, bytearray or memoryview object; anything
    else raises TypeError.  Empty data is ignored.

    addr: destination address.  When the transport has an
        ``_address`` set, addr must be None or equal to it,
        otherwise ValueError is raised; the data is then sent with
        send() rather than sendto().

    When ``_conn_lost`` is non-zero and ``_address`` is set, the data
    is dropped and the counter is incremented; once the counter has
    reached constants.LOG_THRESHOLD_FOR_CONNLOST_WRITES a warning is
    logged as well.

    With an empty buffer the datagram is sent immediately.  If that
    raises BlockingIOError or InterruptedError, _sendto_ready() is
    registered as the writer callback and the datagram is buffered.
    Other OSErrors go to protocol.error_received(), other Exceptions
    to _fatal_error(); in both cases the datagram is dropped.  With
    a non-empty buffer the datagram is simply appended to it.
    """
    if not isinstance(data, (bytes, bytearray, memoryview)):
        # Interpolate the offending type into the message.  Passing it
        # as a second argument to TypeError would leave the '%r'
        # placeholder unformatted.
        raise TypeError('data argument must be byte-ish (%r)' %
                        (type(data),))
    if not data:
        return

    if self._address and addr not in (None, self._address):
        raise ValueError('Invalid address: must be None or %s' %
                         (self._address,))

    if self._conn_lost and self._address:
        if self._conn_lost >= constants.LOG_THRESHOLD_FOR_CONNLOST_WRITES:
            logger.warning('socket.send() raised exception.')
        self._conn_lost += 1
        return

    if not self._buffer:
        # Attempt to send it right away first.
        try:
            if self._address:
                self._sock.send(data)
            else:
                self._sock.sendto(data, addr)
            return
        except (BlockingIOError, InterruptedError):
            # Fall through to buffering; _sendto_ready() retries later.
            self._loop.add_writer(self._sock_fd, self._sendto_ready)
        except OSError as exc:
            self._protocol.error_received(exc)
            return
        except Exception as exc:
            self._fatal_error(exc,
                              'Fatal write error on datagram transport')
            return

    # Ensure that what we buffer is immutable.
    self._buffer.append((bytes(data), addr))
    self._maybe_pause_protocol()
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700984
def _sendto_ready(self):
    """Writer callback: send as many buffered datagrams as possible.

    Datagrams are taken from the left end of the buffer and sent one
    by one (with send() when ``_address`` is set, else sendto()):
    - BlockingIOError / InterruptedError puts the datagram back at
      the front of the buffer and stops the loop;
    - any other OSError is passed to protocol.error_received() and
      the method returns (the datagram is not put back);
    - any other Exception goes to _fatal_error() and the method
      returns.

    Afterwards _maybe_resume_protocol() is called; if the buffer is
    empty the writer callback is removed and, when the transport is
    closing, _call_connection_lost(None) is invoked.
    """
    pending = self._buffer
    while pending:
        payload, dest = pending.popleft()
        try:
            if self._address:
                self._sock.send(payload)
            else:
                self._sock.sendto(payload, dest)
        except (BlockingIOError, InterruptedError):
            pending.appendleft((payload, dest))  # Try again later.
            break
        except OSError as exc:
            self._protocol.error_received(exc)
            return
        except Exception as exc:
            self._fatal_error(exc,
                              'Fatal write error on datagram transport')
            return

    self._maybe_resume_protocol()  # May append to buffer.
    if not self._buffer:
        self._loop.remove_writer(self._sock_fd)
        if self._closing:
            self._call_connection_lost(None)