blob: 0db0875715619d04b8c4a2f7a5fdbe934bb03981 [file] [log] [blame]
"""Abstract Transport class."""
2
Victor Stinner71080fc2015-07-25 02:23:21 +02003from asyncio import compat
Guido van Rossumf10345e2013-12-02 18:36:30 -08004
# Public names exported by ``from <this module> import *``.
__all__ = [
    'BaseTransport',
    'ReadTransport',
    'WriteTransport',
    'Transport',
    'DatagramTransport',
    'SubprocessTransport',
]
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07008
9
class BaseTransport:
    """Common base class for all transports.

    It stores the optional ``extra`` mapping and declares the hooks
    (is_closing, close, set_protocol, get_protocol) that concrete
    transports override; here each of those hooks raises
    NotImplementedError.
    """

    def __init__(self, extra=None):
        """Remember *extra*, substituting a new empty dict when it is None."""
        self._extra = {} if extra is None else extra

    def get_extra_info(self, name, default=None):
        """Get optional transport information.

        Returns the value stored under *name* in the extra mapping, or
        *default* when there is no such entry.
        """
        info = self._extra
        return info.get(name, default)

    def is_closing(self):
        """Return True if the transport is closing or closed."""
        raise NotImplementedError

    def close(self):
        """Close the transport.

        Buffered data will be flushed asynchronously.  No more data
        will be received.  After all buffered data is flushed, the
        protocol's connection_lost() method will (eventually) be
        called with None as its argument.
        """
        raise NotImplementedError

    def set_protocol(self, protocol):
        """Set a new protocol."""
        raise NotImplementedError

    def get_protocol(self):
        """Return the current protocol."""
        raise NotImplementedError
43
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070044
class ReadTransport(BaseTransport):
    """Interface for read-only transports.

    Both methods are abstract here and raise NotImplementedError.
    """

    def pause_reading(self):
        """Pause the receiving end.

        Until resume_reading() is called, no data will be passed to
        the protocol's data_received() method.
        """
        raise NotImplementedError

    def resume_reading(self):
        """Resume the receiving end.

        Received data will once again be passed to the protocol's
        data_received() method.
        """
        raise NotImplementedError
63
64
class WriteTransport(BaseTransport):
    """Interface for write-only transports.

    Every method except writelines() is abstract here and raises
    NotImplementedError; writelines() is implemented on top of write().
    """

    def set_write_buffer_limits(self, high=None, low=None):
        """Set the high- and low-water limits for write flow control.

        These two values control when to call the protocol's
        pause_writing() and resume_writing() methods.  If specified,
        the low-water limit must be less than or equal to the
        high-water limit.  Neither value can be negative.

        The defaults are implementation-specific.  If only the
        high-water limit is given, the low-water limit defaults to an
        implementation-specific value less than or equal to the
        high-water limit.  Setting high to zero forces low to zero as
        well, and causes pause_writing() to be called whenever the
        buffer becomes non-empty.  Setting low to zero causes
        resume_writing() to be called only once the buffer is empty.
        Use of zero for either limit is generally sub-optimal as it
        reduces opportunities for doing I/O and computation
        concurrently.
        """
        raise NotImplementedError

    def get_write_buffer_size(self):
        """Return the current size of the write buffer."""
        raise NotImplementedError

    def write(self, data):
        """Write some data bytes to the transport.

        This does not block; it buffers the data and arranges for it
        to be sent out asynchronously.
        """
        raise NotImplementedError

    def writelines(self, list_of_data):
        """Write a list (or any iterable) of data bytes to the transport.

        The default implementation concatenates the items into a
        single bytes object and calls write() once on the result.
        """
        # bytes.join() accepts any bytes-like items (bytes, bytearray,
        # memoryview) on Python 3.4+, which is all the old
        # asyncio.compat.flatten_list_bytes() shim did there; calling it
        # directly avoids depending on the since-removed compat module.
        self.write(b''.join(list_of_data))

    def write_eof(self):
        """Close the write end after flushing buffered data.

        (This is like typing ^D into a UNIX program reading from stdin.)

        Data may still be received.
        """
        raise NotImplementedError

    def can_write_eof(self):
        """Return True if this transport supports write_eof(), False if not."""
        raise NotImplementedError

    def abort(self):
        """Close the transport immediately.

        Buffered data will be lost.  No more data will be received.
        The protocol's connection_lost() method will (eventually) be
        called with None as its argument.
        """
        raise NotImplementedError
131
132
class Transport(ReadTransport, WriteTransport):
    """Interface representing a bidirectional transport.

    There may be several implementations, but typically, the user does
    not implement new transports; rather, the platform provides some
    useful transports that are implemented using the platform's best
    practices.

    The user never instantiates a transport directly; they call a
    utility function, passing it a protocol factory and other
    information necessary to create the transport and protocol.  (E.g.
    EventLoop.create_connection() or EventLoop.create_server().)

    The utility function will asynchronously create a transport and a
    protocol and hook them up by calling the protocol's
    connection_made() method, passing it the transport.

    Apart from the constructor and get_extra_info(), the implementation
    here raises NotImplementedError for every method except
    writelines(), which concatenates its argument and passes the result
    to a single write() call.
    """
