blob: 9a6d9197d9a0aa350b8f00b8bfe4e1235abe50f5 [file] [log] [blame]
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07001"""Abstract Transport class."""
2
Victor Stinner71080fc2015-07-25 02:23:21 +02003from asyncio import compat
Guido van Rossumf10345e2013-12-02 18:36:30 -08004
# Public names exported by ``from <this module> import *``.
__all__ = [
    'BaseTransport',
    'ReadTransport',
    'WriteTransport',
    'Transport',
    'DatagramTransport',
    'SubprocessTransport',
]
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07008
9
class BaseTransport:
    """Common base class for all transports.

    Stores the optional "extra info" mapping and declares the methods
    every transport is expected to provide.
    """

    def __init__(self, extra=None):
        # Create a fresh dict per instance when nothing is supplied, so
        # that instances never share one mapping.
        self._extra = {} if extra is None else extra

    def get_extra_info(self, name, default=None):
        """Look up an optional piece of transport information.

        Return the value stored under *name* in the extra-info mapping,
        or *default* when *name* is not present.
        """
        info = self._extra
        return info.get(name, default)

    def is_closing(self):
        """Return True if the transport is closing or closed.

        The base implementation raises NotImplementedError.
        """
        raise NotImplementedError()

    def close(self):
        """Close the transport.

        Buffered data will be flushed asynchronously.  No more data
        will be received.  After all buffered data is flushed, the
        protocol's connection_lost() method will (eventually) be called
        with None as its argument.

        The base implementation raises NotImplementedError.
        """
        raise NotImplementedError()
35
36
class ReadTransport(BaseTransport):
    """Interface for transports that can only be read from."""

    def pause_reading(self):
        """Pause the receiving end.

        Until resume_reading() is called, no data will be handed to the
        protocol's data_received() method.

        The base implementation raises NotImplementedError.
        """
        raise NotImplementedError()

    def resume_reading(self):
        """Resume the receiving end.

        Received data will again be delivered to the protocol's
        data_received() method.

        The base implementation raises NotImplementedError.
        """
        raise NotImplementedError()
55
56
class WriteTransport(BaseTransport):
    """Interface for transports that can only be written to."""

    def set_write_buffer_limits(self, high=None, low=None):
        """Set the high- and low-water limits for write flow control.

        The two limits decide when the protocol's pause_writing() and
        resume_writing() methods are called.  If specified, the
        low-water limit must be less than or equal to the high-water
        limit, and neither value may be negative.

        Defaults are implementation-specific.  When only the high-water
        limit is given, the low-water limit defaults to an
        implementation-specific value less than or equal to it.

        Setting high to zero forces low to zero as well, and causes
        pause_writing() to be called whenever the buffer becomes
        non-empty.  Setting low to zero causes resume_writing() to be
        called only once the buffer is empty.  Using zero for either
        limit is generally sub-optimal because it reduces opportunities
        for doing I/O and computation concurrently.

        The base implementation raises NotImplementedError.
        """
        raise NotImplementedError()

    def get_write_buffer_size(self):
        """Return the current size of the write buffer.

        The base implementation raises NotImplementedError.
        """
        raise NotImplementedError()

    def write(self, data):
        """Write some data bytes to the transport.

        This does not block: the data is buffered and arrangements are
        made for it to be sent out asynchronously.

        The base implementation raises NotImplementedError.
        """
        raise NotImplementedError()

    def writelines(self, list_of_data):
        """Write a list (or any iterable) of data bytes to the transport.

        This default implementation flattens the chunks with
        compat.flatten_list_bytes() and passes the result to write() in
        a single call.
        """
        self.write(compat.flatten_list_bytes(list_of_data))

    def write_eof(self):
        """Close the write end after flushing buffered data.

        (This is like typing ^D into a UNIX program reading from stdin.)

        Data may still be received.

        The base implementation raises NotImplementedError.
        """
        raise NotImplementedError()

    def can_write_eof(self):
        """Return True if this transport supports write_eof(), else False.

        The base implementation raises NotImplementedError.
        """
        raise NotImplementedError()

    def abort(self):
        """Close the transport immediately.

        Buffered data will be lost and no more data will be received.
        The protocol's connection_lost() method will (eventually) be
        called with None as its argument.

        The base implementation raises NotImplementedError.
        """
        raise NotImplementedError()
123
124
class Transport(ReadTransport, WriteTransport):
    """Interface representing a bidirectional transport.

    There may be several implementations, but typically, the user does
    not implement new transports; rather, the platform provides some
    useful transports that are implemented using the platform's best
    practices.

    The user never instantiates a transport directly; they call a
    utility function, passing it a protocol factory and other
    information necessary to create the transport and protocol.  (E.g.
    EventLoop.create_connection() or EventLoop.create_server().)

    The utility function will asynchronously create a transport and a
    protocol and hook them up by calling the protocol's
    connection_made() method, passing it the transport.

    The reading and writing methods inherited here raise
    NotImplementedError, with the exception of writelines(), which
    flattens its argument into one bytes object and passes it to
    write() in a single call.
    """
145
146
class DatagramTransport(BaseTransport):
    """Interface for datagram (UDP) transports."""

    def sendto(self, data, addr=None):
        """Send data to the transport.

        This does not block: the data is buffered and arrangements are
        made for it to be sent out asynchronously.

        *addr* is the target socket address.  When it is None, the
        target address given at transport creation is used.

        The base implementation raises NotImplementedError.
        """
        raise NotImplementedError()

    def abort(self):
        """Close the transport immediately.

        Buffered data will be lost and no more data will be received.
        The protocol's connection_lost() method will (eventually) be
        called with None as its argument.

        The base implementation raises NotImplementedError.
        """
        raise NotImplementedError()
