blob: 70b323f2db98836a2c3e54f1734638fc4c59436b [file] [log] [blame]
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07001"""Abstract Transport class."""
2
Victor Stinner71080fc2015-07-25 02:23:21 +02003from asyncio import compat
Guido van Rossumf10345e2013-12-02 18:36:30 -08004
# Names exported by ``from <this module> import *``.
__all__ = [
    'BaseTransport',
    'ReadTransport',
    'WriteTransport',
    'Transport',
    'DatagramTransport',
    'SubprocessTransport',
]
Guido van Rossum27b7c7e2013-10-17 13:40:50 -07008
9
class BaseTransport:
    """Common base class for all transport interfaces.

    It keeps the optional "extra info" mapping and declares close(),
    which concrete transports are expected to implement.
    """

    def __init__(self, extra=None):
        """Remember the extra-info mapping.

        extra is a mapping of optional transport information.  When it
        is omitted (None), a new empty dict is used instead.
        """
        # A dict is created per instance so that no mutable default is
        # ever shared between transports.
        self._extra = {} if extra is None else extra

    def get_extra_info(self, name, default=None):
        """Return the optional transport information stored under name.

        default is returned when no entry exists for name.
        """
        info = self._extra
        return info.get(name, default)

    def close(self):
        """Close the transport.

        Buffered data will be flushed asynchronously.  No more data
        will be received.  After all buffered data is flushed, the
        protocol's connection_lost() method will (eventually) be called
        with None as its argument.

        This base implementation always raises NotImplementedError.
        """
        raise NotImplementedError
31
32
class ReadTransport(BaseTransport):
    """Interface for transports that can only be read from."""

    def pause_reading(self):
        """Pause the receiving end.

        Until resume_reading() is called, no data will be handed to the
        protocol's data_received() method.

        This interface method always raises NotImplementedError.
        """
        raise NotImplementedError

    def resume_reading(self):
        """Resume the receiving end.

        Received data will once more be handed to the protocol's
        data_received() method.

        This interface method always raises NotImplementedError.
        """
        raise NotImplementedError
51
52
class WriteTransport(BaseTransport):
    """Interface for transports that can only be written to.

    Every method except writelines() raises NotImplementedError here.
    """

    def set_write_buffer_limits(self, high=None, low=None):
        """Set the high- and low-water limits for write flow control.

        The two limits decide when the protocol's pause_writing() and
        resume_writing() methods get called.  If specified, the
        low-water limit must be less than or equal to the high-water
        limit.  Neither value can be negative.

        The defaults are implementation-specific.  If only the
        high-water limit is given, the low-water limit defaults to an
        implementation-specific value less than or equal to the
        high-water limit.  Setting high to zero forces low to zero as
        well, and causes pause_writing() to be called whenever the
        buffer becomes non-empty.  Setting low to zero causes
        resume_writing() to be called only once the buffer is empty.
        Using zero for either limit is generally sub-optimal, because
        it reduces the opportunities for doing I/O and computation
        concurrently.
        """
        raise NotImplementedError

    def get_write_buffer_size(self):
        """Return the current size of the write buffer."""
        raise NotImplementedError

    def write(self, data):
        """Write some data bytes to the transport.

        This call does not block; the data is buffered and arranged to
        be sent out asynchronously.
        """
        raise NotImplementedError

    def writelines(self, list_of_data):
        """Write a list (or any iterable) of data bytes to the transport.

        The default implementation concatenates the items with
        compat.flatten_list_bytes() and passes the result to write()
        in a single call.
        """
        self.write(compat.flatten_list_bytes(list_of_data))

    def write_eof(self):
        """Close the write end after flushing buffered data.

        (This is like typing ^D into a UNIX program reading from stdin.)

        Data may still be received.
        """
        raise NotImplementedError

    def can_write_eof(self):
        """Return True if this transport supports write_eof(), else False."""
        raise NotImplementedError

    def abort(self):
        """Close the transport immediately.

        Buffered data will be lost.  No more data will be received.
        The protocol's connection_lost() method will (eventually) be
        called with None as its argument.
        """
        raise NotImplementedError
119
120
class Transport(ReadTransport, WriteTransport):
    """Interface representing a bidirectional transport.

    Several implementations may exist, but users typically do not
    write new transports themselves; instead the platform supplies
    useful transports implemented according to the platform's best
    practices.

    A transport is never instantiated directly by the user.  The user
    calls a utility function, handing it a protocol factory plus
    whatever other information is needed to create the transport and
    protocol.  (E.g. EventLoop.create_connection() or
    EventLoop.create_server().)

    That utility function asynchronously creates a transport and a
    protocol, then connects them by calling the protocol's
    connection_made() method with the transport as its argument.

    This class defines no methods of its own.  The methods it inherits
    raise NotImplementedError, apart from get_extra_info() and
    writelines(); the latter concatenates its input and passes the
    result to write() in a single call.
    """
141
142
class DatagramTransport(BaseTransport):
    """Interface for datagram (UDP) transports."""

    def sendto(self, data, addr=None):
        """Send data to the transport.

        This call does not block; the data is buffered and arranged to
        be sent out asynchronously.

        addr is the target socket address.  If addr is None, the target
        address given when the transport was created is used.
        """
        raise NotImplementedError

    def abort(self):
        """Close the transport immediately.

        Buffered data will be lost.  No more data will be received.
        The protocol's connection_lost() method will (eventually) be
        called with None as its argument.
        """
        raise NotImplementedError
