blob: 90fc00de8339fb2c70cb19c5782e1a7cdc0d8a01 [file] [log] [blame]
# Names exported by a star-import of this module (a tuple of two strings).
__all__ = 'create_subprocess_exec', 'create_subprocess_shell'
Victor Stinner915bcb02014-02-01 22:49:59 +01002
Victor Stinner915bcb02014-02-01 22:49:59 +01003import subprocess
4
5from . import events
Victor Stinner915bcb02014-02-01 22:49:59 +01006from . import protocols
7from . import streams
8from . import tasks
Victor Stinneracdb7822014-07-14 18:33:40 +02009from .log import logger
Victor Stinner915bcb02014-02-01 22:49:59 +010010
11
# Module-level aliases for the special stream constants of the standard
# `subprocess` module.
# NOTE(review): presumably meant as values for the stdin/stdout/stderr
# arguments of the create_subprocess_*() functions below -- confirm.
PIPE = subprocess.PIPE
STDOUT = subprocess.STDOUT
DEVNULL = subprocess.DEVNULL
15
16
class SubprocessStreamProtocol(streams.FlowControlMixin,
                               protocols.SubprocessProtocol):
    """Subprocess counterpart of StreamReaderProtocol.

    Once connected, the child's piped stdout/stderr are exposed as
    StreamReader objects and its piped stdin as a StreamWriter, through
    the ``stdout``, ``stderr`` and ``stdin`` attributes.  An attribute
    stays None when the transport reports no pipe for that descriptor.
    """

    def __init__(self, limit, loop):
        """Store *limit* (passed to every StreamReader) and bind to *loop*."""
        super().__init__(loop=loop)
        self._limit = limit
        self.stdin = None
        self.stdout = None
        self.stderr = None
        self._transport = None
        # The transport is closed only after the process has exited AND
        # every readable pipe recorded in _pipe_fds has been lost.
        self._process_exited = False
        self._pipe_fds = []

    def __repr__(self):
        parts = [self.__class__.__name__]
        if self.stdin is not None:
            parts.append(f'stdin={self.stdin!r}')
        if self.stdout is not None:
            parts.append(f'stdout={self.stdout!r}')
        if self.stderr is not None:
            parts.append(f'stderr={self.stderr!r}')
        return '<{}>'.format(' '.join(parts))

    def connection_made(self, transport):
        """Wrap the pipes offered by *transport* in stream objects."""
        self._transport = transport

        # Readable pipes are handled first, in descriptor order
        # (stdout = 1, then stderr = 2); each one that exists is tracked
        # in _pipe_fds until its connection is lost.
        for fd, attr in ((1, 'stdout'), (2, 'stderr')):
            pipe = transport.get_pipe_transport(fd)
            if pipe is None:
                continue
            reader = streams.StreamReader(limit=self._limit,
                                          loop=self._loop)
            setattr(self, attr, reader)
            reader.set_transport(pipe)
            self._pipe_fds.append(fd)

        stdin_pipe = transport.get_pipe_transport(0)
        if stdin_pipe is not None:
            self.stdin = streams.StreamWriter(stdin_pipe,
                                              protocol=self,
                                              reader=None,
                                              loop=self._loop)

    def pipe_data_received(self, fd, data):
        """Feed *data* to the reader attached to *fd*, if there is one."""
        reader = {1: self.stdout, 2: self.stderr}.get(fd)
        if reader is not None:
            reader.feed_data(data)

    def pipe_connection_lost(self, fd, exc):
        """React to the pipe for *fd* going away, with error *exc* or None."""
        if fd == 0:
            # stdin: close the writer and report the loss through
            # connection_lost(); stdin is not tracked in _pipe_fds.
            writer = self.stdin
            if writer is not None:
                writer.close()
            self.connection_lost(exc)
            return

        reader = {1: self.stdout, 2: self.stderr}.get(fd)
        if reader is not None:
            if exc is None:
                reader.feed_eof()
            else:
                reader.set_exception(exc)

        try:
            self._pipe_fds.remove(fd)
        except ValueError:
            # fd was not a tracked pipe; nothing to forget.
            pass
        self._maybe_close_transport()

    def process_exited(self):
        """Record that the child exited and close the transport if possible."""
        self._process_exited = True
        self._maybe_close_transport()

    def _maybe_close_transport(self):
        """Close and drop the transport once no pipe is pending and the
        process has exited."""
        if self._pipe_fds or not self._process_exited:
            return
        self._transport.close()
        self._transport = None
