blob: fa58e1e85862395ad0e76bada90479491d796b2f [file] [log] [blame]
# Names exported by ``from <this module> import *``.
__all__ = (
    'create_subprocess_exec',
    'create_subprocess_shell',
)
Victor Stinner915bcb02014-02-01 22:49:59 +01002
Victor Stinner915bcb02014-02-01 22:49:59 +01003import subprocess
Andrew Svetlovad4ed872019-05-06 22:52:11 -04004import warnings
Victor Stinner915bcb02014-02-01 22:49:59 +01005
6from . import events
Victor Stinner915bcb02014-02-01 22:49:59 +01007from . import protocols
8from . import streams
9from . import tasks
Victor Stinneracdb7822014-07-14 18:33:40 +020010from .log import logger
Victor Stinner915bcb02014-02-01 22:49:59 +010011
12
# Module-level aliases of the subprocess module's special stream values.
PIPE, STDOUT, DEVNULL = (
    subprocess.PIPE,
    subprocess.STDOUT,
    subprocess.DEVNULL,
)
16
17
class SubprocessStreamProtocol(streams.FlowControlMixin,
                               protocols.SubprocessProtocol):
    """Subprocess protocol that exposes the child's pipes as streams.

    After connection_made() the attributes ``stdin``, ``stdout`` and
    ``stderr`` hold a StreamWriter / StreamReader for every pipe the
    transport provides, or None for pipes it does not provide.
    """

    def __init__(self, limit, loop, *, _asyncio_internal=False):
        super().__init__(loop=loop, _asyncio_internal=_asyncio_internal)
        self._limit = limit
        self._transport = None
        self._process_exited = False
        # File descriptors (1 and/or 2) of output pipes not yet reported lost.
        self._pipe_fds = []
        self.stdin = None
        self.stdout = None
        self.stderr = None

    def __repr__(self):
        parts = [self.__class__.__name__]
        for name in ('stdin', 'stdout', 'stderr'):
            stream = getattr(self, name)
            if stream is not None:
                parts.append(f'{name}={stream!r}')
        return f"<{' '.join(parts)}>"

    def connection_made(self, transport):
        """Remember the transport and wrap each available pipe in a stream."""
        self._transport = transport

        # Output pipes first (stdout, then stderr): each gets a StreamReader
        # and is tracked in _pipe_fds until its connection is lost.
        for fd, attr in ((1, 'stdout'), (2, 'stderr')):
            pipe = transport.get_pipe_transport(fd)
            if pipe is None:
                continue
            reader = streams.StreamReader(limit=self._limit,
                                          loop=self._loop,
                                          _asyncio_internal=True)
            setattr(self, attr, reader)
            reader.set_transport(pipe)
            self._pipe_fds.append(fd)

        stdin_pipe = transport.get_pipe_transport(0)
        if stdin_pipe is not None:
            self.stdin = streams.StreamWriter(stdin_pipe,
                                              protocol=self,
                                              reader=None,
                                              loop=self._loop,
                                              _asyncio_internal=True)

    def pipe_data_received(self, fd, data):
        """Feed data from fd 1 / fd 2 into the matching reader, if any."""
        reader = (self.stdout if fd == 1
                  else self.stderr if fd == 2
                  else None)
        if reader is not None:
            reader.feed_data(data)

    def pipe_connection_lost(self, fd, exc):
        """Handle the loss of one pipe of the child process."""
        if fd == 0:
            # stdin: close the writer and forward to connection_lost();
            # stdin is not tracked in _pipe_fds.
            writer = self.stdin
            if writer is not None:
                writer.close()
            self.connection_lost(exc)
            return

        reader = (self.stdout if fd == 1
                  else self.stderr if fd == 2
                  else None)
        if reader is not None:
            if exc is not None:
                reader.set_exception(exc)
            else:
                reader.feed_eof()

        if fd in self._pipe_fds:
            self._pipe_fds.remove(fd)
        self._maybe_close_transport()

    def process_exited(self):
        """Record that the child exited and close the transport if possible."""
        self._process_exited = True
        self._maybe_close_transport()

    def _maybe_close_transport(self):
        """Close the transport once the process exited and no output pipe
        remains tracked."""
        if self._pipe_fds or not self._process_exited:
            return
        self._transport.close()
        self._transport = None