164
165
class SubprocessTransport(BaseTransport):
    """Interface for transports that wrap a subprocess.

    Every method declared here raises NotImplementedError.
    """

    def get_pid(self):
        """Get subprocess id."""
        raise NotImplementedError

    def get_returncode(self):
        """Get subprocess returncode.

        See also:
        https://docs.python.org/3/library/subprocess.html#subprocess.Popen.returncode
        """
        raise NotImplementedError

    def get_pipe_transport(self, fd):
        """Get transport for pipe with number fd."""
        raise NotImplementedError

    def send_signal(self, signal):
        """Send signal to subprocess.

        See also:
        https://docs.python.org/3/library/subprocess.html#subprocess.Popen.send_signal
        """
        raise NotImplementedError

    def terminate(self):
        """Stop the subprocess.

        Alias for close() method.

        On POSIX systems the method sends SIGTERM to the subprocess.
        On Windows the Win32 API function TerminateProcess() is called
        to stop the subprocess.

        See also:
        https://docs.python.org/3/library/subprocess.html#subprocess.Popen.terminate
        """
        raise NotImplementedError

    def kill(self):
        """Kill the subprocess.

        On POSIX systems the method sends SIGKILL to the subprocess.
        On Windows kill() is an alias for terminate().

        See also:
        https://docs.python.org/3/library/subprocess.html#subprocess.Popen.kill
        """
        raise NotImplementedError
Yury Selivanov3cb99142014-02-18 18:41:13 -0500216
217
class _FlowControlMixin(Transport):
    """Mix-in base class holding all the (write) flow control logic.

    A subclass must implement get_write_buffer_size().  It must call
    _maybe_pause_protocol() whenever the write buffer size increases,
    and _maybe_resume_protocol() whenever it decreases.  It may also
    override set_write_buffer_limits() (e.g. to specify different
    defaults).

    The subclass constructor must call super().__init__(extra, loop)
    with a loop that is not None; this calls _set_write_buffer_limits()
    to install the default limits.  The pause/resume helpers use
    self._protocol, which this mix-in never assigns, so the subclass
    has to provide it.

    The user may call set_write_buffer_limits() and
    get_write_buffer_size(), and their protocol's pause_writing() and
    resume_writing() may be called.
    """

    def __init__(self, extra=None, loop=None):
        """Initialise flow-control state with the default buffer limits.

        extra is forwarded to the parent constructor.  loop must not be
        None; it is used to report exceptions raised by the protocol's
        pause_writing() / resume_writing() callbacks.
        """
        super().__init__(extra)
        assert loop is not None
        self._protocol_paused = False
        self._loop = loop
        self._set_write_buffer_limits()

    def _maybe_pause_protocol(self):
        """Pause the protocol if the buffer has grown above high-water.

        Does nothing when the buffer size is at or below the high-water
        mark, or when the protocol is already paused.  An Exception
        raised by pause_writing() is passed to the loop's exception
        handler rather than propagated.
        """
        if self.get_write_buffer_size() <= self._high_water:
            return
        if self._protocol_paused:
            return
        # The flag is set before the callback runs, so it stays set
        # even if pause_writing() fails.
        self._protocol_paused = True
        try:
            self._protocol.pause_writing()
        except Exception as exc:
            context = {
                'message': 'protocol.pause_writing() failed',
                'exception': exc,
                'transport': self,
                'protocol': self._protocol,
            }
            self._loop.call_exception_handler(context)

    def _maybe_resume_protocol(self):
        """Resume a paused protocol once the buffer is at or below low-water.

        The buffer size is only queried while the protocol is paused.
        An Exception raised by resume_writing() is passed to the loop's
        exception handler rather than propagated.
        """
        if not self._protocol_paused:
            return
        if self.get_write_buffer_size() <= self._low_water:
            # The flag is cleared before the callback runs, so it stays
            # cleared even if resume_writing() fails.
            self._protocol_paused = False
            try:
                self._protocol.resume_writing()
            except Exception as exc:
                context = {
                    'message': 'protocol.resume_writing() failed',
                    'exception': exc,
                    'transport': self,
                    'protocol': self._protocol,
                }
                self._loop.call_exception_handler(context)

    def get_write_buffer_limits(self):
        """Return the current limits as a (low_water, high_water) tuple."""
        low, high = self._low_water, self._high_water
        return (low, high)

    def _set_write_buffer_limits(self, high=None, low=None):
        """Validate and store the water marks without touching the protocol.

        Defaults: with neither value given, high is 64 KiB; with only
        low given, high is 4*low.  A missing low is high // 4.

        Raises ValueError unless high >= low >= 0.
        """
        if high is None:
            high = 64*1024 if low is None else 4*low
        if low is None:
            low = high // 4
        if not high >= low >= 0:
            raise ValueError('high (%r) must be >= low (%r) must be >= 0' %
                             (high, low))
        self._high_water, self._low_water = high, low

    def set_write_buffer_limits(self, high=None, low=None):
        """Set the water marks, then pause the protocol if now required.

        The new limits may already be exceeded by the current buffer
        size, hence the immediate _maybe_pause_protocol() check.
        """
        self._set_write_buffer_limits(high=high, low=low)
        self._maybe_pause_protocol()

    def get_write_buffer_size(self):
        """Return the current write buffer size; subclasses must implement."""
        raise NotImplementedError