blob: d34b6118fdcf72b30dd51edbbd7482810156b83a [file] [log] [blame]
# Names exported by ``from <this module> import *``: only the two coroutine
# factories are listed here.
__all__ = 'create_subprocess_exec', 'create_subprocess_shell'
Victor Stinner915bcb02014-02-01 22:49:59 +01002
Victor Stinner915bcb02014-02-01 22:49:59 +01003import subprocess
Andrew Svetlovad4ed872019-05-06 22:52:11 -04004import warnings
Victor Stinner915bcb02014-02-01 22:49:59 +01005
6from . import events
Victor Stinner915bcb02014-02-01 22:49:59 +01007from . import protocols
8from . import streams
9from . import tasks
Victor Stinneracdb7822014-07-14 18:33:40 +020010from .log import logger
Victor Stinner915bcb02014-02-01 22:49:59 +010011
12
# Module-level aliases for the special stdin/stdout/stderr marker values of
# the standard ``subprocess`` module, so callers can reach them from here
# without importing ``subprocess`` themselves.
PIPE = subprocess.PIPE
STDOUT = subprocess.STDOUT
DEVNULL = subprocess.DEVNULL
16
17
class SubprocessStreamProtocol(streams.FlowControlMixin,
                               protocols.SubprocessProtocol):
    """Like StreamReaderProtocol, but for a subprocess.

    Once the transport is connected, the child's piped stdout/stderr are
    exposed as ``StreamReader`` objects and its piped stdin as a
    ``StreamWriter``.  Streams that were not piped stay ``None``.
    """

    def __init__(self, limit, loop, *, _asyncio_internal=False):
        """Set up an unconnected protocol.

        *limit* is forwarded as the ``limit`` of every ``StreamReader``
        created in ``connection_made()``; *loop* and *_asyncio_internal*
        are forwarded to the base class initializer.
        """
        super().__init__(loop=loop, _asyncio_internal=_asyncio_internal)
        self._limit = limit
        self.stdin = self.stdout = self.stderr = None
        self._transport = None
        self._process_exited = False
        # File descriptors (1 and/or 2) of the read pipes that are still open.
        self._pipe_fds = []
        # Completed when the stdin pipe (fd 0) reports its connection lost.
        self._stdin_closed = self._loop.create_future()

    def __repr__(self):
        info = [self.__class__.__name__]
        if self.stdin is not None:
            info.append(f'stdin={self.stdin!r}')
        if self.stdout is not None:
            info.append(f'stdout={self.stdout!r}')
        if self.stderr is not None:
            info.append(f'stderr={self.stderr!r}')
        return f"<{' '.join(info)}>"

    def connection_made(self, transport):
        """Create the stream objects for every pipe the transport exposes."""
        self._transport = transport

        stdout_transport = transport.get_pipe_transport(1)
        if stdout_transport is not None:
            self.stdout = streams.StreamReader(limit=self._limit,
                                               loop=self._loop,
                                               _asyncio_internal=True)
            self.stdout.set_transport(stdout_transport)
            self._pipe_fds.append(1)

        stderr_transport = transport.get_pipe_transport(2)
        if stderr_transport is not None:
            self.stderr = streams.StreamReader(limit=self._limit,
                                               loop=self._loop,
                                               _asyncio_internal=True)
            self.stderr.set_transport(stderr_transport)
            self._pipe_fds.append(2)

        stdin_transport = transport.get_pipe_transport(0)
        if stdin_transport is not None:
            self.stdin = streams.StreamWriter(stdin_transport,
                                              protocol=self,
                                              reader=None,
                                              loop=self._loop,
                                              _asyncio_internal=True)

    def _reader_for(self, fd):
        """Return the reader attached to *fd* (1 or 2), or None."""
        if fd == 1:
            return self.stdout
        if fd == 2:
            return self.stderr
        return None

    def pipe_data_received(self, fd, data):
        """Feed *data* read from pipe *fd* into the matching reader, if any."""
        reader = self._reader_for(fd)
        if reader is not None:
            reader.feed_data(data)

    def pipe_connection_lost(self, fd, exc):
        """Handle the closing of pipe *fd*; *exc* is None on a clean close."""
        if fd == 0:
            pipe = self.stdin
            if pipe is not None:
                pipe.close()
            self.connection_lost(exc)
            waiter = self._stdin_closed
            # The waiter can already be done (e.g. it was cancelled); calling
            # set_result()/set_exception() then would raise
            # InvalidStateError from inside a transport callback.
            if not waiter.done():
                if exc is None:
                    waiter.set_result(None)
                else:
                    waiter.set_exception(exc)
                    # Nobody is obliged to await the close waiter, so do not
                    # log "exception was never retrieved" if it is dropped.
                    waiter._log_traceback = False
            return

        reader = self._reader_for(fd)
        if reader is not None:
            if exc is None:
                reader.feed_eof()
            else:
                reader.set_exception(exc)

        if fd in self._pipe_fds:
            self._pipe_fds.remove(fd)
        self._maybe_close_transport()

    def process_exited(self):
        """Record that the child exited and close the transport if possible."""
        self._process_exited = True
        self._maybe_close_transport()

    def _maybe_close_transport(self):
        """Close the transport once all read pipes closed and the child exited."""
        if not self._pipe_fds and self._process_exited:
            self._transport.close()
            self._transport = None

    def _get_close_waiter(self, stream):
        """Return the future tracking the closing of *stream*.

        Only the stdin writer has such a future; None is returned for any
        other stream.
        """
        if stream is self.stdin:
            return self._stdin_closed
        return None