Victor Stinner791009b2015-01-15 13:16:02 +0100108
Victor Stinner915bcb02014-02-01 22:49:59 +0100109
class Process:
    """Handle on a child process built from a subprocess transport/protocol.

    Attributes set by __init__:
        stdin, stdout, stderr: the stream objects taken from *protocol*
            (None for pipes the protocol does not have).
        pid: the value returned by ``transport.get_pid()``.
    """

    def __init__(self, transport, protocol, loop, *, _asyncio_internal=False):
        if not _asyncio_internal:
            # stacklevel=2 attributes the warning to the code that created
            # the instance instead of to this module, so it is reported at
            # the offending call site.
            warnings.warn(f"{self.__class__} should be instaniated "
                          "by asyncio internals only, "
                          "please avoid its creation from user code",
                          DeprecationWarning, stacklevel=2)

        self._transport = transport
        self._protocol = protocol
        self._loop = loop
        self.stdin = protocol.stdin
        self.stdout = protocol.stdout
        self.stderr = protocol.stderr
        self.pid = transport.get_pid()

    def __repr__(self):
        return f'<{self.__class__.__name__} {self.pid}>'

    @property
    def returncode(self):
        """Return code as reported by the transport's get_returncode()."""
        return self._transport.get_returncode()

    async def wait(self):
        """Wait until the process exit and return the process return code."""
        return await self._transport._wait()

    def send_signal(self, signal):
        """Forward *signal* to the transport's send_signal()."""
        self._transport.send_signal(signal)

    def terminate(self):
        """Call terminate() on the underlying transport."""
        self._transport.terminate()

    def kill(self):
        """Call kill() on the underlying transport."""
        self._transport.kill()

    async def _feed_stdin(self, input):
        """Write *input* to stdin, drain it, then close stdin.

        BrokenPipeError and ConnectionResetError are ignored, whether they
        are raised while writing or while draining.
        """
        debug = self._loop.get_debug()
        try:
            # write() is inside the try block as well: it can raise the same
            # errors as drain(), and communicate() must ignore both.
            self.stdin.write(input)
            if debug:
                logger.debug(
                    '%r communicate: feed stdin (%s bytes)', self, len(input))
            await self.stdin.drain()
        except (BrokenPipeError, ConnectionResetError) as exc:
            # communicate() ignores BrokenPipeError and ConnectionResetError
            if debug:
                logger.debug('%r communicate: stdin got %r', self, exc)

        if debug:
            logger.debug('%r communicate: close stdin', self)
        self.stdin.close()

    async def _noop(self):
        """Placeholder coroutine used by communicate() for absent pipes."""
        return None

    async def _read_stream(self, fd):
        """Read the stdout (fd 1) or stderr (fd 2) stream with read(),
        close its pipe transport and return the data."""
        transport = self._transport.get_pipe_transport(fd)
        if fd == 2:
            stream = self.stderr
        else:
            assert fd == 1
            stream = self.stdout
        if self._loop.get_debug():
            name = 'stdout' if fd == 1 else 'stderr'
            logger.debug('%r communicate: read %s', self, name)
        output = await stream.read()
        if self._loop.get_debug():
            name = 'stdout' if fd == 1 else 'stderr'
            logger.debug('%r communicate: close %s', self, name)
        transport.close()
        return output

    async def communicate(self, input=None):
        """Exchange data with the process and wait for it.

        If *input* is not None it is written to stdin, which is then closed.
        stdout and stderr are read when they are not None.  The three
        operations are run together through tasks.gather(), after which
        wait() is awaited.

        Returns a ``(stdout, stderr)`` tuple; an element is None when the
        corresponding stream is None.
        """
        if input is not None:
            stdin = self._feed_stdin(input)
        else:
            stdin = self._noop()
        if self.stdout is not None:
            stdout = self._read_stream(1)
        else:
            stdout = self._noop()
        if self.stderr is not None:
            stderr = self._read_stream(2)
        else:
            stderr = self._noop()
        stdin, stdout, stderr = await tasks.gather(stdin, stdout, stderr,
                                                   loop=self._loop)
        await self.wait()
        return (stdout, stderr)
200
201
async def create_subprocess_shell(cmd, stdin=None, stdout=None, stderr=None,
                                  loop=None, limit=streams._DEFAULT_LIMIT,
                                  **kwds):
    """Start *cmd* via ``loop.subprocess_shell()`` and return a Process.

    *loop* defaults to ``events.get_event_loop()``.  *limit* is handed to
    the SubprocessStreamProtocol created for the child; *stdin*, *stdout*,
    *stderr* and any extra keyword arguments are forwarded to
    ``loop.subprocess_shell()``.
    """
    loop = events.get_event_loop() if loop is None else loop

    def protocol_factory():
        return SubprocessStreamProtocol(limit=limit,
                                        loop=loop,
                                        _asyncio_internal=True)

    transport, protocol = await loop.subprocess_shell(
        protocol_factory, cmd,
        stdin=stdin, stdout=stdout, stderr=stderr,
        **kwds)
    return Process(transport, protocol, loop, _asyncio_internal=True)
Victor Stinner915bcb02014-02-01 22:49:59 +0100215
Andrew Svetlov5f841b52017-12-09 00:23:48 +0200216
async def create_subprocess_exec(program, *args, stdin=None, stdout=None,
                                 stderr=None, loop=None,
                                 limit=streams._DEFAULT_LIMIT, **kwds):
    """Start *program* with *args* via ``loop.subprocess_exec()`` and
    return a Process.

    *loop* defaults to ``events.get_event_loop()``.  *limit* is handed to
    the SubprocessStreamProtocol created for the child; *stdin*, *stdout*,
    *stderr* and any extra keyword arguments are forwarded to
    ``loop.subprocess_exec()``.
    """
    loop = events.get_event_loop() if loop is None else loop

    def protocol_factory():
        return SubprocessStreamProtocol(limit=limit,
                                        loop=loop,
                                        _asyncio_internal=True)

    transport, protocol = await loop.subprocess_exec(
        protocol_factory, program, *args,
        stdin=stdin, stdout=stdout, stderr=stderr,
        **kwds)
    return Process(transport, protocol, loop, _asyncio_internal=True)
Andrew Svetlovad4ed872019-05-06 22:52:11 -0400230 return Process(transport, protocol, loop, _asyncio_internal=True)