blob: a97709d8a09f7aa5675690e6c3710bcd53477a5c [file] [log] [blame]
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07001"""Event loop using a selector and related classes.
2
3A selector is a "notify-when-ready" multiplexer. For a subclass which
4also includes support for signal handling, see the unix_events sub-module.
5"""
6
Victor Stinner8dffc452014-01-25 15:32:06 +01007__all__ = ['BaseSelectorEventLoop']
8
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07009import collections
Guido van Rossum3317a132013-11-01 14:12:50 -070010import errno
Victor Stinnerd5aeccf92014-08-31 15:07:57 +020011import functools
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070012import socket
13try:
14 import ssl
15except ImportError: # pragma: no cover
16 ssl = None
17
18from . import base_events
19from . import constants
20from . import events
21from . import futures
22from . import selectors
23from . import transports
Guido van Rossumfc29e0f2013-10-17 15:39:45 -070024from .log import logger
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070025
26
Victor Stinnere912e652014-07-12 03:11:53 +020027def _test_selector_event(selector, fd, event):
28 # Test if the selector is monitoring 'event' events
29 # for the file descriptor 'fd'.
30 try:
31 key = selector.get_key(fd)
32 except KeyError:
33 return False
34 else:
35 return bool(key.events & event)
36
37
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070038class BaseSelectorEventLoop(base_events.BaseEventLoop):
39 """Selector event loop.
40
41 See events.EventLoop for API specification.
42 """
43
44 def __init__(self, selector=None):
45 super().__init__()
46
47 if selector is None:
48 selector = selectors.DefaultSelector()
Guido van Rossumfc29e0f2013-10-17 15:39:45 -070049 logger.debug('Using selector: %s', selector.__class__.__name__)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070050 self._selector = selector
51 self._make_self_pipe()
52
53 def _make_socket_transport(self, sock, protocol, waiter=None, *,
54 extra=None, server=None):
55 return _SelectorSocketTransport(self, sock, protocol, waiter,
56 extra, server)
57
58 def _make_ssl_transport(self, rawsock, protocol, sslcontext, waiter, *,
59 server_side=False, server_hostname=None,
60 extra=None, server=None):
61 return _SelectorSslTransport(
62 self, rawsock, protocol, sslcontext, waiter,
63 server_side, server_hostname, extra, server)
64
65 def _make_datagram_transport(self, sock, protocol,
Victor Stinnerbfff45d2014-07-08 23:57:31 +020066 address=None, waiter=None, extra=None):
67 return _SelectorDatagramTransport(self, sock, protocol,
68 address, waiter, extra)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070069
70 def close(self):
Victor Stinner956de692014-12-26 21:07:52 +010071 if self.is_running():
Victor Stinner5e631202014-11-21 00:23:27 +010072 raise RuntimeError("Cannot close a running event loop")
Victor Stinnerbb2fc5b2014-06-10 10:23:10 +020073 if self.is_closed():
74 return
75 self._close_self_pipe()
Victor Stinner5e631202014-11-21 00:23:27 +010076 super().close()
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070077 if self._selector is not None:
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070078 self._selector.close()
79 self._selector = None
80
81 def _socketpair(self):
82 raise NotImplementedError
83
84 def _close_self_pipe(self):
85 self.remove_reader(self._ssock.fileno())
86 self._ssock.close()
87 self._ssock = None
88 self._csock.close()
89 self._csock = None
90 self._internal_fds -= 1
91
92 def _make_self_pipe(self):
93 # A self-socket, really. :-)
94 self._ssock, self._csock = self._socketpair()
95 self._ssock.setblocking(False)
96 self._csock.setblocking(False)
97 self._internal_fds += 1
98 self.add_reader(self._ssock.fileno(), self._read_from_self)
99
Victor Stinnerfe5649c2014-07-17 22:43:40 +0200100 def _process_self_data(self, data):
101 pass
102
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700103 def _read_from_self(self):
Victor Stinner54c4b8e2014-06-19 12:59:15 +0200104 while True:
105 try:
106 data = self._ssock.recv(4096)
107 if not data:
108 break
Victor Stinnerfe5649c2014-07-17 22:43:40 +0200109 self._process_self_data(data)
Victor Stinner54c4b8e2014-06-19 12:59:15 +0200110 except InterruptedError:
111 continue
112 except BlockingIOError:
113 break
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700114
115 def _write_to_self(self):
Guido van Rossum3d139d82014-05-06 14:42:40 -0700116 # This may be called from a different thread, possibly after
117 # _close_self_pipe() has been called or even while it is
118 # running. Guard for self._csock being None or closed. When
119 # a socket is closed, send() raises OSError (with errno set to
120 # EBADF, but let's not rely on the exact error code).
121 csock = self._csock
122 if csock is not None:
123 try:
Victor Stinnerfe5649c2014-07-17 22:43:40 +0200124 csock.send(b'\0')
Guido van Rossum3d139d82014-05-06 14:42:40 -0700125 except OSError:
Victor Stinner65dd69a2014-07-25 22:36:05 +0200126 if self._debug:
127 logger.debug("Fail to write a null byte into the "
128 "self-pipe socket",
129 exc_info=True)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700130
Guido van Rossum28dff0d2013-11-01 14:22:30 -0700131 def _start_serving(self, protocol_factory, sock,
132 sslcontext=None, server=None):
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700133 self.add_reader(sock.fileno(), self._accept_connection,
Guido van Rossum28dff0d2013-11-01 14:22:30 -0700134 protocol_factory, sock, sslcontext, server)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700135
Guido van Rossum28dff0d2013-11-01 14:22:30 -0700136 def _accept_connection(self, protocol_factory, sock,
137 sslcontext=None, server=None):
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700138 try:
139 conn, addr = sock.accept()
Victor Stinnere912e652014-07-12 03:11:53 +0200140 if self._debug:
141 logger.debug("%r got a new connection from %r: %r",
142 server, addr, conn)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700143 conn.setblocking(False)
Guido van Rossum3317a132013-11-01 14:12:50 -0700144 except (BlockingIOError, InterruptedError, ConnectionAbortedError):
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700145 pass # False alarm.
Guido van Rossum3317a132013-11-01 14:12:50 -0700146 except OSError as exc:
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700147 # There's nowhere to send the error, so just log it.
148 # TODO: Someone will want an error handler for this.
Guido van Rossum3317a132013-11-01 14:12:50 -0700149 if exc.errno in (errno.EMFILE, errno.ENFILE,
150 errno.ENOBUFS, errno.ENOMEM):
151 # Some platforms (e.g. Linux keep reporting the FD as
152 # ready, so we remove the read handler temporarily.
153 # We'll try again in a while.
Yury Selivanovff827f02014-02-18 18:02:19 -0500154 self.call_exception_handler({
155 'message': 'socket.accept() out of system resource',
156 'exception': exc,
157 'socket': sock,
158 })
Guido van Rossum3317a132013-11-01 14:12:50 -0700159 self.remove_reader(sock.fileno())
160 self.call_later(constants.ACCEPT_RETRY_DELAY,
161 self._start_serving,
Guido van Rossum28dff0d2013-11-01 14:22:30 -0700162 protocol_factory, sock, sslcontext, server)
Guido van Rossum3317a132013-11-01 14:12:50 -0700163 else:
164 raise # The event loop will catch, log and ignore it.
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700165 else:
Guido van Rossum28dff0d2013-11-01 14:22:30 -0700166 if sslcontext:
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700167 self._make_ssl_transport(
Guido van Rossum28dff0d2013-11-01 14:22:30 -0700168 conn, protocol_factory(), sslcontext, None,
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700169 server_side=True, extra={'peername': addr}, server=server)
170 else:
171 self._make_socket_transport(
172 conn, protocol_factory(), extra={'peername': addr},
173 server=server)
174 # It's now up to the protocol to handle the connection.
175
176 def add_reader(self, fd, callback, *args):
177 """Add a reader callback."""
Victor Stinnerbb2fc5b2014-06-10 10:23:10 +0200178 self._check_closed()
Yury Selivanovff827f02014-02-18 18:02:19 -0500179 handle = events.Handle(callback, args, self)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700180 try:
181 key = self._selector.get_key(fd)
182 except KeyError:
183 self._selector.register(fd, selectors.EVENT_READ,
184 (handle, None))
185 else:
186 mask, (reader, writer) = key.events, key.data
187 self._selector.modify(fd, mask | selectors.EVENT_READ,
188 (handle, writer))
189 if reader is not None:
190 reader.cancel()
191
192 def remove_reader(self, fd):
193 """Remove a reader callback."""
Victor Stinnerbb2fc5b2014-06-10 10:23:10 +0200194 if self.is_closed():
Victor Stinnereeeebcd2014-03-06 00:52:53 +0100195 return False
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700196 try:
197 key = self._selector.get_key(fd)
198 except KeyError:
199 return False
200 else:
201 mask, (reader, writer) = key.events, key.data
202 mask &= ~selectors.EVENT_READ
203 if not mask:
204 self._selector.unregister(fd)
205 else:
206 self._selector.modify(fd, mask, (None, writer))
207
208 if reader is not None:
209 reader.cancel()
210 return True
211 else:
212 return False
213
214 def add_writer(self, fd, callback, *args):
215 """Add a writer callback.."""
Victor Stinnerbb2fc5b2014-06-10 10:23:10 +0200216 self._check_closed()
Yury Selivanovff827f02014-02-18 18:02:19 -0500217 handle = events.Handle(callback, args, self)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700218 try:
219 key = self._selector.get_key(fd)
220 except KeyError:
221 self._selector.register(fd, selectors.EVENT_WRITE,
222 (None, handle))
223 else:
224 mask, (reader, writer) = key.events, key.data
225 self._selector.modify(fd, mask | selectors.EVENT_WRITE,
226 (reader, handle))
227 if writer is not None:
228 writer.cancel()
229
230 def remove_writer(self, fd):
231 """Remove a writer callback."""
Victor Stinnerbb2fc5b2014-06-10 10:23:10 +0200232 if self.is_closed():
Victor Stinnereeeebcd2014-03-06 00:52:53 +0100233 return False
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700234 try:
235 key = self._selector.get_key(fd)
236 except KeyError:
237 return False
238 else:
239 mask, (reader, writer) = key.events, key.data
240 # Remove both writer and connector.
241 mask &= ~selectors.EVENT_WRITE
242 if not mask:
243 self._selector.unregister(fd)
244 else:
245 self._selector.modify(fd, mask, (reader, None))
246
247 if writer is not None:
248 writer.cancel()
249 return True
250 else:
251 return False
252
253 def sock_recv(self, sock, n):
Victor Stinnerd1432092014-06-19 17:11:49 +0200254 """Receive data from the socket.
255
256 The return value is a bytes object representing the data received.
257 The maximum amount of data to be received at once is specified by
258 nbytes.
259
260 This method is a coroutine.
261 """
Victor Stinner9c9f1f12014-07-29 23:08:17 +0200262 if self.get_debug() and sock.gettimeout() != 0:
263 raise ValueError("the socket must be non-blocking")
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700264 fut = futures.Future(loop=self)
265 self._sock_recv(fut, False, sock, n)
266 return fut
267
268 def _sock_recv(self, fut, registered, sock, n):
Victor Stinner28773462014-02-13 09:24:37 +0100269 # _sock_recv() can add itself as an I/O callback if the operation can't
Yury Selivanovb0b0e622014-02-18 22:27:48 -0500270 # be done immediately. Don't use it directly, call sock_recv().
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700271 fd = sock.fileno()
272 if registered:
273 # Remove the callback early. It should be rare that the
274 # selector says the fd is ready but the call still returns
275 # EAGAIN, and I am willing to take a hit in that case in
276 # order to simplify the common case.
277 self.remove_reader(fd)
278 if fut.cancelled():
279 return
280 try:
281 data = sock.recv(n)
282 except (BlockingIOError, InterruptedError):
283 self.add_reader(fd, self._sock_recv, fut, True, sock, n)
284 except Exception as exc:
285 fut.set_exception(exc)
286 else:
287 fut.set_result(data)
288
289 def sock_sendall(self, sock, data):
Victor Stinnerd1432092014-06-19 17:11:49 +0200290 """Send data to the socket.
291
292 The socket must be connected to a remote socket. This method continues
293 to send data from data until either all data has been sent or an
294 error occurs. None is returned on success. On error, an exception is
295 raised, and there is no way to determine how much data, if any, was
296 successfully processed by the receiving end of the connection.
297
298 This method is a coroutine.
299 """
Victor Stinner9c9f1f12014-07-29 23:08:17 +0200300 if self.get_debug() and sock.gettimeout() != 0:
301 raise ValueError("the socket must be non-blocking")
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700302 fut = futures.Future(loop=self)
303 if data:
304 self._sock_sendall(fut, False, sock, data)
305 else:
306 fut.set_result(None)
307 return fut
308
309 def _sock_sendall(self, fut, registered, sock, data):
310 fd = sock.fileno()
311
312 if registered:
313 self.remove_writer(fd)
314 if fut.cancelled():
315 return
316
317 try:
318 n = sock.send(data)
319 except (BlockingIOError, InterruptedError):
320 n = 0
321 except Exception as exc:
322 fut.set_exception(exc)
323 return
324
325 if n == len(data):
326 fut.set_result(None)
327 else:
328 if n:
329 data = data[n:]
330 self.add_writer(fd, self._sock_sendall, fut, True, sock, data)
331
332 def sock_connect(self, sock, address):
Victor Stinnerd1432092014-06-19 17:11:49 +0200333 """Connect to a remote socket at address.
334
335 The address must be already resolved to avoid the trap of hanging the
336 entire event loop when the address requires doing a DNS lookup. For
337 example, it must be an IP address, not an hostname, for AF_INET and
338 AF_INET6 address families. Use getaddrinfo() to resolve the hostname
339 asynchronously.
340
341 This method is a coroutine.
342 """
Victor Stinner9c9f1f12014-07-29 23:08:17 +0200343 if self.get_debug() and sock.gettimeout() != 0:
344 raise ValueError("the socket must be non-blocking")
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700345 fut = futures.Future(loop=self)
Victor Stinner28773462014-02-13 09:24:37 +0100346 try:
347 base_events._check_resolved_address(sock, address)
348 except ValueError as err:
349 fut.set_exception(err)
350 else:
Victor Stinnerd5aeccf92014-08-31 15:07:57 +0200351 self._sock_connect(fut, sock, address)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700352 return fut
353
Victor Stinnerd5aeccf92014-08-31 15:07:57 +0200354 def _sock_connect(self, fut, sock, address):
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700355 fd = sock.fileno()
Victor Stinnerd5aeccf92014-08-31 15:07:57 +0200356 try:
357 while True:
358 try:
359 sock.connect(address)
360 except InterruptedError:
361 continue
362 else:
363 break
364 except BlockingIOError:
365 fut.add_done_callback(functools.partial(self._sock_connect_done,
366 sock))
367 self.add_writer(fd, self._sock_connect_cb, fut, sock, address)
368 except Exception as exc:
369 fut.set_exception(exc)
370 else:
371 fut.set_result(None)
372
373 def _sock_connect_done(self, sock, fut):
374 self.remove_writer(sock.fileno())
375
376 def _sock_connect_cb(self, fut, sock, address):
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700377 if fut.cancelled():
378 return
Victor Stinnerd5aeccf92014-08-31 15:07:57 +0200379
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700380 try:
Victor Stinnerd5aeccf92014-08-31 15:07:57 +0200381 err = sock.getsockopt(socket.SOL_SOCKET, socket.SO_ERROR)
382 if err != 0:
383 # Jump to any except clause below.
384 raise OSError(err, 'Connect call failed %s' % (address,))
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700385 except (BlockingIOError, InterruptedError):
Victor Stinnerd5aeccf92014-08-31 15:07:57 +0200386 # socket is still registered, the callback will be retried later
387 pass
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700388 except Exception as exc:
389 fut.set_exception(exc)
390 else:
391 fut.set_result(None)
392
393 def sock_accept(self, sock):
Victor Stinnerd1432092014-06-19 17:11:49 +0200394 """Accept a connection.
395
396 The socket must be bound to an address and listening for connections.
397 The return value is a pair (conn, address) where conn is a new socket
398 object usable to send and receive data on the connection, and address
399 is the address bound to the socket on the other end of the connection.
400
401 This method is a coroutine.
402 """
Victor Stinner9c9f1f12014-07-29 23:08:17 +0200403 if self.get_debug() and sock.gettimeout() != 0:
404 raise ValueError("the socket must be non-blocking")
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700405 fut = futures.Future(loop=self)
406 self._sock_accept(fut, False, sock)
407 return fut
408
409 def _sock_accept(self, fut, registered, sock):
410 fd = sock.fileno()
411 if registered:
412 self.remove_reader(fd)
413 if fut.cancelled():
414 return
415 try:
416 conn, address = sock.accept()
417 conn.setblocking(False)
418 except (BlockingIOError, InterruptedError):
419 self.add_reader(fd, self._sock_accept, fut, True, sock)
420 except Exception as exc:
421 fut.set_exception(exc)
422 else:
423 fut.set_result((conn, address))
424
425 def _process_events(self, event_list):
426 for key, mask in event_list:
427 fileobj, (reader, writer) = key.fileobj, key.data
428 if mask & selectors.EVENT_READ and reader is not None:
429 if reader._cancelled:
430 self.remove_reader(fileobj)
431 else:
432 self._add_callback(reader)
433 if mask & selectors.EVENT_WRITE and writer is not None:
434 if writer._cancelled:
435 self.remove_writer(fileobj)
436 else:
437 self._add_callback(writer)
438
439 def _stop_serving(self, sock):
440 self.remove_reader(sock.fileno())
441 sock.close()
442
443
Yury Selivanovc0982412014-02-18 18:41:13 -0500444class _SelectorTransport(transports._FlowControlMixin,
445 transports.Transport):
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700446
447 max_size = 256 * 1024 # Buffer size passed to recv().
448
Guido van Rossuma5062c52013-11-27 14:12:48 -0800449 _buffer_factory = bytearray # Constructs initial value for self._buffer.
450
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700451 def __init__(self, loop, sock, protocol, extra, server=None):
Victor Stinner004adb92014-11-05 15:27:41 +0100452 super().__init__(extra, loop)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700453 self._extra['socket'] = sock
454 self._extra['sockname'] = sock.getsockname()
455 if 'peername' not in self._extra:
456 try:
457 self._extra['peername'] = sock.getpeername()
458 except socket.error:
459 self._extra['peername'] = None
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700460 self._sock = sock
461 self._sock_fd = sock.fileno()
462 self._protocol = protocol
463 self._server = server
Guido van Rossuma5062c52013-11-27 14:12:48 -0800464 self._buffer = self._buffer_factory()
Guido van Rossum2546a172013-10-18 10:10:36 -0700465 self._conn_lost = 0 # Set when call to connection_lost scheduled.
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700466 self._closing = False # Set when close() called.
Guido van Rossum355491d2013-10-18 15:17:11 -0700467 if self._server is not None:
Victor Stinnerb28dbac2014-07-11 22:52:21 +0200468 self._server._attach()
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700469
Victor Stinnere912e652014-07-12 03:11:53 +0200470 def __repr__(self):
Victor Stinner0e34dc32014-10-12 09:52:11 +0200471 info = [self.__class__.__name__]
472 if self._sock is None:
473 info.append('closed')
474 elif self._closing:
475 info.append('closing')
476 info.append('fd=%s' % self._sock_fd)
Victor Stinnerb2614752014-08-25 23:20:52 +0200477 # test if the transport was closed
478 if self._loop is not None:
479 polling = _test_selector_event(self._loop._selector,
480 self._sock_fd, selectors.EVENT_READ)
481 if polling:
482 info.append('read=polling')
483 else:
484 info.append('read=idle')
Victor Stinnere912e652014-07-12 03:11:53 +0200485
Victor Stinnerb2614752014-08-25 23:20:52 +0200486 polling = _test_selector_event(self._loop._selector,
487 self._sock_fd, selectors.EVENT_WRITE)
488 if polling:
489 state = 'polling'
490 else:
491 state = 'idle'
Victor Stinnere912e652014-07-12 03:11:53 +0200492
Victor Stinnerb2614752014-08-25 23:20:52 +0200493 bufsize = self.get_write_buffer_size()
494 info.append('write=<%s, bufsize=%s>' % (state, bufsize))
Victor Stinnere912e652014-07-12 03:11:53 +0200495 return '<%s>' % ' '.join(info)
496
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700497 def abort(self):
498 self._force_close(None)
499
500 def close(self):
501 if self._closing:
502 return
503 self._closing = True
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700504 self._loop.remove_reader(self._sock_fd)
505 if not self._buffer:
Guido van Rossum2546a172013-10-18 10:10:36 -0700506 self._conn_lost += 1
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700507 self._loop.call_soon(self._call_connection_lost, None)
508
Victor Stinner065ca252014-02-19 01:40:41 +0100509 def _fatal_error(self, exc, message='Fatal error on transport'):
Guido van Rossum2546a172013-10-18 10:10:36 -0700510 # Should be called from exception handler only.
Victor Stinnere912e652014-07-12 03:11:53 +0200511 if isinstance(exc, (BrokenPipeError, ConnectionResetError)):
512 if self._loop.get_debug():
513 logger.debug("%r: %s", self, message, exc_info=True)
514 else:
Yury Selivanovff827f02014-02-18 18:02:19 -0500515 self._loop.call_exception_handler({
Victor Stinner065ca252014-02-19 01:40:41 +0100516 'message': message,
Yury Selivanovff827f02014-02-18 18:02:19 -0500517 'exception': exc,
518 'transport': self,
519 'protocol': self._protocol,
520 })
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700521 self._force_close(exc)
522
523 def _force_close(self, exc):
Guido van Rossum2546a172013-10-18 10:10:36 -0700524 if self._conn_lost:
525 return
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700526 if self._buffer:
527 self._buffer.clear()
528 self._loop.remove_writer(self._sock_fd)
Guido van Rossum2546a172013-10-18 10:10:36 -0700529 if not self._closing:
530 self._closing = True
531 self._loop.remove_reader(self._sock_fd)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700532 self._conn_lost += 1
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700533 self._loop.call_soon(self._call_connection_lost, exc)
534
535 def _call_connection_lost(self, exc):
536 try:
537 self._protocol.connection_lost(exc)
538 finally:
539 self._sock.close()
540 self._sock = None
541 self._protocol = None
542 self._loop = None
543 server = self._server
544 if server is not None:
Victor Stinnerb28dbac2014-07-11 22:52:21 +0200545 server._detach()
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700546 self._server = None
547
Guido van Rossum355491d2013-10-18 15:17:11 -0700548 def get_write_buffer_size(self):
Guido van Rossuma5062c52013-11-27 14:12:48 -0800549 return len(self._buffer)
Guido van Rossum355491d2013-10-18 15:17:11 -0700550
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700551
552class _SelectorSocketTransport(_SelectorTransport):
553
554 def __init__(self, loop, sock, protocol, waiter=None,
555 extra=None, server=None):
556 super().__init__(loop, sock, protocol, extra, server)
557 self._eof = False
558 self._paused = False
559
560 self._loop.add_reader(self._sock_fd, self._read_ready)
561 self._loop.call_soon(self._protocol.connection_made, self)
562 if waiter is not None:
Victor Stinnerbfff45d2014-07-08 23:57:31 +0200563 # wait until protocol.connection_made() has been called
Victor Stinner799a60c2014-07-07 18:08:22 +0200564 self._loop.call_soon(waiter._set_result_unless_cancelled, None)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700565
Guido van Rossum57497ad2013-10-18 07:58:20 -0700566 def pause_reading(self):
Guido van Rossuma5062c52013-11-27 14:12:48 -0800567 if self._closing:
568 raise RuntimeError('Cannot pause_reading() when closing')
569 if self._paused:
570 raise RuntimeError('Already paused')
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700571 self._paused = True
572 self._loop.remove_reader(self._sock_fd)
Victor Stinnere912e652014-07-12 03:11:53 +0200573 if self._loop.get_debug():
574 logger.debug("%r pauses reading", self)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700575
Guido van Rossum57497ad2013-10-18 07:58:20 -0700576 def resume_reading(self):
Guido van Rossuma5062c52013-11-27 14:12:48 -0800577 if not self._paused:
578 raise RuntimeError('Not paused')
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700579 self._paused = False
580 if self._closing:
581 return
582 self._loop.add_reader(self._sock_fd, self._read_ready)
Victor Stinnere912e652014-07-12 03:11:53 +0200583 if self._loop.get_debug():
584 logger.debug("%r resumes reading", self)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700585
586 def _read_ready(self):
587 try:
588 data = self._sock.recv(self.max_size)
589 except (BlockingIOError, InterruptedError):
590 pass
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700591 except Exception as exc:
Victor Stinner065ca252014-02-19 01:40:41 +0100592 self._fatal_error(exc, 'Fatal read error on socket transport')
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700593 else:
594 if data:
595 self._protocol.data_received(data)
596 else:
Victor Stinnere912e652014-07-12 03:11:53 +0200597 if self._loop.get_debug():
598 logger.debug("%r received EOF", self)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700599 keep_open = self._protocol.eof_received()
Guido van Rossum1f683bb2013-10-30 14:36:58 -0700600 if keep_open:
601 # We're keeping the connection open so the
602 # protocol can write more, but we still can't
603 # receive more, so remove the reader callback.
604 self._loop.remove_reader(self._sock_fd)
605 else:
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700606 self.close()
607
608 def write(self, data):
Guido van Rossuma5062c52013-11-27 14:12:48 -0800609 if not isinstance(data, (bytes, bytearray, memoryview)):
610 raise TypeError('data argument must be byte-ish (%r)',
611 type(data))
612 if self._eof:
613 raise RuntimeError('Cannot call write() after write_eof()')
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700614 if not data:
615 return
616
617 if self._conn_lost:
618 if self._conn_lost >= constants.LOG_THRESHOLD_FOR_CONNLOST_WRITES:
Guido van Rossumfc29e0f2013-10-17 15:39:45 -0700619 logger.warning('socket.send() raised exception.')
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700620 self._conn_lost += 1
621 return
622
623 if not self._buffer:
Guido van Rossum355491d2013-10-18 15:17:11 -0700624 # Optimization: try to send now.
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700625 try:
626 n = self._sock.send(data)
627 except (BlockingIOError, InterruptedError):
Guido van Rossum2546a172013-10-18 10:10:36 -0700628 pass
629 except Exception as exc:
Victor Stinner065ca252014-02-19 01:40:41 +0100630 self._fatal_error(exc, 'Fatal write error on socket transport')
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700631 return
632 else:
633 data = data[n:]
634 if not data:
635 return
Guido van Rossum355491d2013-10-18 15:17:11 -0700636 # Not all was written; register write handler.
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700637 self._loop.add_writer(self._sock_fd, self._write_ready)
638
Guido van Rossum355491d2013-10-18 15:17:11 -0700639 # Add it to the buffer.
Guido van Rossuma5062c52013-11-27 14:12:48 -0800640 self._buffer.extend(data)
Guido van Rossum355491d2013-10-18 15:17:11 -0700641 self._maybe_pause_protocol()
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700642
643 def _write_ready(self):
Guido van Rossuma5062c52013-11-27 14:12:48 -0800644 assert self._buffer, 'Data should not be empty'
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700645
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700646 try:
Guido van Rossuma5062c52013-11-27 14:12:48 -0800647 n = self._sock.send(self._buffer)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700648 except (BlockingIOError, InterruptedError):
Guido van Rossuma5062c52013-11-27 14:12:48 -0800649 pass
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700650 except Exception as exc:
651 self._loop.remove_writer(self._sock_fd)
Guido van Rossuma5062c52013-11-27 14:12:48 -0800652 self._buffer.clear()
Victor Stinner065ca252014-02-19 01:40:41 +0100653 self._fatal_error(exc, 'Fatal write error on socket transport')
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700654 else:
Guido van Rossuma5062c52013-11-27 14:12:48 -0800655 if n:
656 del self._buffer[:n]
Guido van Rossum355491d2013-10-18 15:17:11 -0700657 self._maybe_resume_protocol() # May append to buffer.
658 if not self._buffer:
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700659 self._loop.remove_writer(self._sock_fd)
660 if self._closing:
661 self._call_connection_lost(None)
662 elif self._eof:
663 self._sock.shutdown(socket.SHUT_WR)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700664
665 def write_eof(self):
666 if self._eof:
667 return
668 self._eof = True
669 if not self._buffer:
670 self._sock.shutdown(socket.SHUT_WR)
671
672 def can_write_eof(self):
673 return True
674
675
676class _SelectorSslTransport(_SelectorTransport):
677
Guido van Rossuma5062c52013-11-27 14:12:48 -0800678 _buffer_factory = bytearray
679
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700680 def __init__(self, loop, rawsock, protocol, sslcontext, waiter=None,
681 server_side=False, server_hostname=None,
682 extra=None, server=None):
Guido van Rossum28dff0d2013-11-01 14:22:30 -0700683 if ssl is None:
684 raise RuntimeError('stdlib ssl module not available')
685
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700686 if server_side:
Guido van Rossum28dff0d2013-11-01 14:22:30 -0700687 if not sslcontext:
688 raise ValueError('Server side ssl needs a valid SSLContext')
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700689 else:
Guido van Rossum28dff0d2013-11-01 14:22:30 -0700690 if not sslcontext:
691 # Client side may pass ssl=True to use a default
692 # context; in that case the sslcontext passed is None.
Antoine Pitroufd39a892014-10-15 16:58:21 +0200693 # The default is secure for client connections.
694 if hasattr(ssl, 'create_default_context'):
695 # Python 3.4+: use up-to-date strong settings.
696 sslcontext = ssl.create_default_context()
697 if not server_hostname:
698 sslcontext.check_hostname = False
Guido van Rossum7fa6e1a2013-11-23 15:36:43 -0800699 else:
700 # Fallback for Python 3.3.
701 sslcontext = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
702 sslcontext.options |= ssl.OP_NO_SSLv2
Antoine Pitroufd39a892014-10-15 16:58:21 +0200703 sslcontext.options |= ssl.OP_NO_SSLv3
Guido van Rossum7fa6e1a2013-11-23 15:36:43 -0800704 sslcontext.set_default_verify_paths()
705 sslcontext.verify_mode = ssl.CERT_REQUIRED
Guido van Rossum28dff0d2013-11-01 14:22:30 -0700706
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700707 wrap_kwargs = {
708 'server_side': server_side,
709 'do_handshake_on_connect': False,
710 }
Benjamin Peterson7243b572014-11-23 17:04:34 -0600711 if server_hostname and not server_side:
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700712 wrap_kwargs['server_hostname'] = server_hostname
713 sslsock = sslcontext.wrap_socket(rawsock, **wrap_kwargs)
714
715 super().__init__(loop, sslsock, protocol, extra, server)
716
717 self._server_hostname = server_hostname
718 self._waiter = waiter
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700719 self._sslcontext = sslcontext
720 self._paused = False
721
722 # SSL-specific extra info. (peercert is set later)
723 self._extra.update(sslcontext=sslcontext)
724
Victor Stinnere912e652014-07-12 03:11:53 +0200725 if self._loop.get_debug():
726 logger.debug("%r starts SSL handshake", self)
727 start_time = self._loop.time()
728 else:
729 start_time = None
730 self._on_handshake(start_time)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700731
Victor Stinnere912e652014-07-12 03:11:53 +0200732 def _on_handshake(self, start_time):
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700733 try:
734 self._sock.do_handshake()
735 except ssl.SSLWantReadError:
Victor Stinnere912e652014-07-12 03:11:53 +0200736 self._loop.add_reader(self._sock_fd,
737 self._on_handshake, start_time)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700738 return
739 except ssl.SSLWantWriteError:
Victor Stinnere912e652014-07-12 03:11:53 +0200740 self._loop.add_writer(self._sock_fd,
741 self._on_handshake, start_time)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700742 return
743 except BaseException as exc:
Victor Stinnere912e652014-07-12 03:11:53 +0200744 if self._loop.get_debug():
745 logger.warning("%r: SSL handshake failed",
746 self, exc_info=True)
Guido van Rossum355491d2013-10-18 15:17:11 -0700747 self._loop.remove_reader(self._sock_fd)
748 self._loop.remove_writer(self._sock_fd)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700749 self._sock.close()
750 if self._waiter is not None:
751 self._waiter.set_exception(exc)
Victor Stinnere912e652014-07-12 03:11:53 +0200752 if isinstance(exc, Exception):
753 return
754 else:
755 raise
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700756
Guido van Rossum355491d2013-10-18 15:17:11 -0700757 self._loop.remove_reader(self._sock_fd)
758 self._loop.remove_writer(self._sock_fd)
759
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700760 peercert = self._sock.getpeercert()
Christian Heimes6d8c1ab2013-12-06 00:23:13 +0100761 if not hasattr(self._sslcontext, 'check_hostname'):
762 # Verify hostname if requested, Python 3.4+ uses check_hostname
763 # and checks the hostname in do_handshake()
764 if (self._server_hostname and
765 self._sslcontext.verify_mode != ssl.CERT_NONE):
766 try:
767 ssl.match_hostname(peercert, self._server_hostname)
768 except Exception as exc:
Victor Stinnere912e652014-07-12 03:11:53 +0200769 if self._loop.get_debug():
770 logger.warning("%r: SSL handshake failed "
771 "on matching the hostname",
772 self, exc_info=True)
Christian Heimes6d8c1ab2013-12-06 00:23:13 +0100773 self._sock.close()
774 if self._waiter is not None:
775 self._waiter.set_exception(exc)
776 return
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700777
778 # Add extra info that becomes available after handshake.
779 self._extra.update(peercert=peercert,
780 cipher=self._sock.cipher(),
781 compression=self._sock.compression(),
782 )
783
Guido van Rossum2b570162013-11-01 14:18:02 -0700784 self._read_wants_write = False
785 self._write_wants_read = False
786 self._loop.add_reader(self._sock_fd, self._read_ready)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700787 self._loop.call_soon(self._protocol.connection_made, self)
788 if self._waiter is not None:
Victor Stinnerbfff45d2014-07-08 23:57:31 +0200789 # wait until protocol.connection_made() has been called
Victor Stinnera9acbe82014-07-05 15:29:41 +0200790 self._loop.call_soon(self._waiter._set_result_unless_cancelled,
791 None)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700792
Victor Stinnere912e652014-07-12 03:11:53 +0200793 if self._loop.get_debug():
794 dt = self._loop.time() - start_time
795 logger.debug("%r: SSL handshake took %.1f ms", self, dt * 1e3)
796
Guido van Rossum57497ad2013-10-18 07:58:20 -0700797 def pause_reading(self):
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700798 # XXX This is a bit icky, given the comment at the top of
Guido van Rossum2b570162013-11-01 14:18:02 -0700799 # _read_ready(). Is it possible to evoke a deadlock? I don't
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700800 # know, although it doesn't look like it; write() will still
801 # accept more data for the buffer and eventually the app will
Guido van Rossum57497ad2013-10-18 07:58:20 -0700802 # call resume_reading() again, and things will flow again.
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700803
Guido van Rossuma5062c52013-11-27 14:12:48 -0800804 if self._closing:
805 raise RuntimeError('Cannot pause_reading() when closing')
806 if self._paused:
807 raise RuntimeError('Already paused')
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700808 self._paused = True
809 self._loop.remove_reader(self._sock_fd)
Victor Stinnere912e652014-07-12 03:11:53 +0200810 if self._loop.get_debug():
811 logger.debug("%r pauses reading", self)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700812
Guido van Rossum57497ad2013-10-18 07:58:20 -0700813 def resume_reading(self):
Guido van Rossuma5062c52013-11-27 14:12:48 -0800814 if not self._paused:
Andrew Svetlov3207a032014-05-27 21:24:43 +0300815 raise RuntimeError('Not paused')
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700816 self._paused = False
817 if self._closing:
818 return
Guido van Rossum2b570162013-11-01 14:18:02 -0700819 self._loop.add_reader(self._sock_fd, self._read_ready)
Victor Stinnere912e652014-07-12 03:11:53 +0200820 if self._loop.get_debug():
821 logger.debug("%r resumes reading", self)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700822
Guido van Rossum2b570162013-11-01 14:18:02 -0700823 def _read_ready(self):
824 if self._write_wants_read:
825 self._write_wants_read = False
826 self._write_ready()
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700827
Guido van Rossum2b570162013-11-01 14:18:02 -0700828 if self._buffer:
829 self._loop.add_writer(self._sock_fd, self._write_ready)
830
831 try:
832 data = self._sock.recv(self.max_size)
833 except (BlockingIOError, InterruptedError, ssl.SSLWantReadError):
834 pass
835 except ssl.SSLWantWriteError:
836 self._read_wants_write = True
837 self._loop.remove_reader(self._sock_fd)
838 self._loop.add_writer(self._sock_fd, self._write_ready)
839 except Exception as exc:
Victor Stinner065ca252014-02-19 01:40:41 +0100840 self._fatal_error(exc, 'Fatal read error on SSL transport')
Guido van Rossum2b570162013-11-01 14:18:02 -0700841 else:
842 if data:
843 self._protocol.data_received(data)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700844 else:
Guido van Rossum2b570162013-11-01 14:18:02 -0700845 try:
Victor Stinnere912e652014-07-12 03:11:53 +0200846 if self._loop.get_debug():
847 logger.debug("%r received EOF", self)
Guido van Rossum3a703922013-11-01 14:19:35 -0700848 keep_open = self._protocol.eof_received()
849 if keep_open:
850 logger.warning('returning true from eof_received() '
851 'has no effect when using ssl')
Guido van Rossum2b570162013-11-01 14:18:02 -0700852 finally:
853 self.close()
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700854
Guido van Rossum2b570162013-11-01 14:18:02 -0700855 def _write_ready(self):
856 if self._read_wants_write:
857 self._read_wants_write = False
858 self._read_ready()
859
860 if not (self._paused or self._closing):
861 self._loop.add_reader(self._sock_fd, self._read_ready)
862
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700863 if self._buffer:
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700864 try:
Guido van Rossuma5062c52013-11-27 14:12:48 -0800865 n = self._sock.send(self._buffer)
Victor Stinnerc89c8a72014-02-26 17:35:30 +0100866 except (BlockingIOError, InterruptedError, ssl.SSLWantWriteError):
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700867 n = 0
Guido van Rossum2b570162013-11-01 14:18:02 -0700868 except ssl.SSLWantReadError:
869 n = 0
870 self._loop.remove_writer(self._sock_fd)
871 self._write_wants_read = True
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700872 except Exception as exc:
873 self._loop.remove_writer(self._sock_fd)
Guido van Rossuma5062c52013-11-27 14:12:48 -0800874 self._buffer.clear()
Victor Stinner065ca252014-02-19 01:40:41 +0100875 self._fatal_error(exc, 'Fatal write error on SSL transport')
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700876 return
877
Guido van Rossuma5062c52013-11-27 14:12:48 -0800878 if n:
879 del self._buffer[:n]
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700880
Guido van Rossum2b570162013-11-01 14:18:02 -0700881 self._maybe_resume_protocol() # May append to buffer.
Guido van Rossum355491d2013-10-18 15:17:11 -0700882
Guido van Rossum2b570162013-11-01 14:18:02 -0700883 if not self._buffer:
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700884 self._loop.remove_writer(self._sock_fd)
Guido van Rossum2b570162013-11-01 14:18:02 -0700885 if self._closing:
886 self._call_connection_lost(None)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700887
888 def write(self, data):
Guido van Rossuma5062c52013-11-27 14:12:48 -0800889 if not isinstance(data, (bytes, bytearray, memoryview)):
890 raise TypeError('data argument must be byte-ish (%r)',
891 type(data))
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700892 if not data:
893 return
894
895 if self._conn_lost:
896 if self._conn_lost >= constants.LOG_THRESHOLD_FOR_CONNLOST_WRITES:
Guido van Rossumfc29e0f2013-10-17 15:39:45 -0700897 logger.warning('socket.send() raised exception.')
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700898 self._conn_lost += 1
899 return
900
Guido van Rossum2b570162013-11-01 14:18:02 -0700901 if not self._buffer:
902 self._loop.add_writer(self._sock_fd, self._write_ready)
903
904 # Add it to the buffer.
Guido van Rossuma5062c52013-11-27 14:12:48 -0800905 self._buffer.extend(data)
Guido van Rossum355491d2013-10-18 15:17:11 -0700906 self._maybe_pause_protocol()
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700907
908 def can_write_eof(self):
909 return False
910
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700911
912class _SelectorDatagramTransport(_SelectorTransport):
913
Guido van Rossuma5062c52013-11-27 14:12:48 -0800914 _buffer_factory = collections.deque
915
Victor Stinnerbfff45d2014-07-08 23:57:31 +0200916 def __init__(self, loop, sock, protocol, address=None,
917 waiter=None, extra=None):
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700918 super().__init__(loop, sock, protocol, extra)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700919 self._address = address
920 self._loop.add_reader(self._sock_fd, self._read_ready)
921 self._loop.call_soon(self._protocol.connection_made, self)
Victor Stinnerbfff45d2014-07-08 23:57:31 +0200922 if waiter is not None:
923 # wait until protocol.connection_made() has been called
924 self._loop.call_soon(waiter._set_result_unless_cancelled, None)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700925
Guido van Rossum355491d2013-10-18 15:17:11 -0700926 def get_write_buffer_size(self):
927 return sum(len(data) for data, _ in self._buffer)
928
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700929 def _read_ready(self):
930 try:
931 data, addr = self._sock.recvfrom(self.max_size)
932 except (BlockingIOError, InterruptedError):
933 pass
Guido van Rossum2335de72013-11-15 16:51:48 -0800934 except OSError as exc:
935 self._protocol.error_received(exc)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700936 except Exception as exc:
Victor Stinner065ca252014-02-19 01:40:41 +0100937 self._fatal_error(exc, 'Fatal read error on datagram transport')
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700938 else:
939 self._protocol.datagram_received(data, addr)
940
941 def sendto(self, data, addr=None):
Guido van Rossuma5062c52013-11-27 14:12:48 -0800942 if not isinstance(data, (bytes, bytearray, memoryview)):
943 raise TypeError('data argument must be byte-ish (%r)',
944 type(data))
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700945 if not data:
946 return
947
Guido van Rossuma5062c52013-11-27 14:12:48 -0800948 if self._address and addr not in (None, self._address):
949 raise ValueError('Invalid address: must be None or %s' %
950 (self._address,))
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700951
952 if self._conn_lost and self._address:
953 if self._conn_lost >= constants.LOG_THRESHOLD_FOR_CONNLOST_WRITES:
Guido van Rossumfc29e0f2013-10-17 15:39:45 -0700954 logger.warning('socket.send() raised exception.')
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700955 self._conn_lost += 1
956 return
957
958 if not self._buffer:
959 # Attempt to send it right away first.
960 try:
961 if self._address:
962 self._sock.send(data)
963 else:
964 self._sock.sendto(data, addr)
965 return
Guido van Rossum2546a172013-10-18 10:10:36 -0700966 except (BlockingIOError, InterruptedError):
967 self._loop.add_writer(self._sock_fd, self._sendto_ready)
Guido van Rossum2335de72013-11-15 16:51:48 -0800968 except OSError as exc:
969 self._protocol.error_received(exc)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700970 return
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700971 except Exception as exc:
Victor Stinner065ca252014-02-19 01:40:41 +0100972 self._fatal_error(exc,
973 'Fatal write error on datagram transport')
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700974 return
975
Guido van Rossuma5062c52013-11-27 14:12:48 -0800976 # Ensure that what we buffer is immutable.
977 self._buffer.append((bytes(data), addr))
Guido van Rossum355491d2013-10-18 15:17:11 -0700978 self._maybe_pause_protocol()
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700979
980 def _sendto_ready(self):
981 while self._buffer:
982 data, addr = self._buffer.popleft()
983 try:
984 if self._address:
985 self._sock.send(data)
986 else:
987 self._sock.sendto(data, addr)
Guido van Rossum2546a172013-10-18 10:10:36 -0700988 except (BlockingIOError, InterruptedError):
989 self._buffer.appendleft((data, addr)) # Try again later.
990 break
Guido van Rossum2335de72013-11-15 16:51:48 -0800991 except OSError as exc:
992 self._protocol.error_received(exc)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700993 return
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700994 except Exception as exc:
Victor Stinner065ca252014-02-19 01:40:41 +0100995 self._fatal_error(exc,
996 'Fatal write error on datagram transport')
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700997 return
998
Guido van Rossum355491d2013-10-18 15:17:11 -0700999 self._maybe_resume_protocol() # May append to buffer.
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07001000 if not self._buffer:
1001 self._loop.remove_writer(self._sock_fd)
1002 if self._closing:
1003 self._call_connection_lost(None)