blob: 3047894b3808ea5446eae3b1358e195f491ffc20 [file] [log] [blame]
# Public names exported by a star-import of this module: the two coroutine
# factories defined at the bottom of the file.
__all__ = ['create_subprocess_exec', 'create_subprocess_shell']
2
3import collections
4import subprocess
5
6from . import events
7from . import futures
8from . import protocols
9from . import streams
10from . import tasks
11
12
# Aliases of the special stdio constants from the standard ``subprocess``
# module, so callers can refer to them through this module's namespace.
PIPE = subprocess.PIPE
STDOUT = subprocess.STDOUT
DEVNULL = subprocess.DEVNULL
16
17
class SubprocessStreamProtocol(streams.FlowControlMixin,
                               protocols.SubprocessProtocol):
    """Like StreamReaderProtocol, but for a subprocess.

    Attributes:
        stdin: StreamWriter wrapping the child's stdin pipe, or None.
        stdout: StreamReader fed with data from fd 1, or None.
        stderr: StreamReader fed with data from fd 2, or None.
        waiter: Future resolved (with None) once connection_made() has
            finished creating the streams.
    """

    def __init__(self, limit, loop):
        """Create the protocol.

        Args:
            limit: buffer limit passed to each StreamReader created later.
            loop: event loop used for the futures and streams.
        """
        super().__init__(loop=loop)
        self._limit = limit
        self.stdin = self.stdout = self.stderr = None
        self.waiter = futures.Future(loop=loop)
        # Futures created by Process.wait(); resolved in process_exited().
        self._waiters = collections.deque()
        self._transport = None

    def _reader_for_fd(self, fd):
        """Return the StreamReader attached to *fd* (1 or 2), else None."""
        if fd == 1:
            return self.stdout
        if fd == 2:
            return self.stderr
        return None

    def connection_made(self, transport):
        """Remember *transport* and build a stream for every existing pipe."""
        self._transport = transport
        if transport.get_pipe_transport(1):
            self.stdout = streams.StreamReader(limit=self._limit,
                                               loop=self._loop)
        if transport.get_pipe_transport(2):
            self.stderr = streams.StreamReader(limit=self._limit,
                                               loop=self._loop)
        stdin = transport.get_pipe_transport(0)
        if stdin is not None:
            self.stdin = streams.StreamWriter(stdin,
                                              protocol=self,
                                              reader=None,
                                              loop=self._loop)
        # The party awaiting the waiter may have been cancelled in the
        # meantime; set_result() on a cancelled future would raise.
        if not self.waiter.cancelled():
            self.waiter.set_result(None)

    def pipe_data_received(self, fd, data):
        """Forward *data* to the reader for *fd*; other fds are ignored."""
        reader = self._reader_for_fd(fd)
        if reader is not None:
            reader.feed_data(data)

    def pipe_connection_lost(self, fd, exc):
        """Handle the closing of the pipe identified by *fd*.

        For stdin (fd 0) the writer is closed and connection_lost() is
        invoked with *exc*. For stdout/stderr the reader receives EOF when
        *exc* is None, otherwise *exc* is set on it.
        """
        if fd == 0:
            pipe = self.stdin
            if pipe is not None:
                pipe.close()
            self.connection_lost(exc)
            return
        reader = self._reader_for_fd(fd)
        if reader is not None:
            if exc is None:
                reader.feed_eof()
            else:
                reader.set_exception(exc)

    def process_exited(self):
        """Wake up every future queued by Process.wait() with the exit code."""
        returncode = self._transport.get_returncode()
        while self._waiters:
            waiter = self._waiters.popleft()
            # A cancelled wait() leaves a cancelled future in the queue;
            # skip it so the remaining waiters are still resolved.
            if not waiter.cancelled():
                waiter.set_result(returncode)
