blob: a94079f433adfc642a7df3ed05c34a04e785a0ce [file] [log] [blame]
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07001"""Abstract Transport class."""
2
Guido van Rossumac97bf42013-12-20 14:16:21 -08003__all__ = ['BaseTransport', 'ReadTransport', 'WriteTransport',
4 'Transport', 'DatagramTransport', 'SubprocessTransport',
5 ]
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07006
7
8class BaseTransport:
Guido van Rossum9204af42013-11-30 15:35:42 -08009 """Base class for transports."""
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070010
11 def __init__(self, extra=None):
12 if extra is None:
13 extra = {}
14 self._extra = extra
15
16 def get_extra_info(self, name, default=None):
17 """Get optional transport information."""
18 return self._extra.get(name, default)
19
Yury Selivanov5bb1afb2015-11-16 12:43:21 -050020 def is_closing(self):
21 """Return True if the transport is closing or closed."""
22 raise NotImplementedError
23
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070024 def close(self):
Antoine Pitroudec43382013-11-23 12:30:00 +010025 """Close the transport.
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070026
27 Buffered data will be flushed asynchronously. No more data
28 will be received. After all buffered data is flushed, the
29 protocol's connection_lost() method will (eventually) called
30 with None as its argument.
31 """
32 raise NotImplementedError
33
Yury Selivanova05a6ef2016-09-11 21:11:02 -040034 def set_protocol(self, protocol):
35 """Set a new protocol."""
36 raise NotImplementedError
37
38 def get_protocol(self):
39 """Return the current protocol."""
40 raise NotImplementedError
41
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070042
43class ReadTransport(BaseTransport):
Guido van Rossum9204af42013-11-30 15:35:42 -080044 """Interface for read-only transports."""
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070045
Guido van Rossum57497ad2013-10-18 07:58:20 -070046 def pause_reading(self):
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070047 """Pause the receiving end.
48
49 No data will be passed to the protocol's data_received()
Guido van Rossum57497ad2013-10-18 07:58:20 -070050 method until resume_reading() is called.
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070051 """
52 raise NotImplementedError
53
Guido van Rossum57497ad2013-10-18 07:58:20 -070054 def resume_reading(self):
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070055 """Resume the receiving end.
56
57 Data received will once again be passed to the protocol's
58 data_received() method.
59 """
60 raise NotImplementedError
61
62
63class WriteTransport(BaseTransport):
Guido van Rossum9204af42013-11-30 15:35:42 -080064 """Interface for write-only transports."""
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070065
Guido van Rossum355491d2013-10-18 15:17:11 -070066 def set_write_buffer_limits(self, high=None, low=None):
67 """Set the high- and low-water limits for write flow control.
68
69 These two values control when to call the protocol's
70 pause_writing() and resume_writing() methods. If specified,
71 the low-water limit must be less than or equal to the
72 high-water limit. Neither value can be negative.
73
74 The defaults are implementation-specific. If only the
Serhiy Storchakad65c9492015-11-02 14:10:23 +020075 high-water limit is given, the low-water limit defaults to an
Guido van Rossum355491d2013-10-18 15:17:11 -070076 implementation-specific value less than or equal to the
77 high-water limit. Setting high to zero forces low to zero as
78 well, and causes pause_writing() to be called whenever the
79 buffer becomes non-empty. Setting low to zero causes
80 resume_writing() to be called only once the buffer is empty.
81 Use of zero for either limit is generally sub-optimal as it
82 reduces opportunities for doing I/O and computation
83 concurrently.
84 """
85 raise NotImplementedError
86
87 def get_write_buffer_size(self):
88 """Return the current size of the write buffer."""
89 raise NotImplementedError
90
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070091 def write(self, data):
92 """Write some data bytes to the transport.
93
94 This does not block; it buffers the data and arranges for it
95 to be sent out asynchronously.
96 """
97 raise NotImplementedError
98
99 def writelines(self, list_of_data):
100 """Write a list (or any iterable) of data bytes to the transport.
101
Guido van Rossumf10345e2013-12-02 18:36:30 -0800102 The default implementation concatenates the arguments and
103 calls write() on the result.
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700104 """
INADA Naoki3e2ad8e2017-04-25 10:57:18 +0900105 data = b''.join(list_of_data)
Victor Stinner71080fc2015-07-25 02:23:21 +0200106 self.write(data)
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700107
108 def write_eof(self):
Antoine Pitroudec43382013-11-23 12:30:00 +0100109 """Close the write end after flushing buffered data.
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700110
111 (This is like typing ^D into a UNIX program reading from stdin.)
112
113 Data may still be received.
114 """
115 raise NotImplementedError
116
117 def can_write_eof(self):
Antoine Pitroudec43382013-11-23 12:30:00 +0100118 """Return True if this transport supports write_eof(), False if not."""
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700119 raise NotImplementedError
120
121 def abort(self):
Guido van Rossum488b0da2013-11-23 11:51:09 -0800122 """Close the transport immediately.
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700123
124 Buffered data will be lost. No more data will be received.
125 The protocol's connection_lost() method will (eventually) be
126 called with None as its argument.
127 """
128 raise NotImplementedError
129
130
131class Transport(ReadTransport, WriteTransport):
Guido van Rossum9204af42013-11-30 15:35:42 -0800132 """Interface representing a bidirectional transport.
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700133
134 There may be several implementations, but typically, the user does
135 not implement new transports; rather, the platform provides some
136 useful transports that are implemented using the platform's best
137 practices.
138
139 The user never instantiates a transport directly; they call a
140 utility function, passing it a protocol factory and other
141 information necessary to create the transport and protocol. (E.g.
142 EventLoop.create_connection() or EventLoop.create_server().)
143
144 The utility function will asynchronously create a transport and a
145 protocol and hook them up by calling the protocol's
146 connection_made() method, passing it the transport.
147
148 The implementation here raises NotImplemented for every method
149 except writelines(), which calls write() in a loop.
150 """
151
152
153class DatagramTransport(BaseTransport):
Guido van Rossum9204af42013-11-30 15:35:42 -0800154 """Interface for datagram (UDP) transports."""
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700155
156 def sendto(self, data, addr=None):
157 """Send data to the transport.
158
159 This does not block; it buffers the data and arranges for it
160 to be sent out asynchronously.
161 addr is target socket address.
162 If addr is None use target address pointed on transport creation.
163 """
164 raise NotImplementedError
165
166 def abort(self):
Antoine Pitroudec43382013-11-23 12:30:00 +0100167 """Close the transport immediately.
Guido van Rossum27b7c7e2013-10-17 13:40:50 -0700168
169 Buffered data will be lost. No more data will be received.
170 The protocol's connection_lost() method will (eventually) be
171 called with None as its argument.
172 """
173 raise NotImplementedError
174
175
176class SubprocessTransport(BaseTransport):
177
178 def get_pid(self):
179 """Get subprocess id."""
180 raise NotImplementedError
181
182 def get_returncode(self):
183 """Get subprocess returncode.
184
185 See also
186 http://docs.python.org/3/library/subprocess#subprocess.Popen.returncode
187 """
188 raise NotImplementedError
189
190 def get_pipe_transport(self, fd):
191 """Get transport for pipe with number fd."""
192 raise NotImplementedError
193
194 def send_signal(self, signal):
195 """Send signal to subprocess.
196
197 See also:
198 docs.python.org/3/library/subprocess#subprocess.Popen.send_signal
199 """
200 raise NotImplementedError
201
202 def terminate(self):
203 """Stop the subprocess.
204
205 Alias for close() method.
206
207 On Posix OSs the method sends SIGTERM to the subprocess.
208 On Windows the Win32 API function TerminateProcess()
209 is called to stop the subprocess.
210
211 See also:
212 http://docs.python.org/3/library/subprocess#subprocess.Popen.terminate
213 """
214 raise NotImplementedError
215
216 def kill(self):
217 """Kill the subprocess.
218
219 On Posix OSs the function sends SIGKILL to the subprocess.
220 On Windows kill() is an alias for terminate().
221
222 See also:
223 http://docs.python.org/3/library/subprocess#subprocess.Popen.kill
224 """
225 raise NotImplementedError
Yury Selivanov3cb99142014-02-18 18:41:13 -0500226
227
228class _FlowControlMixin(Transport):
229 """All the logic for (write) flow control in a mix-in base class.
230
231 The subclass must implement get_write_buffer_size(). It must call
232 _maybe_pause_protocol() whenever the write buffer size increases,
233 and _maybe_resume_protocol() whenever it decreases. It may also
234 override set_write_buffer_limits() (e.g. to specify different
235 defaults).
236
237 The subclass constructor must call super().__init__(extra). This
238 will call set_write_buffer_limits().
239
240 The user may call set_write_buffer_limits() and
241 get_write_buffer_size(), and their protocol's pause_writing() and
242 resume_writing() may be called.
243 """
244
Victor Stinner004adb92014-11-05 15:27:41 +0100245 def __init__(self, extra=None, loop=None):
Yury Selivanov3cb99142014-02-18 18:41:13 -0500246 super().__init__(extra)
Victor Stinner004adb92014-11-05 15:27:41 +0100247 assert loop is not None
248 self._loop = loop
Yury Selivanov3cb99142014-02-18 18:41:13 -0500249 self._protocol_paused = False
Yury Selivanov15899202014-02-19 11:10:52 -0500250 self._set_write_buffer_limits()
Yury Selivanov3cb99142014-02-18 18:41:13 -0500251
252 def _maybe_pause_protocol(self):
253 size = self.get_write_buffer_size()
254 if size <= self._high_water:
255 return
256 if not self._protocol_paused:
257 self._protocol_paused = True
258 try:
259 self._protocol.pause_writing()
260 except Exception as exc:
261 self._loop.call_exception_handler({
262 'message': 'protocol.pause_writing() failed',
263 'exception': exc,
264 'transport': self,
265 'protocol': self._protocol,
266 })
267
268 def _maybe_resume_protocol(self):
269 if (self._protocol_paused and
270 self.get_write_buffer_size() <= self._low_water):
271 self._protocol_paused = False
272 try:
273 self._protocol.resume_writing()
274 except Exception as exc:
275 self._loop.call_exception_handler({
276 'message': 'protocol.resume_writing() failed',
277 'exception': exc,
278 'transport': self,
279 'protocol': self._protocol,
280 })
281
Victor Stinner52bb9492014-08-26 00:22:28 +0200282 def get_write_buffer_limits(self):
283 return (self._low_water, self._high_water)
284
Yury Selivanov15899202014-02-19 11:10:52 -0500285 def _set_write_buffer_limits(self, high=None, low=None):
Yury Selivanov3cb99142014-02-18 18:41:13 -0500286 if high is None:
287 if low is None:
288 high = 64*1024
289 else:
290 high = 4*low
291 if low is None:
292 low = high // 4
293 if not high >= low >= 0:
294 raise ValueError('high (%r) must be >= low (%r) must be >= 0' %
295 (high, low))
296 self._high_water = high
297 self._low_water = low
298
Yury Selivanov15899202014-02-19 11:10:52 -0500299 def set_write_buffer_limits(self, high=None, low=None):
300 self._set_write_buffer_limits(high=high, low=low)
301 self._maybe_pause_protocol()
302
Yury Selivanov3cb99142014-02-18 18:41:13 -0500303 def get_write_buffer_size(self):
304 raise NotImplementedError