153
154
class DatagramTransport(BaseTransport):
    """Interface for datagram (UDP) transports.

    Both methods are abstract here and raise NotImplementedError.
    """

    def sendto(self, data, addr=None):
        """Send data to the transport.

        This does not block; it buffers the data and arranges for it
        to be sent out asynchronously.

        *addr* is the target socket address.  When it is None, the
        target address given at transport creation is used.
        """
        raise NotImplementedError

    def abort(self):
        """Close the transport immediately.

        Buffered data will be lost.  No more data will be received.
        The protocol's connection_lost() method will (eventually) be
        called with None as its argument.
        """
        raise NotImplementedError
176
177
class SubprocessTransport(BaseTransport):
    """Interface for transports that represent a subprocess.

    Every method is abstract here and raises NotImplementedError.
    """

    def get_pid(self):
        """Get subprocess id."""
        raise NotImplementedError

    def get_returncode(self):
        """Get subprocess returncode.

        See also:
        https://docs.python.org/3/library/subprocess.html#subprocess.Popen.returncode
        """
        raise NotImplementedError

    def get_pipe_transport(self, fd):
        """Get transport for pipe with number fd."""
        raise NotImplementedError

    def send_signal(self, signal):
        """Send signal to subprocess.

        See also:
        https://docs.python.org/3/library/subprocess.html#subprocess.Popen.send_signal
        """
        raise NotImplementedError

    def terminate(self):
        """Stop the subprocess.

        Alias for close() method.

        On Posix OSs the method sends SIGTERM to the subprocess.
        On Windows the Win32 API function TerminateProcess()
        is called to stop the subprocess.

        See also:
        https://docs.python.org/3/library/subprocess.html#subprocess.Popen.terminate
        """
        raise NotImplementedError

    def kill(self):
        """Kill the subprocess.

        On Posix OSs the function sends SIGKILL to the subprocess.
        On Windows kill() is an alias for terminate().

        See also:
        https://docs.python.org/3/library/subprocess.html#subprocess.Popen.kill
        """
        raise NotImplementedError
Yury Selivanov3cb99142014-02-18 18:41:13 -0500228
229
class _FlowControlMixin(Transport):
    """Write-side flow control logic, packaged as a mix-in base class.

    A subclass must implement get_write_buffer_size().  It must call
    _maybe_pause_protocol() whenever the write buffer grows and
    _maybe_resume_protocol() whenever it shrinks.  It may also
    override set_write_buffer_limits() (e.g. to choose different
    defaults).

    The subclass constructor must call super().__init__(extra, loop)
    with a loop that is not None; that call installs the default
    limits through _set_write_buffer_limits().  This class reads
    self._protocol but never assigns it, so the subclass has to
    provide that attribute.

    The user may call set_write_buffer_limits() and
    get_write_buffer_size(), and their protocol's pause_writing() and
    resume_writing() may be called.
    """

    def __init__(self, extra=None, loop=None):
        super().__init__(extra)
        assert loop is not None
        self._loop = loop
        self._protocol_paused = False
        self._set_write_buffer_limits()

    def _maybe_pause_protocol(self):
        """Pause the protocol if the buffer is above the high-water mark."""
        if self.get_write_buffer_size() <= self._high_water:
            return
        if self._protocol_paused:
            # Already paused; do not notify the protocol a second time.
            return
        self._protocol_paused = True
        try:
            self._protocol.pause_writing()
        except Exception as exc:
            # A failing callback is reported to the loop's exception
            # handler instead of propagating to the caller.
            context = {
                'message': 'protocol.pause_writing() failed',
                'exception': exc,
                'transport': self,
                'protocol': self._protocol,
            }
            self._loop.call_exception_handler(context)

    def _maybe_resume_protocol(self):
        """Resume a paused protocol once the buffer is at or below low-water."""
        if not self._protocol_paused:
            return
        if self.get_write_buffer_size() <= self._low_water:
            self._protocol_paused = False
            try:
                self._protocol.resume_writing()
            except Exception as exc:
                # Same reporting policy as in _maybe_pause_protocol().
                context = {
                    'message': 'protocol.resume_writing() failed',
                    'exception': exc,
                    'transport': self,
                    'protocol': self._protocol,
                }
                self._loop.call_exception_handler(context)

    def get_write_buffer_limits(self):
        """Return the current limits as a (low_water, high_water) tuple."""
        return (self._low_water, self._high_water)

    def _set_write_buffer_limits(self, high=None, low=None):
        """Validate and store the limits without touching the protocol.

        A missing high defaults to 64 KiB, or to four times low when
        low is given; a missing low defaults to a quarter of high.
        Raises ValueError unless high >= low >= 0.
        """
        if high is None:
            high = 64 * 1024 if low is None else 4 * low
        if low is None:
            low = high // 4
        if not (high >= low >= 0):
            raise ValueError('high (%r) must be >= low (%r) must be >= 0' %
                             (high, low))
        self._high_water = high
        self._low_water = low

    def set_write_buffer_limits(self, high=None, low=None):
        """Store new limits, then pause the protocol if now over high-water."""
        self._set_write_buffer_limits(high=high, low=low)
        self._maybe_pause_protocol()

    def get_write_buffer_size(self):
        """Return the current write buffer size (abstract here)."""
        raise NotImplementedError