blob: 22df3c7aede77de1b868e1e0334cc57e349c6bcb [file] [log] [blame]
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07001"""Abstract Transport class."""
2
Guido van Rossumf10345e2013-12-02 18:36:30 -08003import sys
4
# Feature flag consulted by WriteTransport.writelines(): on Python 3.3,
# bytes.join() doesn't handle memoryview objects.
_PY34 = (3, 4) <= sys.version_info

# Names exported by ``from <this module> import *``.
__all__ = [
    'BaseTransport',
    'ReadTransport',
    'WriteTransport',
    'Transport',
    'DatagramTransport',
    'SubprocessTransport',
]
Guido van Rossum27b7c7e2013-10-17 13:40:50 -070010
11
class BaseTransport:
    """Common base class shared by all transport interfaces."""

    def __init__(self, extra=None):
        # Build a fresh dict per instance when the caller supplies none.
        self._extra = {} if extra is None else extra

    def get_extra_info(self, name, default=None):
        """Look up an optional piece of transport information.

        Returns the value stored under *name* in the ``extra`` mapping
        given to the constructor, or *default* if there is no such key.
        """
        return self._extra.get(name, default)

    def close(self):
        """Close the transport.

        Buffered data is flushed asynchronously and no more data is
        received.  Once all buffered data has been flushed, the
        protocol's connection_lost() method will (eventually) be called
        with None as its argument.

        This base implementation raises NotImplementedError.
        """
        raise NotImplementedError
33
34
class ReadTransport(BaseTransport):
    """Interface for transports that only receive data."""

    def pause_reading(self):
        """Suspend the receiving end.

        Until resume_reading() is called, no data is handed to the
        protocol's data_received() method.

        This base implementation raises NotImplementedError.
        """
        raise NotImplementedError

    def resume_reading(self):
        """Restart the receiving end.

        Received data is once more passed to the protocol's
        data_received() method.

        This base implementation raises NotImplementedError.
        """
        raise NotImplementedError
53
54
class WriteTransport(BaseTransport):
    """Interface for transports that only send data."""

    def set_write_buffer_limits(self, high=None, low=None):
        """Configure the high- and low-water marks for write flow control.

        The two marks decide when the protocol's pause_writing() and
        resume_writing() methods get called.  When given, *low* must be
        less than or equal to *high*, and neither may be negative.

        Defaults are implementation-specific.  If only *high* is given,
        *low* defaults to an implementation-specific value less than or
        equal to *high*.  A *high* of zero forces *low* to zero as well
        and makes pause_writing() fire whenever the buffer becomes
        non-empty.  A *low* of zero means resume_writing() is only
        called once the buffer is empty.  Zero for either limit is
        generally sub-optimal because it reduces the opportunities for
        doing I/O and computation concurrently.

        This base implementation raises NotImplementedError.
        """
        raise NotImplementedError

    def get_write_buffer_size(self):
        """Return the current size of the write buffer.

        This base implementation raises NotImplementedError.
        """
        raise NotImplementedError

    def write(self, data):
        """Write some data bytes to the transport.

        The call does not block: the data is buffered and sent out
        asynchronously.

        This base implementation raises NotImplementedError.
        """
        raise NotImplementedError

    def writelines(self, list_of_data):
        """Write a list (or any iterable) of data bytes to the transport.

        The default implementation joins all chunks into one bytes
        object and hands it to a single write() call.
        """
        chunks = list_of_data
        if not _PY34:
            # In Python 3.3, bytes.join() doesn't handle memoryview, so
            # those chunks are converted to bytes first.
            chunks = (
                bytes(chunk) if isinstance(chunk, memoryview) else chunk
                for chunk in chunks)
        payload = b''.join(chunks)
        self.write(payload)

    def write_eof(self):
        """Close the write end once buffered data has been flushed.

        (This is like typing ^D into a UNIX program reading from stdin.)

        Data may still be received.

        This base implementation raises NotImplementedError.
        """
        raise NotImplementedError

    def can_write_eof(self):
        """Return True if this transport supports write_eof(), else False.

        This base implementation raises NotImplementedError.
        """
        raise NotImplementedError

    def abort(self):
        """Close the transport immediately.

        Buffered data is lost and no more data is received.  The
        protocol's connection_lost() method will (eventually) be called
        with None as its argument.

        This base implementation raises NotImplementedError.
        """
        raise NotImplementedError
125
126
class Transport(ReadTransport, WriteTransport):
    """Interface representing a bidirectional transport.

    Several implementations may exist, but users typically do not
    write new transports; the platform supplies useful ones that are
    implemented using the platform's best practices.

    Users never instantiate a transport directly.  They call a utility
    function (e.g. EventLoop.create_connection() or
    EventLoop.create_server()), passing it a protocol factory plus
    whatever other information is needed to create the transport and
    protocol.

    That utility function asynchronously creates a transport and a
    protocol and connects them by calling the protocol's
    connection_made() method with the transport as argument.

    Apart from get_extra_info() and writelines() (which joins its
    chunks and calls write() once), every method inherited here raises
    NotImplementedError.
    """
147
148
class DatagramTransport(BaseTransport):
    """Interface for datagram (UDP) transports."""

    def sendto(self, data, addr=None):
        """Send data to the transport.

        The call does not block: the data is buffered and sent out
        asynchronously.

        *addr* is the target socket address.  When it is None, the
        target address given at transport creation is used.

        This base implementation raises NotImplementedError.
        """
        raise NotImplementedError

    def abort(self):
        """Close the transport immediately.

        Buffered data is lost and no more data is received.  The
        protocol's connection_lost() method will (eventually) be called
        with None as its argument.

        This base implementation raises NotImplementedError.
        """
        raise NotImplementedError