Victor Stinner791009b2015-01-15 13:16:02 +0100104
Victor Stinner915bcb02014-02-01 22:49:59 +0100105
class Process:
    """Handle on a child process, built from a subprocess transport.

    ``stdin``, ``stdout`` and ``stderr`` are copied from the protocol
    object at construction time (each may be None), and ``pid`` is read
    from the transport.
    """

    def __init__(self, transport, protocol, loop):
        self._transport = transport
        self._protocol = protocol
        self._loop = loop
        self.stdin = protocol.stdin
        self.stdout = protocol.stdout
        self.stderr = protocol.stderr
        self.pid = transport.get_pid()

    def __repr__(self):
        return f'<{self.__class__.__name__} {self.pid}>'

    @property
    def returncode(self):
        """Return code as currently reported by the transport."""
        return self._transport.get_returncode()

    async def wait(self):
        """Wait until the process exits and return the process return code."""
        return await self._transport._wait()

    def send_signal(self, signal):
        """Forward *signal* to the transport's send_signal()."""
        self._transport.send_signal(signal)

    def terminate(self):
        """Forward to the transport's terminate()."""
        self._transport.terminate()

    def kill(self):
        """Forward to the transport's kill()."""
        self._transport.kill()

    async def _feed_stdin(self, input):
        """Write *input* (if any) to stdin, flush it, then close stdin.

        Must only be called when ``self.stdin`` is not None.
        BrokenPipeError and ConnectionResetError are swallowed, whether
        raised by the write or by the drain.
        """
        debug = self._loop.get_debug()
        try:
            # The write is inside the try block: it can fail with the same
            # errors as drain(), and communicate() must ignore both.
            if input is not None:
                self.stdin.write(input)
                if debug:
                    logger.debug(
                        '%r communicate: feed stdin (%s bytes)',
                        self, len(input))
            await self.stdin.drain()
        except (BrokenPipeError, ConnectionResetError) as exc:
            # communicate() ignores BrokenPipeError and ConnectionResetError
            if debug:
                logger.debug('%r communicate: stdin got %r', self, exc)

        if debug:
            logger.debug('%r communicate: close stdin', self)
        self.stdin.close()

    async def _noop(self):
        """Placeholder coroutine for a stream that is not piped."""
        return None

    async def _read_stream(self, fd):
        """Read the stream for *fd* (1 or 2) to EOF and close its pipe."""
        transport = self._transport.get_pipe_transport(fd)
        if fd == 2:
            stream = self.stderr
            name = 'stderr'
        else:
            assert fd == 1
            stream = self.stdout
            name = 'stdout'
        debug = self._loop.get_debug()
        if debug:
            logger.debug('%r communicate: read %s', self, name)
        output = await stream.read()
        if debug:
            logger.debug('%r communicate: close %s', self, name)
        transport.close()
        return output

    async def communicate(self, input=None):
        """Feed stdin, read stdout/stderr to EOF, and wait for exit.

        If stdin is piped, *input* (when not None) is written to it and
        stdin is then closed -- even when *input* is None, so that a child
        waiting for end-of-file on stdin cannot block forever.  If stdin
        is not piped, *input* is ignored.

        Return a ``(stdout, stderr)`` tuple; an item is None when the
        corresponding stream is not piped.
        """
        if self.stdin is not None:
            stdin = self._feed_stdin(input)
        else:
            stdin = self._noop()
        if self.stdout is not None:
            stdout = self._read_stream(1)
        else:
            stdout = self._noop()
        if self.stderr is not None:
            stderr = self._read_stream(2)
        else:
            stderr = self._noop()
        # No explicit loop argument: the coroutines are scheduled on the
        # loop that is running communicate().
        _, stdout, stderr = await tasks.gather(stdin, stdout, stderr)
        await self.wait()
        return (stdout, stderr)
190
191
async def create_subprocess_shell(cmd, stdin=None, stdout=None, stderr=None,
                                  loop=None, limit=streams._DEFAULT_LIMIT,
                                  **kwds):
    """Start *cmd* through ``loop.subprocess_shell()`` and wrap it.

    *stdin*, *stdout*, *stderr* and any extra keyword arguments are
    forwarded unchanged to ``loop.subprocess_shell()``.  *limit* is handed
    to the SubprocessStreamProtocol created for the child.  When *loop* is
    None, the loop returned by ``events.get_event_loop()`` is used.

    Return a Process built from the resulting transport and protocol.
    """
    if loop is None:
        loop = events.get_event_loop()

    def make_protocol():
        return SubprocessStreamProtocol(limit=limit, loop=loop)

    transport, protocol = await loop.subprocess_shell(
        make_protocol,
        cmd,
        stdin=stdin, stdout=stdout, stderr=stderr,
        **kwds)
    return Process(transport, protocol, loop)
204
Andrew Svetlov5f841b52017-12-09 00:23:48 +0200205
async def create_subprocess_exec(program, *args, stdin=None, stdout=None,
                                 stderr=None, loop=None,
                                 limit=streams._DEFAULT_LIMIT, **kwds):
    """Start *program* with *args* through ``loop.subprocess_exec()``.

    *stdin*, *stdout*, *stderr* and any extra keyword arguments are
    forwarded unchanged to ``loop.subprocess_exec()``.  *limit* is handed
    to the SubprocessStreamProtocol created for the child.  When *loop* is
    None, the loop returned by ``events.get_event_loop()`` is used.

    Return a Process built from the resulting transport and protocol.
    """
    if loop is None:
        loop = events.get_event_loop()

    def make_protocol():
        return SubprocessStreamProtocol(limit=limit, loop=loop)

    transport, protocol = await loop.subprocess_exec(
        make_protocol,
        program, *args,
        stdin=stdin, stdout=stdout, stderr=stderr,
        **kwds)
    return Process(transport, protocol, loop)