blob: c848a21a8f26d229909a490c1f0c242322bb52f6 [file] [log] [blame]
# Public API of this module: the two coroutine factories defined below.
__all__ = ['create_subprocess_exec', 'create_subprocess_shell']
2
3import collections
4import subprocess
5
6from . import events
7from . import futures
8from . import protocols
9from . import streams
10from . import tasks
Victor Stinnerf951d282014-06-29 00:46:45 +020011from .coroutines import coroutine
Victor Stinneracdb7822014-07-14 18:33:40 +020012from .log import logger
Victor Stinner915bcb02014-02-01 22:49:59 +010013
14
# Re-export the special redirection markers from the subprocess module so
# callers can pass them as stdin/stdout/stderr without importing it.
PIPE = subprocess.PIPE
STDOUT = subprocess.STDOUT
DEVNULL = subprocess.DEVNULL
18
19
class SubprocessStreamProtocol(streams.FlowControlMixin,
                               protocols.SubprocessProtocol):
    """Like StreamReaderProtocol, but for a subprocess.

    Once connection_made() has run, the child's pipes are exposed as
    stream objects: ``stdin`` is a StreamWriter, ``stdout`` and ``stderr``
    are StreamReaders.  Each attribute stays None when the transport has
    no pipe for the corresponding file descriptor.
    """

    def __init__(self, limit, loop):
        """Create the protocol.

        *limit* is passed to every StreamReader created for the child's
        output pipes; *loop* is forwarded to the base class and used for
        the futures created here.
        """
        super().__init__(loop=loop)
        self._limit = limit
        self.stdin = self.stdout = self.stderr = None
        # Resolved by connection_made() once the streams are wired up.
        self.waiter = futures.Future(loop=loop)
        # Futures resolved with the return code in process_exited().
        self._waiters = collections.deque()
        self._transport = None

    def __repr__(self):
        info = [self.__class__.__name__]
        if self.stdin is not None:
            info.append('stdin=%r' % self.stdin)
        if self.stdout is not None:
            info.append('stdout=%r' % self.stdout)
        if self.stderr is not None:
            info.append('stderr=%r' % self.stderr)
        return '<%s>' % ' '.join(info)

    def connection_made(self, transport):
        """Wrap the pipe transports of *transport* in stream objects.

        NOTE(review): relies on ``self._loop`` having been set by the base
        class initialiser -- confirm against FlowControlMixin.
        """
        self._transport = transport

        stdout_transport = transport.get_pipe_transport(1)
        if stdout_transport is not None:
            self.stdout = streams.StreamReader(limit=self._limit,
                                               loop=self._loop)
            self.stdout.set_transport(stdout_transport)

        stderr_transport = transport.get_pipe_transport(2)
        if stderr_transport is not None:
            self.stderr = streams.StreamReader(limit=self._limit,
                                               loop=self._loop)
            self.stderr.set_transport(stderr_transport)

        stdin_transport = transport.get_pipe_transport(0)
        if stdin_transport is not None:
            self.stdin = streams.StreamWriter(stdin_transport,
                                              protocol=self,
                                              reader=None,
                                              loop=self._loop)

        # The waiter may already have been cancelled by whoever was
        # awaiting it; setting a result on it then would raise.
        if not self.waiter.cancelled():
            self.waiter.set_result(None)

    def _get_reader(self, fd):
        """Return the reader for output pipe *fd* (1 or 2), else None."""
        if fd == 1:
            return self.stdout
        if fd == 2:
            return self.stderr
        return None

    def pipe_data_received(self, fd, data):
        """Feed *data* read from pipe *fd* into the matching reader."""
        reader = self._get_reader(fd)
        if reader is not None:
            reader.feed_data(data)

    def pipe_connection_lost(self, fd, exc):
        """Handle the closing of pipe *fd*; *exc* is None on a clean close."""
        if fd == 0:
            pipe = self.stdin
            if pipe is not None:
                pipe.close()
            # Forward the loss of the write side to the inherited
            # connection_lost() handler.
            self.connection_lost(exc)
            return

        reader = self._get_reader(fd)
        # Identity test: an equality comparison against None could be
        # overridden by the reader object.
        if reader is not None:
            if exc is None:
                reader.feed_eof()
            else:
                reader.set_exception(exc)

    def process_exited(self):
        """Close the transport and publish the return code to all waiters."""
        returncode = self._transport.get_returncode()
        self._transport.close()
        self._transport = None

        # wake up futures waiting for wait()
        while self._waiters:
            waiter = self._waiters.popleft()
            if not waiter.cancelled():
                waiter.set_result(returncode)
