blob: d0c9779c1c92a1a56f397abfb9ad6b95c8ddcb16 [file] [log] [blame]
Victor Stinner915bcb02014-02-01 22:49:59 +01001__all__ = ['create_subprocess_exec', 'create_subprocess_shell']
2
3import collections
4import subprocess
5
6from . import events
7from . import futures
8from . import protocols
9from . import streams
10from . import tasks
Victor Stinnerf951d282014-06-29 00:46:45 +020011from .coroutines import coroutine
Victor Stinneracdb7822014-07-14 18:33:40 +020012from .log import logger
Victor Stinner915bcb02014-02-01 22:49:59 +010013
14
# Aliases of the subprocess module's special stream-redirection values,
# made available under this module's namespace.
PIPE = subprocess.PIPE
STDOUT = subprocess.STDOUT
DEVNULL = subprocess.DEVNULL
18
19
class SubprocessStreamProtocol(streams.FlowControlMixin,
                               protocols.SubprocessProtocol):
    """Like StreamReaderProtocol, but for a subprocess.

    Once connection_made() has run, the child's pipes are exposed as
    stream objects: ``stdin`` is a StreamWriter, ``stdout`` and ``stderr``
    are StreamReaders.  An attribute stays None when the transport has
    no pipe for the corresponding file descriptor.
    """

    def __init__(self, limit, loop):
        """Initialize the protocol.

        *limit* is passed to every StreamReader created for the child's
        output pipes.  *loop* is forwarded to the base class and to the
        stream objects.
        """
        super().__init__(loop=loop)
        self._limit = limit
        self.stdin = self.stdout = self.stderr = None
        self._transport = None

    def __repr__(self):
        """Return the class name followed by the streams that are set."""
        info = [self.__class__.__name__]
        if self.stdin is not None:
            info.append('stdin=%r' % self.stdin)
        if self.stdout is not None:
            info.append('stdout=%r' % self.stdout)
        if self.stderr is not None:
            info.append('stderr=%r' % self.stderr)
        return '<%s>' % ' '.join(info)

    def connection_made(self, transport):
        """Remember *transport* and wrap each available pipe in a stream.

        Pipe transports are looked up by file descriptor number:
        0 is stdin, 1 is stdout and 2 is stderr.
        """
        self._transport = transport

        stdout_transport = transport.get_pipe_transport(1)
        if stdout_transport is not None:
            self.stdout = streams.StreamReader(limit=self._limit,
                                               loop=self._loop)
            self.stdout.set_transport(stdout_transport)

        stderr_transport = transport.get_pipe_transport(2)
        if stderr_transport is not None:
            self.stderr = streams.StreamReader(limit=self._limit,
                                               loop=self._loop)
            self.stderr.set_transport(stderr_transport)

        stdin_transport = transport.get_pipe_transport(0)
        if stdin_transport is not None:
            # The writer uses this protocol object as its protocol and has
            # no associated reader.
            self.stdin = streams.StreamWriter(stdin_transport,
                                              protocol=self,
                                              reader=None,
                                              loop=self._loop)

    def pipe_data_received(self, fd, data):
        """Feed *data* to the reader for *fd* (1: stdout, 2: stderr).

        Data for any other descriptor, or for a pipe without a reader,
        is dropped.
        """
        if fd == 1:
            reader = self.stdout
        elif fd == 2:
            reader = self.stderr
        else:
            reader = None
        if reader is not None:
            reader.feed_data(data)

    def pipe_connection_lost(self, fd, exc):
        """Handle the closing of the pipe with file descriptor *fd*.

        For stdin (fd 0) the writer, if any, is closed and
        connection_lost() is called with *exc*.  For stdout/stderr the
        matching reader receives EOF when *exc* is None, or *exc* itself
        otherwise.
        """
        if fd == 0:
            pipe = self.stdin
            if pipe is not None:
                pipe.close()
            self.connection_lost(exc)
            return
        if fd == 1:
            reader = self.stdout
        elif fd == 2:
            reader = self.stderr
        else:
            reader = None
        # Identity test: None is a singleton and must not be compared
        # with == / !=.
        if reader is not None:
            if exc is None:
                reader.feed_eof()
            else:
                reader.set_exception(exc)

    def process_exited(self):
        """Close the process transport and drop the reference to it."""
        self._transport.close()
        self._transport = None