170
171
class SubprocessTransport(BaseTransport):
    """Interface for transports that give access to a subprocess.

    Every method declared here raises NotImplementedError.
    """

    def get_pid(self):
        """Return the subprocess id."""
        raise NotImplementedError

    def get_returncode(self):
        """Return the subprocess returncode.

        See also
        http://docs.python.org/3/library/subprocess#subprocess.Popen.returncode
        """
        raise NotImplementedError

    def get_pipe_transport(self, fd):
        """Return the transport for the pipe with number *fd*."""
        raise NotImplementedError

    def send_signal(self, signal):
        """Send *signal* to the subprocess.

        See also:
        http://docs.python.org/3/library/subprocess#subprocess.Popen.send_signal
        """
        raise NotImplementedError

    def terminate(self):
        """Stop the subprocess.

        Alias for the close() method.

        On Posix OSs this sends SIGTERM to the subprocess.  On Windows
        the Win32 API function TerminateProcess() is called to stop the
        subprocess.

        See also:
        http://docs.python.org/3/library/subprocess#subprocess.Popen.terminate
        """
        raise NotImplementedError

    def kill(self):
        """Kill the subprocess.

        On Posix OSs this sends SIGKILL to the subprocess.  On Windows
        kill() is an alias for terminate().

        See also:
        http://docs.python.org/3/library/subprocess#subprocess.Popen.kill
        """
        raise NotImplementedError
Yury Selivanov3cb99142014-02-18 18:41:13 -0500222
223
class _FlowControlMixin(Transport):
    """All the logic for (write) flow control in a mix-in base class.

    The subclass must implement get_write_buffer_size() and must set
    self._protocol: the pause/resume helpers call pause_writing() and
    resume_writing() on it, and this class never assigns it.  The
    subclass must call _maybe_pause_protocol() whenever the write
    buffer size increases, and _maybe_resume_protocol() whenever it
    decreases.  It may also override set_write_buffer_limits().

    The subclass constructor must call super().__init__(extra, loop)
    with a loop that is not None.  This installs the default limits
    through the private _set_write_buffer_limits(); an overridden
    set_write_buffer_limits() is therefore not invoked during
    construction.

    The user may call set_write_buffer_limits() and
    get_write_buffer_size(), and their protocol's pause_writing() and
    resume_writing() may be called.
    """

    def __init__(self, extra=None, loop=None):
        """Initialise flow-control state.

        *extra* is forwarded to the base transport.  *loop* is stored
        and used to report exceptions raised by the protocol's
        pause_writing() / resume_writing(); it must not be None.

        Raises AssertionError if *loop* is None.
        """
        super().__init__(extra)
        # Explicit check rather than an ``assert`` statement, so that it
        # is still enforced when Python runs with -O.  The exception
        # type is unchanged.
        if loop is None:
            raise AssertionError('loop must not be None')
        self._loop = loop
        self._protocol_paused = False
        self._set_write_buffer_limits()

    def _maybe_pause_protocol(self):
        """Pause the protocol if the buffer has grown past the high mark.

        Does nothing while the buffer size is at or below the
        high-water mark, or when the protocol is already paused.  An
        Exception raised by pause_writing() is passed to the loop's
        exception handler instead of propagating.
        """
        size = self.get_write_buffer_size()
        if size <= self._high_water:
            return
        if not self._protocol_paused:
            # Flag is set before the call, so it stays True even when
            # pause_writing() raises.
            self._protocol_paused = True
            try:
                self._protocol.pause_writing()
            except Exception as exc:
                self._loop.call_exception_handler({
                    'message': 'protocol.pause_writing() failed',
                    'exception': exc,
                    'transport': self,
                    'protocol': self._protocol,
                })

    def _maybe_resume_protocol(self):
        """Resume a paused protocol once the buffer has drained enough.

        Acts only when the protocol is currently paused and the buffer
        size is at or below the low-water mark.  An Exception raised by
        resume_writing() is passed to the loop's exception handler
        instead of propagating.
        """
        if (self._protocol_paused and
                self.get_write_buffer_size() <= self._low_water):
            self._protocol_paused = False
            try:
                self._protocol.resume_writing()
            except Exception as exc:
                self._loop.call_exception_handler({
                    'message': 'protocol.resume_writing() failed',
                    'exception': exc,
                    'transport': self,
                    'protocol': self._protocol,
                })

    def get_write_buffer_limits(self):
        """Return the current limits as a ``(low, high)`` tuple.

        Note the order is the reverse of the set_write_buffer_limits()
        parameters.
        """
        return (self._low_water, self._high_water)

    def _set_write_buffer_limits(self, high=None, low=None):
        """Validate and store the water marks without touching the protocol.

        Defaults: with neither value given, *high* is 64 KiB; with only
        *low* given, *high* is ``4 * low``; a missing *low* is
        ``high // 4``.

        Raises ValueError unless ``high >= low >= 0``; in that case the
        previously stored limits are left unchanged.
        """
        if high is None:
            if low is None:
                high = 64*1024
            else:
                high = 4*low
        if low is None:
            low = high // 4
        if not high >= low >= 0:
            raise ValueError('high (%r) must be >= low (%r) must be >= 0' %
                             (high, low))
        self._high_water = high
        self._low_water = low

    def set_write_buffer_limits(self, high=None, low=None):
        """Set the water marks, then pause the protocol if needed.

        The limits are resolved and validated as described for
        _set_write_buffer_limits().  Afterwards the protocol is paused
        if the current buffer size already exceeds the new high-water
        mark.
        """
        self._set_write_buffer_limits(high=high, low=low)
        self._maybe_pause_protocol()

    def get_write_buffer_size(self):
        """Return the current write buffer size; subclasses must override.

        This base implementation raises NotImplementedError.
        """
        raise NotImplementedError