168
169
class SubprocessTransport(BaseTransport):
    """Interface for subprocess transports."""

    def get_pid(self):
        """Get subprocess id.

        The base implementation raises NotImplementedError.
        """
        raise NotImplementedError()

    def get_returncode(self):
        """Get subprocess returncode.

        See also:
        https://docs.python.org/3/library/subprocess.html#subprocess.Popen.returncode

        The base implementation raises NotImplementedError.
        """
        raise NotImplementedError()

    def get_pipe_transport(self, fd):
        """Get transport for pipe with number fd.

        The base implementation raises NotImplementedError.
        """
        raise NotImplementedError()

    def send_signal(self, signal):
        """Send signal to subprocess.

        See also:
        https://docs.python.org/3/library/subprocess.html#subprocess.Popen.send_signal

        The base implementation raises NotImplementedError.
        """
        raise NotImplementedError()

    def terminate(self):
        """Stop the subprocess.

        Alias for close() method.

        On Posix OSs the method sends SIGTERM to the subprocess.
        On Windows the Win32 API function TerminateProcess()
        is called to stop the subprocess.

        See also:
        https://docs.python.org/3/library/subprocess.html#subprocess.Popen.terminate

        The base implementation raises NotImplementedError.
        """
        raise NotImplementedError()

    def kill(self):
        """Kill the subprocess.

        On Posix OSs the function sends SIGKILL to the subprocess.
        On Windows kill() is an alias for terminate().

        See also:
        https://docs.python.org/3/library/subprocess.html#subprocess.Popen.kill

        The base implementation raises NotImplementedError.
        """
        raise NotImplementedError()
Yury Selivanov3cb99142014-02-18 18:41:13 -0500220
221
class _FlowControlMixin(Transport):
    """Write-side flow control, packaged as a mix-in base class.

    What a subclass has to do:

    * implement get_write_buffer_size();
    * call _maybe_pause_protocol() whenever the write buffer size
      increases, and _maybe_resume_protocol() whenever it decreases;
    * call super().__init__(extra, loop) from its constructor, with a
      loop that is not None; this installs the default limits through
      _set_write_buffer_limits();
    * provide self._protocol, which is used here but never assigned.

    A subclass may also override set_write_buffer_limits(), e.g. to
    specify different defaults.

    The user may call set_write_buffer_limits() and
    get_write_buffer_size(), and their protocol's pause_writing() and
    resume_writing() may be called.
    """

    def __init__(self, extra=None, loop=None):
        super().__init__(extra)
        # The loop is needed to report exceptions raised by the protocol.
        assert loop is not None
        self._loop = loop
        self._protocol_paused = False
        self._set_write_buffer_limits()

    def _maybe_pause_protocol(self):
        """Pause the protocol if the buffer is above the high-water mark.

        Does nothing when the buffer size is within the limit or the
        protocol is already paused.  An Exception raised by
        pause_writing() is handed to the loop's exception handler
        instead of propagating.
        """
        if self.get_write_buffer_size() <= self._high_water:
            return
        if self._protocol_paused:
            return
        # Flip the flag before calling out, so it stays set even when
        # the protocol callback fails.
        self._protocol_paused = True
        try:
            self._protocol.pause_writing()
        except Exception as exc:
            report = self._loop.call_exception_handler
            report({
                'message': 'protocol.pause_writing() failed',
                'exception': exc,
                'transport': self,
                'protocol': self._protocol,
            })

    def _maybe_resume_protocol(self):
        """Resume a paused protocol once the buffer has drained enough.

        Acts only when the protocol is currently paused and the buffer
        size is at or below the low-water mark.  An Exception raised by
        resume_writing() is handed to the loop's exception handler
        instead of propagating.
        """
        if not self._protocol_paused:
            return
        if self.get_write_buffer_size() <= self._low_water:
            self._protocol_paused = False
            try:
                self._protocol.resume_writing()
            except Exception as exc:
                report = self._loop.call_exception_handler
                report({
                    'message': 'protocol.resume_writing() failed',
                    'exception': exc,
                    'transport': self,
                    'protocol': self._protocol,
                })

    def get_write_buffer_limits(self):
        """Return the current limits as a (low, high) tuple.

        Note the order: low first, whereas set_write_buffer_limits()
        takes high first.
        """
        return (self._low_water, self._high_water)

    def _set_write_buffer_limits(self, high=None, low=None):
        """Validate and store the high- and low-water marks.

        Defaults: with neither value given, high is 64 KiB; with only
        low given, high is four times low.  A missing low is a quarter
        of high (integer division).

        Raises ValueError unless high >= low >= 0.
        """
        if high is None:
            high = 64*1024 if low is None else 4*low
        if low is None:
            low = high // 4
        if not high >= low >= 0:
            raise ValueError('high (%r) must be >= low (%r) must be >= 0' %
                             (high, low))
        self._high_water = high
        self._low_water = low

    def set_write_buffer_limits(self, high=None, low=None):
        """Set the flow-control limits, then re-check the buffer.

        The protocol is paused right away if the current buffer size
        exceeds the new high-water mark.
        """
        self._set_write_buffer_limits(high=high, low=low)
        self._maybe_pause_protocol()

    def get_write_buffer_size(self):
        """Return the current size of the write buffer.

        Must be implemented by the subclass; this version raises
        NotImplementedError.
        """
        raise NotImplementedError()