94
Victor Stinner915bcb02014-02-01 22:49:59 +010095
class Process:
    """Handle on a child process built from a transport/protocol pair.

    ``stdin``, ``stdout`` and ``stderr`` are copied from the protocol at
    construction time (each may be None); ``pid`` comes from the
    transport.
    """

    def __init__(self, transport, protocol, loop):
        self._transport = transport
        self._protocol = protocol
        self._loop = loop
        self.stdin = protocol.stdin
        self.stdout = protocol.stdout
        self.stderr = protocol.stderr
        self.pid = transport.get_pid()

    def __repr__(self):
        return '<%s %s>' % (self.__class__.__name__, self.pid)

    @property
    def returncode(self):
        """The return code as currently reported by the transport."""
        return self._transport.get_returncode()

    @coroutine
    def wait(self):
        """Wait until the process exits and return the process return code.

        This method is a coroutine."""
        return (yield from self._transport.wait())

    def send_signal(self, signal):
        """Forward *signal* to the transport's send_signal()."""
        self._transport.send_signal(signal)

    def terminate(self):
        """Ask the transport to terminate the process."""
        self._transport.terminate()

    def kill(self):
        """Ask the transport to kill the process."""
        self._transport.kill()

    @coroutine
    def _feed_stdin(self, input):
        """Write *input* to stdin, drain it, then close stdin.

        BrokenPipeError and ConnectionResetError raised while draining
        are ignored (only logged in debug mode); stdin is closed in
        every case.
        """
        debug = self._loop.get_debug()
        self.stdin.write(input)
        if debug:
            logger.debug('%r communicate: feed stdin (%s bytes)',
                         self, len(input))
        try:
            yield from self.stdin.drain()
        except (BrokenPipeError, ConnectionResetError) as exc:
            # communicate() ignores BrokenPipeError and ConnectionResetError
            if debug:
                logger.debug('%r communicate: stdin got %r', self, exc)

        if debug:
            logger.debug('%r communicate: close stdin', self)
        self.stdin.close()

    @coroutine
    def _noop(self):
        """Placeholder coroutine used for streams that are not handled."""
        return None

    @coroutine
    def _read_stream(self, fd):
        """Read everything from stdout (fd 1) or stderr (fd 2).

        The pipe transport for *fd* is closed once read() returns, and
        the data read is returned.
        """
        transport = self._transport.get_pipe_transport(fd)
        if fd == 2:
            stream = self.stderr
        else:
            assert fd == 1
            stream = self.stdout
        if self._loop.get_debug():
            name = 'stdout' if fd == 1 else 'stderr'
            logger.debug('%r communicate: read %s', self, name)
        output = yield from stream.read()
        if self._loop.get_debug():
            name = 'stdout' if fd == 1 else 'stderr'
            logger.debug('%r communicate: close %s', self, name)
        transport.close()
        return output

    @coroutine
    def communicate(self, input=None):
        """Exchange data with the process and wait for it to exit.

        If *input* is not None it is written to stdin, which is then
        closed.  This includes empty input, so that the child sees EOF
        on its stdin.  stdout and stderr are read concurrently when they
        exist.  Returns a ``(stdout, stderr)`` tuple; an item is None
        when the corresponding stream is None.

        This method is a coroutine.
        """
        # Test against None rather than truthiness: with b'' the stdin
        # pipe must still be closed, otherwise a child that reads stdin
        # until EOF would never finish.
        if input is not None:
            stdin = self._feed_stdin(input)
        else:
            stdin = self._noop()
        if self.stdout is not None:
            stdout = self._read_stream(1)
        else:
            stdout = self._noop()
        if self.stderr is not None:
            stderr = self._read_stream(2)
        else:
            stderr = self._noop()
        stdin, stdout, stderr = yield from tasks.gather(stdin, stdout, stderr,
                                                        loop=self._loop)
        yield from self.wait()
        return (stdout, stderr)
187
188
@coroutine
def create_subprocess_shell(cmd, stdin=None, stdout=None, stderr=None,
                            loop=None, limit=streams._DEFAULT_LIMIT, **kwds):
    """Start *cmd* through loop.subprocess_shell() and return a Process.

    The current event loop is used when *loop* is None.  *limit* is given
    to the SubprocessStreamProtocol created for the child; *stdin*,
    *stdout*, *stderr* and any extra keyword arguments are forwarded to
    loop.subprocess_shell().

    This function is a coroutine.
    """
    if loop is None:
        loop = events.get_event_loop()

    def make_protocol():
        # Factory invoked by the loop to build the stream protocol.
        return SubprocessStreamProtocol(limit=limit, loop=loop)

    spawned = yield from loop.subprocess_shell(
        make_protocol, cmd,
        stdin=stdin, stdout=stdout, stderr=stderr,
        **kwds)
    transport, protocol = spawned
    return Process(transport, protocol, loop)
201
@coroutine
def create_subprocess_exec(program, *args, stdin=None, stdout=None,
                           stderr=None, loop=None,
                           limit=streams._DEFAULT_LIMIT, **kwds):
    """Start *program* with *args* via loop.subprocess_exec(); return a Process.

    The current event loop is used when *loop* is None.  *limit* is given
    to the SubprocessStreamProtocol created for the child; *stdin*,
    *stdout*, *stderr* and any extra keyword arguments are forwarded to
    loop.subprocess_exec().

    This function is a coroutine.
    """
    if loop is None:
        loop = events.get_event_loop()

    def make_protocol():
        # Factory invoked by the loop to build the stream protocol.
        return SubprocessStreamProtocol(limit=limit, loop=loop)

    spawned = yield from loop.subprocess_exec(
        make_protocol, program, *args,
        stdin=stdin, stdout=stdout, stderr=stderr,
        **kwds)
    transport, protocol = spawned
    return Process(transport, protocol, loop)