blob: 51f56737c67afbab97ee9d28683c1a6df14bf2d7 [file] [log] [blame]
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07001"""Abstract Transport class."""
2
Yury Selivanov6370f342017-12-10 18:36:12 -05003__all__ = (
4 'BaseTransport', 'ReadTransport', 'WriteTransport',
5 'Transport', 'DatagramTransport', 'SubprocessTransport',
6)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07007
8
9class BaseTransport:
Guido van Rossum9204af42013-11-30 15:35:42 -080010 """Base class for transports."""
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070011
12 def __init__(self, extra=None):
13 if extra is None:
14 extra = {}
15 self._extra = extra
16
17 def get_extra_info(self, name, default=None):
18 """Get optional transport information."""
19 return self._extra.get(name, default)
20
Yury Selivanov5bb1afb2015-11-16 12:43:21 -050021 def is_closing(self):
22 """Return True if the transport is closing or closed."""
23 raise NotImplementedError
24
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070025 def close(self):
Antoine Pitroudec43382013-11-23 12:30:00 +010026 """Close the transport.
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070027
28 Buffered data will be flushed asynchronously. No more data
29 will be received. After all buffered data is flushed, the
30 protocol's connection_lost() method will (eventually) called
31 with None as its argument.
32 """
33 raise NotImplementedError
34
Yury Selivanova05a6ef2016-09-11 21:11:02 -040035 def set_protocol(self, protocol):
36 """Set a new protocol."""
37 raise NotImplementedError
38
39 def get_protocol(self):
40 """Return the current protocol."""
41 raise NotImplementedError
42
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070043
44class ReadTransport(BaseTransport):
Guido van Rossum9204af42013-11-30 15:35:42 -080045 """Interface for read-only transports."""
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070046
Guido van Rossum57497ad2013-10-18 07:58:20 -070047 def pause_reading(self):
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070048 """Pause the receiving end.
49
50 No data will be passed to the protocol's data_received()
Guido van Rossum57497ad2013-10-18 07:58:20 -070051 method until resume_reading() is called.
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070052 """
53 raise NotImplementedError
54
Guido van Rossum57497ad2013-10-18 07:58:20 -070055 def resume_reading(self):
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070056 """Resume the receiving end.
57
58 Data received will once again be passed to the protocol's
59 data_received() method.
60 """
61 raise NotImplementedError
62
63
64class WriteTransport(BaseTransport):
Guido van Rossum9204af42013-11-30 15:35:42 -080065 """Interface for write-only transports."""
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070066
Guido van Rossum355491d2013-10-18 15:17:11 -070067 def set_write_buffer_limits(self, high=None, low=None):
68 """Set the high- and low-water limits for write flow control.
69
70 These two values control when to call the protocol's
71 pause_writing() and resume_writing() methods. If specified,
72 the low-water limit must be less than or equal to the
73 high-water limit. Neither value can be negative.
74
75 The defaults are implementation-specific. If only the
Serhiy Storchakad65c9492015-11-02 14:10:23 +020076 high-water limit is given, the low-water limit defaults to an
Guido van Rossum355491d2013-10-18 15:17:11 -070077 implementation-specific value less than or equal to the
78 high-water limit. Setting high to zero forces low to zero as
79 well, and causes pause_writing() to be called whenever the
80 buffer becomes non-empty. Setting low to zero causes
81 resume_writing() to be called only once the buffer is empty.
82 Use of zero for either limit is generally sub-optimal as it
83 reduces opportunities for doing I/O and computation
84 concurrently.
85 """
86 raise NotImplementedError
87
88 def get_write_buffer_size(self):
89 """Return the current size of the write buffer."""
90 raise NotImplementedError
91
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070092 def write(self, data):
93 """Write some data bytes to the transport.
94
95 This does not block; it buffers the data and arranges for it
96 to be sent out asynchronously.
97 """
98 raise NotImplementedError
99
100 def writelines(self, list_of_data):
101 """Write a list (or any iterable) of data bytes to the transport.
102
Guido van Rossumf10345e2013-12-02 18:36:30 -0800103 The default implementation concatenates the arguments and
104 calls write() on the result.
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700105 """
INADA Naoki3e2ad8e2017-04-25 10:57:18 +0900106 data = b''.join(list_of_data)
Victor Stinner71080fc2015-07-25 02:23:21 +0200107 self.write(data)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700108
109 def write_eof(self):
Antoine Pitroudec43382013-11-23 12:30:00 +0100110 """Close the write end after flushing buffered data.
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700111
112 (This is like typing ^D into a UNIX program reading from stdin.)
113
114 Data may still be received.
115 """
116 raise NotImplementedError
117
118 def can_write_eof(self):
Antoine Pitroudec43382013-11-23 12:30:00 +0100119 """Return True if this transport supports write_eof(), False if not."""
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700120 raise NotImplementedError
121
122 def abort(self):
Guido van Rossum488b0da2013-11-23 11:51:09 -0800123 """Close the transport immediately.
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700124
125 Buffered data will be lost. No more data will be received.
126 The protocol's connection_lost() method will (eventually) be
127 called with None as its argument.
128 """
129 raise NotImplementedError
130
131
132class Transport(ReadTransport, WriteTransport):
Guido van Rossum9204af42013-11-30 15:35:42 -0800133 """Interface representing a bidirectional transport.
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700134
135 There may be several implementations, but typically, the user does
136 not implement new transports; rather, the platform provides some
137 useful transports that are implemented using the platform's best
138 practices.
139
140 The user never instantiates a transport directly; they call a
141 utility function, passing it a protocol factory and other
142 information necessary to create the transport and protocol. (E.g.
143 EventLoop.create_connection() or EventLoop.create_server().)
144
145 The utility function will asynchronously create a transport and a
146 protocol and hook them up by calling the protocol's
147 connection_made() method, passing it the transport.
148
149 The implementation here raises NotImplemented for every method
150 except writelines(), which calls write() in a loop.
151 """
152
153
154class DatagramTransport(BaseTransport):
Guido van Rossum9204af42013-11-30 15:35:42 -0800155 """Interface for datagram (UDP) transports."""
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700156
157 def sendto(self, data, addr=None):
158 """Send data to the transport.
159
160 This does not block; it buffers the data and arranges for it
161 to be sent out asynchronously.
162 addr is target socket address.
163 If addr is None use target address pointed on transport creation.
164 """
165 raise NotImplementedError
166
167 def abort(self):
Antoine Pitroudec43382013-11-23 12:30:00 +0100168 """Close the transport immediately.
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700169
170 Buffered data will be lost. No more data will be received.
171 The protocol's connection_lost() method will (eventually) be
172 called with None as its argument.
173 """
174 raise NotImplementedError
175
176
177class SubprocessTransport(BaseTransport):
178
179 def get_pid(self):
180 """Get subprocess id."""
181 raise NotImplementedError
182
183 def get_returncode(self):
184 """Get subprocess returncode.
185
186 See also
187 http://docs.python.org/3/library/subprocess#subprocess.Popen.returncode
188 """
189 raise NotImplementedError
190
191 def get_pipe_transport(self, fd):
192 """Get transport for pipe with number fd."""
193 raise NotImplementedError
194
195 def send_signal(self, signal):
196 """Send signal to subprocess.
197
198 See also:
199 docs.python.org/3/library/subprocess#subprocess.Popen.send_signal
200 """
201 raise NotImplementedError
202
203 def terminate(self):
204 """Stop the subprocess.
205
206 Alias for close() method.
207
208 On Posix OSs the method sends SIGTERM to the subprocess.
209 On Windows the Win32 API function TerminateProcess()
210 is called to stop the subprocess.
211
212 See also:
213 http://docs.python.org/3/library/subprocess#subprocess.Popen.terminate
214 """
215 raise NotImplementedError
216
217 def kill(self):
218 """Kill the subprocess.
219
220 On Posix OSs the function sends SIGKILL to the subprocess.
221 On Windows kill() is an alias for terminate().
222
223 See also:
224 http://docs.python.org/3/library/subprocess#subprocess.Popen.kill
225 """
226 raise NotImplementedError
Yury Selivanov3cb99142014-02-18 18:41:13 -0500227
228
229class _FlowControlMixin(Transport):
230 """All the logic for (write) flow control in a mix-in base class.
231
232 The subclass must implement get_write_buffer_size(). It must call
233 _maybe_pause_protocol() whenever the write buffer size increases,
234 and _maybe_resume_protocol() whenever it decreases. It may also
235 override set_write_buffer_limits() (e.g. to specify different
236 defaults).
237
238 The subclass constructor must call super().__init__(extra). This
239 will call set_write_buffer_limits().
240
241 The user may call set_write_buffer_limits() and
242 get_write_buffer_size(), and their protocol's pause_writing() and
243 resume_writing() may be called.
244 """
245
Victor Stinner004adb92014-11-05 15:27:41 +0100246 def __init__(self, extra=None, loop=None):
Yury Selivanov3cb99142014-02-18 18:41:13 -0500247 super().__init__(extra)
Victor Stinner004adb92014-11-05 15:27:41 +0100248 assert loop is not None
249 self._loop = loop
Yury Selivanov3cb99142014-02-18 18:41:13 -0500250 self._protocol_paused = False
Yury Selivanov15899202014-02-19 11:10:52 -0500251 self._set_write_buffer_limits()
Yury Selivanov3cb99142014-02-18 18:41:13 -0500252
253 def _maybe_pause_protocol(self):
254 size = self.get_write_buffer_size()
255 if size <= self._high_water:
256 return
257 if not self._protocol_paused:
258 self._protocol_paused = True
259 try:
260 self._protocol.pause_writing()
261 except Exception as exc:
262 self._loop.call_exception_handler({
263 'message': 'protocol.pause_writing() failed',
264 'exception': exc,
265 'transport': self,
266 'protocol': self._protocol,
267 })
268
269 def _maybe_resume_protocol(self):
270 if (self._protocol_paused and
Yury Selivanov6370f342017-12-10 18:36:12 -0500271 self.get_write_buffer_size() <= self._low_water):
Yury Selivanov3cb99142014-02-18 18:41:13 -0500272 self._protocol_paused = False
273 try:
274 self._protocol.resume_writing()
275 except Exception as exc:
276 self._loop.call_exception_handler({
277 'message': 'protocol.resume_writing() failed',
278 'exception': exc,
279 'transport': self,
280 'protocol': self._protocol,
281 })
282
Victor Stinner52bb9492014-08-26 00:22:28 +0200283 def get_write_buffer_limits(self):
284 return (self._low_water, self._high_water)
285
Yury Selivanov15899202014-02-19 11:10:52 -0500286 def _set_write_buffer_limits(self, high=None, low=None):
Yury Selivanov3cb99142014-02-18 18:41:13 -0500287 if high is None:
288 if low is None:
Yury Selivanov6370f342017-12-10 18:36:12 -0500289 high = 64 * 1024
Yury Selivanov3cb99142014-02-18 18:41:13 -0500290 else:
Yury Selivanov6370f342017-12-10 18:36:12 -0500291 high = 4 * low
Yury Selivanov3cb99142014-02-18 18:41:13 -0500292 if low is None:
293 low = high // 4
Yury Selivanov6370f342017-12-10 18:36:12 -0500294
Yury Selivanov3cb99142014-02-18 18:41:13 -0500295 if not high >= low >= 0:
Yury Selivanov6370f342017-12-10 18:36:12 -0500296 raise ValueError(
297 f'high ({high!r}) must be >= low ({low!r}) must be >= 0')
298
Yury Selivanov3cb99142014-02-18 18:41:13 -0500299 self._high_water = high
300 self._low_water = low
301
Yury Selivanov15899202014-02-19 11:10:52 -0500302 def set_write_buffer_limits(self, high=None, low=None):
303 self._set_write_buffer_limits(high=high, low=low)
304 self._maybe_pause_protocol()
305
Yury Selivanov3cb99142014-02-18 18:41:13 -0500306 def get_write_buffer_size(self):
307 raise NotImplementedError