blob: c86de3d087024061dd87c3ab9f824cdf82ef2b0d [file] [log] [blame]
Yury Selivanov6370f342017-12-10 18:36:12 -05001__all__ = 'create_subprocess_exec', 'create_subprocess_shell'
Victor Stinner915bcb02014-02-01 22:49:59 +01002
Victor Stinner915bcb02014-02-01 22:49:59 +01003import subprocess
4
5from . import events
Victor Stinner915bcb02014-02-01 22:49:59 +01006from . import protocols
7from . import streams
8from . import tasks
Victor Stinneracdb7822014-07-14 18:33:40 +02009from .log import logger
Victor Stinner915bcb02014-02-01 22:49:59 +010010
11
12PIPE = subprocess.PIPE
13STDOUT = subprocess.STDOUT
14DEVNULL = subprocess.DEVNULL
15
16
17class SubprocessStreamProtocol(streams.FlowControlMixin,
18 protocols.SubprocessProtocol):
19 """Like StreamReaderProtocol, but for a subprocess."""
20
21 def __init__(self, limit, loop):
22 super().__init__(loop=loop)
23 self._limit = limit
24 self.stdin = self.stdout = self.stderr = None
Victor Stinner915bcb02014-02-01 22:49:59 +010025 self._transport = None
Seth M. Larson481cb702017-03-02 22:21:18 -060026 self._process_exited = False
27 self._pipe_fds = []
Victor Stinner915bcb02014-02-01 22:49:59 +010028
Victor Stinneracdb7822014-07-14 18:33:40 +020029 def __repr__(self):
30 info = [self.__class__.__name__]
31 if self.stdin is not None:
Yury Selivanov6370f342017-12-10 18:36:12 -050032 info.append(f'stdin={self.stdin!r}')
Victor Stinneracdb7822014-07-14 18:33:40 +020033 if self.stdout is not None:
Yury Selivanov6370f342017-12-10 18:36:12 -050034 info.append(f'stdout={self.stdout!r}')
Victor Stinneracdb7822014-07-14 18:33:40 +020035 if self.stderr is not None:
Yury Selivanov6370f342017-12-10 18:36:12 -050036 info.append(f'stderr={self.stderr!r}')
37 return '<{}>'.format(' '.join(info))
Victor Stinneracdb7822014-07-14 18:33:40 +020038
Andrew Svetlova5d1eb82018-09-12 11:43:04 -070039 def _untrack_reader(self):
40 # StreamWriter.close() expects the protocol
41 # to have this method defined.
42 pass
43
Victor Stinner915bcb02014-02-01 22:49:59 +010044 def connection_made(self, transport):
45 self._transport = transport
Victor Stinner5ef586f2014-11-25 17:20:33 +010046
47 stdout_transport = transport.get_pipe_transport(1)
48 if stdout_transport is not None:
Victor Stinner915bcb02014-02-01 22:49:59 +010049 self.stdout = streams.StreamReader(limit=self._limit,
50 loop=self._loop)
Victor Stinner5ef586f2014-11-25 17:20:33 +010051 self.stdout.set_transport(stdout_transport)
Seth M. Larson481cb702017-03-02 22:21:18 -060052 self._pipe_fds.append(1)
Victor Stinner5ef586f2014-11-25 17:20:33 +010053
54 stderr_transport = transport.get_pipe_transport(2)
55 if stderr_transport is not None:
Victor Stinner915bcb02014-02-01 22:49:59 +010056 self.stderr = streams.StreamReader(limit=self._limit,
57 loop=self._loop)
Victor Stinner5ef586f2014-11-25 17:20:33 +010058 self.stderr.set_transport(stderr_transport)
Seth M. Larson481cb702017-03-02 22:21:18 -060059 self._pipe_fds.append(2)
Victor Stinner5ef586f2014-11-25 17:20:33 +010060
61 stdin_transport = transport.get_pipe_transport(0)
62 if stdin_transport is not None:
63 self.stdin = streams.StreamWriter(stdin_transport,
Victor Stinner915bcb02014-02-01 22:49:59 +010064 protocol=self,
65 reader=None,
66 loop=self._loop)
Victor Stinnerf651a602015-01-14 02:10:33 +010067
Victor Stinner915bcb02014-02-01 22:49:59 +010068 def pipe_data_received(self, fd, data):
69 if fd == 1:
70 reader = self.stdout
71 elif fd == 2:
72 reader = self.stderr
73 else:
74 reader = None
75 if reader is not None:
76 reader.feed_data(data)
77
78 def pipe_connection_lost(self, fd, exc):
79 if fd == 0:
80 pipe = self.stdin
81 if pipe is not None:
82 pipe.close()
83 self.connection_lost(exc)
84 return
85 if fd == 1:
86 reader = self.stdout
87 elif fd == 2:
88 reader = self.stderr
89 else:
90 reader = None
Yury Selivanov6370f342017-12-10 18:36:12 -050091 if reader is not None:
Victor Stinner915bcb02014-02-01 22:49:59 +010092 if exc is None:
93 reader.feed_eof()
94 else:
95 reader.set_exception(exc)
Yury Selivanov2f156452017-03-02 23:25:31 -050096
Seth M. Larson481cb702017-03-02 22:21:18 -060097 if fd in self._pipe_fds:
98 self._pipe_fds.remove(fd)
99 self._maybe_close_transport()
Victor Stinner915bcb02014-02-01 22:49:59 +0100100
101 def process_exited(self):
Seth M. Larson481cb702017-03-02 22:21:18 -0600102 self._process_exited = True
103 self._maybe_close_transport()
Yury Selivanov2f156452017-03-02 23:25:31 -0500104
Seth M. Larson481cb702017-03-02 22:21:18 -0600105 def _maybe_close_transport(self):
106 if len(self._pipe_fds) == 0 and self._process_exited:
107 self._transport.close()
108 self._transport = None
Victor Stinner791009b2015-01-15 13:16:02 +0100109
Victor Stinner915bcb02014-02-01 22:49:59 +0100110
111class Process:
112 def __init__(self, transport, protocol, loop):
113 self._transport = transport
114 self._protocol = protocol
115 self._loop = loop
116 self.stdin = protocol.stdin
117 self.stdout = protocol.stdout
118 self.stderr = protocol.stderr
119 self.pid = transport.get_pid()
120
Victor Stinneracdb7822014-07-14 18:33:40 +0200121 def __repr__(self):
Yury Selivanov6370f342017-12-10 18:36:12 -0500122 return f'<{self.__class__.__name__} {self.pid}>'
Victor Stinneracdb7822014-07-14 18:33:40 +0200123
Victor Stinner915bcb02014-02-01 22:49:59 +0100124 @property
125 def returncode(self):
126 return self._transport.get_returncode()
127
Andrew Svetlov5f841b52017-12-09 00:23:48 +0200128 async def wait(self):
129 """Wait until the process exit and return the process return code."""
130 return await self._transport._wait()
Victor Stinner915bcb02014-02-01 22:49:59 +0100131
132 def send_signal(self, signal):
Victor Stinner915bcb02014-02-01 22:49:59 +0100133 self._transport.send_signal(signal)
134
135 def terminate(self):
Victor Stinner915bcb02014-02-01 22:49:59 +0100136 self._transport.terminate()
137
138 def kill(self):
Victor Stinner915bcb02014-02-01 22:49:59 +0100139 self._transport.kill()
140
Andrew Svetlov5f841b52017-12-09 00:23:48 +0200141 async def _feed_stdin(self, input):
Victor Stinnerd55b54d2014-07-17 13:12:03 +0200142 debug = self._loop.get_debug()
Victor Stinner915bcb02014-02-01 22:49:59 +0100143 self.stdin.write(input)
Victor Stinnerd55b54d2014-07-17 13:12:03 +0200144 if debug:
Yury Selivanov6370f342017-12-10 18:36:12 -0500145 logger.debug(
146 '%r communicate: feed stdin (%s bytes)', self, len(input))
Victor Stinnercc996b52014-07-17 12:25:27 +0200147 try:
Andrew Svetlov5f841b52017-12-09 00:23:48 +0200148 await self.stdin.drain()
Victor Stinnerd55b54d2014-07-17 13:12:03 +0200149 except (BrokenPipeError, ConnectionResetError) as exc:
150 # communicate() ignores BrokenPipeError and ConnectionResetError
151 if debug:
152 logger.debug('%r communicate: stdin got %r', self, exc)
Victor Stinneracdb7822014-07-14 18:33:40 +0200153
Victor Stinnerd55b54d2014-07-17 13:12:03 +0200154 if debug:
Victor Stinneracdb7822014-07-14 18:33:40 +0200155 logger.debug('%r communicate: close stdin', self)
Victor Stinner915bcb02014-02-01 22:49:59 +0100156 self.stdin.close()
157
Andrew Svetlov5f841b52017-12-09 00:23:48 +0200158 async def _noop(self):
Victor Stinner915bcb02014-02-01 22:49:59 +0100159 return None
160
Andrew Svetlov5f841b52017-12-09 00:23:48 +0200161 async def _read_stream(self, fd):
Victor Stinner915bcb02014-02-01 22:49:59 +0100162 transport = self._transport.get_pipe_transport(fd)
163 if fd == 2:
164 stream = self.stderr
165 else:
166 assert fd == 1
167 stream = self.stdout
Victor Stinneracdb7822014-07-14 18:33:40 +0200168 if self._loop.get_debug():
169 name = 'stdout' if fd == 1 else 'stderr'
170 logger.debug('%r communicate: read %s', self, name)
Andrew Svetlov5f841b52017-12-09 00:23:48 +0200171 output = await stream.read()
Victor Stinneracdb7822014-07-14 18:33:40 +0200172 if self._loop.get_debug():
173 name = 'stdout' if fd == 1 else 'stderr'
174 logger.debug('%r communicate: close %s', self, name)
Victor Stinner915bcb02014-02-01 22:49:59 +0100175 transport.close()
176 return output
177
Andrew Svetlov5f841b52017-12-09 00:23:48 +0200178 async def communicate(self, input=None):
Yury Selivanov7657f6b2016-05-13 15:35:28 -0400179 if input is not None:
Victor Stinner915bcb02014-02-01 22:49:59 +0100180 stdin = self._feed_stdin(input)
181 else:
182 stdin = self._noop()
183 if self.stdout is not None:
184 stdout = self._read_stream(1)
185 else:
186 stdout = self._noop()
187 if self.stderr is not None:
188 stderr = self._read_stream(2)
189 else:
190 stderr = self._noop()
Andrew Svetlov5f841b52017-12-09 00:23:48 +0200191 stdin, stdout, stderr = await tasks.gather(stdin, stdout, stderr,
192 loop=self._loop)
193 await self.wait()
Victor Stinner915bcb02014-02-01 22:49:59 +0100194 return (stdout, stderr)
195
196
Andrew Svetlov5f841b52017-12-09 00:23:48 +0200197async def create_subprocess_shell(cmd, stdin=None, stdout=None, stderr=None,
198 loop=None, limit=streams._DEFAULT_LIMIT,
199 **kwds):
Victor Stinner915bcb02014-02-01 22:49:59 +0100200 if loop is None:
201 loop = events.get_event_loop()
202 protocol_factory = lambda: SubprocessStreamProtocol(limit=limit,
203 loop=loop)
Andrew Svetlov5f841b52017-12-09 00:23:48 +0200204 transport, protocol = await loop.subprocess_shell(
205 protocol_factory,
206 cmd, stdin=stdin, stdout=stdout,
207 stderr=stderr, **kwds)
Victor Stinner915bcb02014-02-01 22:49:59 +0100208 return Process(transport, protocol, loop)
209
Andrew Svetlov5f841b52017-12-09 00:23:48 +0200210
211async def create_subprocess_exec(program, *args, stdin=None, stdout=None,
212 stderr=None, loop=None,
213 limit=streams._DEFAULT_LIMIT, **kwds):
Victor Stinner915bcb02014-02-01 22:49:59 +0100214 if loop is None:
215 loop = events.get_event_loop()
216 protocol_factory = lambda: SubprocessStreamProtocol(limit=limit,
217 loop=loop)
Andrew Svetlov5f841b52017-12-09 00:23:48 +0200218 transport, protocol = await loop.subprocess_exec(
219 protocol_factory,
220 program, *args,
221 stdin=stdin, stdout=stdout,
222 stderr=stderr, **kwds)
Victor Stinner915bcb02014-02-01 22:49:59 +0100223 return Process(transport, protocol, loop)