Victor Stinner915bcb02014-02-01 22:49:59 +0100106
107
class Process:
    """Handle on a child process built from a transport/protocol pair.

    ``stdin``, ``stdout`` and ``stderr`` are copied from the protocol at
    construction time; ``pid`` is read from the transport.
    """

    def __init__(self, transport, protocol, loop):
        self._transport = transport
        self._protocol = protocol
        self._loop = loop
        self.stdin = protocol.stdin
        self.stdout = protocol.stdout
        self.stderr = protocol.stderr
        self.pid = transport.get_pid()

    def __repr__(self):
        return '<%s %s>' % (self.__class__.__name__, self.pid)

    @property
    def returncode(self):
        """Return code reported by the transport (None while unset)."""
        return self._transport.get_returncode()

    @coroutine
    def wait(self):
        """Wait until the process exits and return its return code."""
        returncode = self._transport.get_returncode()
        if returncode is not None:
            return returncode

        # Register a future that the protocol resolves in process_exited().
        waiter = futures.Future(loop=self._loop)
        self._protocol._waiters.append(waiter)
        yield from waiter
        return waiter.result()

    def _check_alive(self):
        """Raise ProcessLookupError if a return code is already available."""
        if self._transport.get_returncode() is not None:
            raise ProcessLookupError()

    def send_signal(self, signal):
        """Send *signal* to the process; raise ProcessLookupError if exited."""
        self._check_alive()
        self._transport.send_signal(signal)

    def terminate(self):
        """Ask the transport to terminate the process if it is still alive."""
        self._check_alive()
        self._transport.terminate()

    def kill(self):
        """Ask the transport to kill the process if it is still alive."""
        self._check_alive()
        self._transport.kill()

    @coroutine
    def _feed_stdin(self, input):
        """Write *input* to stdin, drain it, then close stdin."""
        debug = self._loop.get_debug()
        self.stdin.write(input)
        if debug:
            logger.debug('%r communicate: feed stdin (%s bytes)',
                         self, len(input))
        try:
            yield from self.stdin.drain()
        except (BrokenPipeError, ConnectionResetError) as exc:
            # communicate() ignores BrokenPipeError and ConnectionResetError
            if debug:
                logger.debug('%r communicate: stdin got %r', self, exc)

        if debug:
            logger.debug('%r communicate: close stdin', self)
        self.stdin.close()

    @coroutine
    def _noop(self):
        """Placeholder coroutine for a stream communicate() has no work on."""
        return None

    @coroutine
    def _read_stream(self, fd):
        """Read pipe *fd* (1 or 2) until EOF, close it, return the data."""
        transport = self._transport.get_pipe_transport(fd)
        if fd == 2:
            stream = self.stderr
        else:
            assert fd == 1
            stream = self.stdout
        # Sample the debug flag once (as _feed_stdin() does) so that both
        # log records of one call are emitted or suppressed together.
        debug = self._loop.get_debug()
        if debug:
            name = 'stdout' if fd == 1 else 'stderr'
            logger.debug('%r communicate: read %s', self, name)
        output = yield from stream.read()
        if debug:
            logger.debug('%r communicate: close %s', self, name)
        transport.close()
        return output

    @coroutine
    def communicate(self, input=None):
        """Feed *input* to stdin, read stdout/stderr to EOF, wait for exit.

        Returns a ``(stdout, stderr)`` tuple; an element is None when the
        corresponding stream is None.
        """
        # Non-empty input is always fed.  Empty (but not None) input is fed
        # only when a stdin stream exists: nothing is written, but stdin
        # still gets closed so the child sees EOF instead of blocking on a
        # read forever.
        if input or (input is not None and self.stdin is not None):
            stdin = self._feed_stdin(input)
        else:
            stdin = self._noop()
        if self.stdout is not None:
            stdout = self._read_stream(1)
        else:
            stdout = self._noop()
        if self.stderr is not None:
            stderr = self._read_stream(2)
        else:
            stderr = self._noop()
        stdin, stdout, stderr = yield from tasks.gather(stdin, stdout, stderr,
                                                        loop=self._loop)
        yield from self.wait()
        return (stdout, stderr)
211
212
@coroutine
def create_subprocess_shell(cmd, stdin=None, stdout=None, stderr=None,
                            loop=None, limit=streams._DEFAULT_LIMIT, **kwds):
    """Run *cmd* through loop.subprocess_shell() and return a Process.

    *stdin*, *stdout*, *stderr* and any extra keyword arguments are passed
    on to loop.subprocess_shell().  *limit* is given to the
    SubprocessStreamProtocol created for the child.  When *loop* is None
    the loop returned by events.get_event_loop() is used.
    """
    if loop is None:
        loop = events.get_event_loop()

    def protocol_factory():
        return SubprocessStreamProtocol(limit=limit, loop=loop)

    transport, protocol = yield from loop.subprocess_shell(
        protocol_factory,
        cmd, stdin=stdin, stdout=stdout,
        stderr=stderr, **kwds)
    try:
        yield from protocol.waiter
    except BaseException:
        # Deliberately catches everything (cancellation included): the
        # transport is cleaned up and the exception is always re-raised.
        # NOTE(review): _kill_wait() is a private transport method;
        # presumably it kills the child and waits for it -- confirm.
        transport._kill_wait()
        raise
    return Process(transport, protocol, loop)
230
@coroutine
def create_subprocess_exec(program, *args, stdin=None, stdout=None,
                           stderr=None, loop=None,
                           limit=streams._DEFAULT_LIMIT, **kwds):
    """Run *program* with *args* via loop.subprocess_exec(); return a Process.

    *stdin*, *stdout*, *stderr* and any extra keyword arguments are passed
    on to loop.subprocess_exec().  *limit* is given to the
    SubprocessStreamProtocol created for the child.  When *loop* is None
    the loop returned by events.get_event_loop() is used.
    """
    if loop is None:
        loop = events.get_event_loop()

    def protocol_factory():
        return SubprocessStreamProtocol(limit=limit, loop=loop)

    transport, protocol = yield from loop.subprocess_exec(
        protocol_factory,
        program, *args,
        stdin=stdin, stdout=stdout,
        stderr=stderr, **kwds)
    try:
        yield from protocol.waiter
    except BaseException:
        # Deliberately catches everything (cancellation included): the
        # transport is cleaned up and the exception is always re-raised.
        # NOTE(review): _kill_wait() is a private transport method;
        # presumably it kills the child and waits for it -- confirm.
        transport._kill_wait()
        raise
    return Process(transport, protocol, loop)