81
82
class Process:
    """Handle on a child process started by the create_subprocess_*() helpers.

    Attributes:
        stdin, stdout, stderr: the stream objects held by the protocol at
            construction time (None for pipes that were not created).
        pid: value returned by the transport's get_pid().
    """

    def __init__(self, transport, protocol, loop):
        self._transport = transport
        self._protocol = protocol
        self._loop = loop
        self.stdin = protocol.stdin
        self.stdout = protocol.stdout
        self.stderr = protocol.stderr
        self.pid = transport.get_pid()

    @property
    def returncode(self):
        """Return code reported by the transport (None while it has none)."""
        return self._transport.get_returncode()

    @tasks.coroutine
    def wait(self):
        """Wait until the process exits and return the process return code."""
        returncode = self._transport.get_returncode()
        if returncode is not None:
            return returncode

        # Queue a future that the protocol resolves in process_exited().
        waiter = futures.Future(loop=self._loop)
        self._protocol._waiters.append(waiter)
        yield from waiter
        return waiter.result()

    @property
    def subprocess(self):
        """The 'subprocess' extra info exposed by the transport."""
        return self._transport.get_extra_info('subprocess')

    def _check_alive(self):
        """Raise ProcessLookupError if the transport already has a return code."""
        if self._transport.get_returncode() is not None:
            raise ProcessLookupError()

    def send_signal(self, signal):
        """Send *signal* to the child; ProcessLookupError if it has exited."""
        self._check_alive()
        self._transport.send_signal(signal)

    def terminate(self):
        """Ask the transport to terminate the child if it is still running."""
        self._check_alive()
        self._transport.terminate()

    def kill(self):
        """Ask the transport to kill the child if it is still running."""
        self._check_alive()
        self._transport.kill()

    @tasks.coroutine
    def _feed_stdin(self, input):
        """Write *input* to stdin, wait for it to drain, then close stdin."""
        self.stdin.write(input)
        yield from self.stdin.drain()
        self.stdin.close()

    @tasks.coroutine
    def _noop(self):
        """Placeholder coroutine used for streams that need no work."""
        return None

    @tasks.coroutine
    def _read_stream(self, fd):
        """Read stdout (fd 1) or stderr (fd 2) to EOF and close its pipe."""
        transport = self._transport.get_pipe_transport(fd)
        if fd == 2:
            stream = self.stderr
        else:
            assert fd == 1
            stream = self.stdout
        output = yield from stream.read()
        transport.close()
        return output

    @tasks.coroutine
    def communicate(self, input=None):
        """Feed *input* to stdin, read stdout/stderr to EOF, wait for exit.

        Args:
            input: data to write to stdin, or None to write nothing.

        Returns:
            A ``(stdout, stderr)`` tuple; an element is None when the
            corresponding stream does not exist.
        """
        # An empty payload must still close an existing stdin pipe, otherwise
        # a child that reads stdin until EOF never finishes.
        if input or (input is not None and self.stdin is not None):
            stdin = self._feed_stdin(input)
        else:
            stdin = self._noop()
        if self.stdout is not None:
            stdout = self._read_stream(1)
        else:
            stdout = self._noop()
        if self.stderr is not None:
            stderr = self._read_stream(2)
        else:
            stderr = self._noop()
        # Use the loop given to the constructor instead of reaching into the
        # transport's private attribute.
        stdin, stdout, stderr = yield from tasks.gather(stdin, stdout, stderr,
                                                        loop=self._loop)
        yield from self.wait()
        return (stdout, stderr)
170
171
@tasks.coroutine
def create_subprocess_shell(cmd, stdin=None, stdout=None, stderr=None,
                            loop=None, limit=streams._DEFAULT_LIMIT, **kwds):
    """Start *cmd* through ``loop.subprocess_shell()`` and return a Process.

    Args:
        cmd: command passed unchanged to ``loop.subprocess_shell()``.
        stdin, stdout, stderr: forwarded to ``loop.subprocess_shell()``.
        loop: event loop to use; ``events.get_event_loop()`` when None.
        limit: buffer limit handed to the SubprocessStreamProtocol.
        **kwds: extra keyword arguments forwarded to the loop method.

    Returns:
        A Process wrapping the transport and protocol, available once the
        protocol's ``waiter`` future has completed.
    """
    loop = events.get_event_loop() if loop is None else loop

    def make_protocol():
        return SubprocessStreamProtocol(limit=limit, loop=loop)

    pair = yield from loop.subprocess_shell(make_protocol, cmd,
                                            stdin=stdin, stdout=stdout,
                                            stderr=stderr, **kwds)
    transport, protocol = pair
    # Do not hand out the Process before the protocol has built its streams.
    yield from protocol.waiter
    return Process(transport, protocol, loop)
185
@tasks.coroutine
def create_subprocess_exec(*args, stdin=None, stdout=None, stderr=None,
                           loop=None, limit=streams._DEFAULT_LIMIT, **kwds):
    """Start a program through ``loop.subprocess_exec()`` and return a Process.

    Args:
        *args: positional arguments passed unchanged to
            ``loop.subprocess_exec()`` after the protocol factory.
        stdin, stdout, stderr: forwarded to ``loop.subprocess_exec()``.
        loop: event loop to use; ``events.get_event_loop()`` when None.
        limit: buffer limit handed to the SubprocessStreamProtocol.
        **kwds: extra keyword arguments forwarded to the loop method.

    Returns:
        A Process wrapping the transport and protocol, available once the
        protocol's ``waiter`` future has completed.
    """
    loop = events.get_event_loop() if loop is None else loop

    def make_protocol():
        return SubprocessStreamProtocol(limit=limit, loop=loop)

    pair = yield from loop.subprocess_exec(make_protocol, *args,
                                           stdin=stdin, stdout=stdout,
                                           stderr=stderr, **kwds)
    transport, protocol = pair
    # Do not hand out the Process before the protocol has built its streams.
    yield from protocol.waiter
    return Process(transport, protocol, loop)
198 return Process(transport, protocol, loop)