117
Victor Stinner915bcb02014-02-01 22:49:59 +0100118
class Process:
    """Handle on a child process built from a subprocess transport/protocol.

    Exposes the protocol's ``stdin``/``stdout``/``stderr`` streams, the
    child's ``pid`` and ``returncode``, and helpers to signal, wait for and
    communicate with the child.
    """

    def __init__(self, transport, protocol, loop, *, _asyncio_internal=False):
        if not _asyncio_internal:
            # stacklevel=2 attributes the warning to the code that
            # instantiated the class rather than to this constructor.
            warnings.warn(f"{self.__class__} should be instaniated "
                          "by asyncio internals only, "
                          "please avoid its creation from user code",
                          DeprecationWarning,
                          stacklevel=2)

        self._transport = transport
        self._protocol = protocol
        self._loop = loop
        self.stdin = protocol.stdin
        self.stdout = protocol.stdout
        self.stderr = protocol.stderr
        self.pid = transport.get_pid()

    def __repr__(self):
        return f'<{self.__class__.__name__} {self.pid}>'

    @property
    def returncode(self):
        """Return code as currently reported by the transport."""
        return self._transport.get_returncode()

    async def wait(self):
        """Wait until the process exit and return the process return code."""
        return await self._transport._wait()

    def send_signal(self, signal):
        """Forward *signal* to the child through the transport."""
        self._transport.send_signal(signal)

    def terminate(self):
        """Ask the transport to terminate the child."""
        self._transport.terminate()

    def kill(self):
        """Ask the transport to kill the child."""
        self._transport.kill()

    async def _feed_stdin(self, input):
        """Write *input* (if not None) to stdin, flush it, then close stdin."""
        debug = self._loop.get_debug()
        try:
            # write() is inside the try block on purpose: like drain(), it
            # can raise when the child already closed its end of the pipe.
            if input is not None:
                self.stdin.write(input)
                if debug:
                    logger.debug(
                        '%r communicate: feed stdin (%s bytes)',
                        self, len(input))
            await self.stdin.drain()
        except (BrokenPipeError, ConnectionResetError) as exc:
            # communicate() ignores BrokenPipeError and ConnectionResetError
            if debug:
                logger.debug('%r communicate: stdin got %r', self, exc)

        if debug:
            logger.debug('%r communicate: close stdin', self)
        self.stdin.close()

    async def _noop(self):
        """Placeholder coroutine for a stream that is not piped."""
        return None

    async def _read_stream(self, fd):
        """Read pipe *fd* (1 or 2) until EOF, close it and return the data."""
        transport = self._transport.get_pipe_transport(fd)
        if fd == 2:
            stream = self.stderr
        else:
            assert fd == 1
            stream = self.stdout
        if self._loop.get_debug():
            name = 'stdout' if fd == 1 else 'stderr'
            logger.debug('%r communicate: read %s', self, name)
        output = await stream.read()
        if self._loop.get_debug():
            name = 'stdout' if fd == 1 else 'stderr'
            logger.debug('%r communicate: close %s', self, name)
        transport.close()
        return output

    async def communicate(self, input=None):
        """Feed *input* to stdin, read stdout/stderr to EOF and wait for exit.

        When stdin is piped it is always closed, even if *input* is None, so
        a child reading its stdin until EOF cannot block forever.  *input*
        is ignored when stdin is not piped.

        Return a ``(stdout, stderr)`` tuple; an element is None when the
        corresponding stream is not piped.
        """
        if self.stdin is not None:
            stdin = self._feed_stdin(input)
        else:
            stdin = self._noop()
        if self.stdout is not None:
            stdout = self._read_stream(1)
        else:
            stdout = self._noop()
        if self.stderr is not None:
            stderr = self._read_stream(2)
        else:
            stderr = self._noop()
        stdin, stdout, stderr = await tasks.gather(stdin, stdout, stderr,
                                                   loop=self._loop)
        await self.wait()
        return (stdout, stderr)
209
210
async def create_subprocess_shell(cmd, stdin=None, stdout=None, stderr=None,
                                  loop=None, limit=streams._DEFAULT_LIMIT,
                                  **kwds):
    """Start *cmd* through ``loop.subprocess_shell()`` and wrap it.

    The current event loop is used when *loop* is None.  *limit* is handed
    to the ``SubprocessStreamProtocol`` built for the child; *stdin*,
    *stdout*, *stderr* and any extra keyword arguments are forwarded to
    ``loop.subprocess_shell()``.  Returns a ``Process`` instance.
    """
    if loop is None:
        loop = events.get_event_loop()

    def make_protocol():
        # Invoked by the loop to build the protocol for the new child.
        return SubprocessStreamProtocol(limit=limit,
                                        loop=loop,
                                        _asyncio_internal=True)

    transport, protocol = await loop.subprocess_shell(
        make_protocol,
        cmd,
        stdin=stdin,
        stdout=stdout,
        stderr=stderr,
        **kwds)
    return Process(transport, protocol, loop, _asyncio_internal=True)
Victor Stinner915bcb02014-02-01 22:49:59 +0100224
Andrew Svetlov5f841b52017-12-09 00:23:48 +0200225
async def create_subprocess_exec(program, *args, stdin=None, stdout=None,
                                 stderr=None, loop=None,
                                 limit=streams._DEFAULT_LIMIT, **kwds):
    """Start *program* with *args* through ``loop.subprocess_exec()``.

    The current event loop is used when *loop* is None.  *limit* is handed
    to the ``SubprocessStreamProtocol`` built for the child; *stdin*,
    *stdout*, *stderr* and any extra keyword arguments are forwarded to
    ``loop.subprocess_exec()``.  Returns a ``Process`` instance.
    """
    if loop is None:
        loop = events.get_event_loop()

    def make_protocol():
        # Invoked by the loop to build the protocol for the new child.
        return SubprocessStreamProtocol(limit=limit,
                                        loop=loop,
                                        _asyncio_internal=True)

    transport, protocol = await loop.subprocess_exec(
        make_protocol,
        program,
        *args,
        stdin=stdin,
        stdout=stdout,
        stderr=stderr,
        **kwds)
    return Process(transport, protocol, loop, _asyncio_internal=True)