blob: 4312d445e2037a4c47e1fe4cf82a6fbcd6472a12 [file] [log] [blame]
# Names exported by a star-import of this module.  Only the two factory
# coroutines are listed; the module-level PIPE, STDOUT and DEVNULL constants
# are not.
__all__ = ['create_subprocess_exec', 'create_subprocess_shell']
2
3import collections
4import subprocess
5
6from . import events
7from . import futures
8from . import protocols
9from . import streams
10from . import tasks
11
12
# Module-level aliases of the special values defined by the standard
# ``subprocess`` module, so they can be referenced from this module directly.
PIPE, STDOUT, DEVNULL = (
    subprocess.PIPE,
    subprocess.STDOUT,
    subprocess.DEVNULL,
)
16
17
class SubprocessStreamProtocol(streams.FlowControlMixin,
                               protocols.SubprocessProtocol):
    """Like StreamReaderProtocol, but for a subprocess.

    Translates the subprocess protocol callbacks into stream objects:
    ``stdin`` is exposed as a StreamWriter and ``stdout``/``stderr`` as
    StreamReaders.  Each stream is created in connection_made() only when
    the transport reports a pipe for the corresponding file descriptor;
    otherwise the attribute stays None.

    ``waiter`` is a future that connection_made() resolves with None once
    the streams have been set up.  ``_waiters`` holds the futures of callers
    blocked until the process exits; process_exited() resolves them.
    """

    def __init__(self, limit, loop):
        """Store *limit* (handed to every StreamReader) and the event loop."""
        super().__init__(loop=loop)
        self._limit = limit
        self.stdin = self.stdout = self.stderr = None
        self.waiter = futures.Future(loop=loop)
        self._waiters = collections.deque()
        self._transport = None

    def connection_made(self, transport):
        """Create a stream for each pipe of *transport*, then resolve waiter."""
        self._transport = transport
        if transport.get_pipe_transport(1):
            self.stdout = streams.StreamReader(limit=self._limit,
                                               loop=self._loop)
        if transport.get_pipe_transport(2):
            self.stderr = streams.StreamReader(limit=self._limit,
                                               loop=self._loop)
        stdin = transport.get_pipe_transport(0)
        if stdin is not None:
            # The writer has no paired reader; this protocol object is
            # passed as the writer's protocol.
            self.stdin = streams.StreamWriter(stdin,
                                              protocol=self,
                                              reader=None,
                                              loop=self._loop)
        self.waiter.set_result(None)

    def pipe_data_received(self, fd, data):
        """Feed *data* to the reader for *fd* (1 = stdout, 2 = stderr).

        Data for any other descriptor, or for a stream that was never
        created, is dropped.
        """
        if fd == 1:
            reader = self.stdout
        elif fd == 2:
            reader = self.stderr
        else:
            reader = None
        if reader is not None:
            reader.feed_data(data)

    def pipe_connection_lost(self, fd, exc):
        """Handle the closing of the pipe identified by *fd*.

        For stdin (fd 0) the writer, if any, is closed and connection_lost()
        is invoked with *exc*.  For stdout/stderr the matching reader is
        given EOF when *exc* is None, or *exc* as its exception otherwise.
        """
        if fd == 0:
            pipe = self.stdin
            if pipe is not None:
                pipe.close()
            # connection_lost() is inherited, not defined in this class.
            # NOTE(review): presumably it wakes/fails pending drain()
            # callers -- confirm against FlowControlMixin.
            self.connection_lost(exc)
            return
        if fd == 1:
            reader = self.stdout
        elif fd == 2:
            reader = self.stderr
        else:
            reader = None
        if reader is not None:
            if exc is None:
                reader.feed_eof()
            else:
                reader.set_exception(exc)

    def process_exited(self):
        """Resolve every queued exit waiter with the process return code."""
        # wake up futures waiting for wait()
        returncode = self._transport.get_returncode()
        while self._waiters:
            waiter = self._waiters.popleft()
            # A waiter may have been cancelled by its caller in the
            # meantime; set_result() on it would raise InvalidStateError
            # and leave the remaining waiters unresolved.
            if not waiter.cancelled():
                waiter.set_result(returncode)
81
82
class Process:
    """Handle on a running subprocess, built from its transport and protocol.

    ``stdin``, ``stdout`` and ``stderr`` are the stream objects taken from
    the protocol at construction time (None for a stream that was not
    created); ``pid`` is the value of ``transport.get_pid()``.
    """

    def __init__(self, transport, protocol, loop):
        self._transport = transport
        self._protocol = protocol
        self._loop = loop
        self.stdin = protocol.stdin
        self.stdout = protocol.stdout
        self.stderr = protocol.stderr
        self.pid = transport.get_pid()

    @property
    def returncode(self):
        """The transport's current return code (``get_returncode()``)."""
        return self._transport.get_returncode()

    @tasks.coroutine
    def wait(self):
        """Wait until the process exits and return the process return code."""
        returncode = self._transport.get_returncode()
        if returncode is not None:
            return returncode

        # Queue a future on the protocol; its process_exited() callback
        # resolves it with the return code.
        waiter = futures.Future(loop=self._loop)
        self._protocol._waiters.append(waiter)
        yield from waiter
        return waiter.result()

    def get_subprocess(self):
        """Return the transport's 'subprocess' extra info."""
        return self._transport.get_extra_info('subprocess')

    def _check_alive(self):
        """Raise ProcessLookupError if a return code is already set."""
        if self._transport.get_returncode() is not None:
            raise ProcessLookupError()

    def send_signal(self, signal):
        """Forward *signal* to the transport.

        Raises ProcessLookupError if a return code is already set.
        """
        self._check_alive()
        self._transport.send_signal(signal)

    def terminate(self):
        """Call terminate() on the transport.

        Raises ProcessLookupError if a return code is already set.
        """
        self._check_alive()
        self._transport.terminate()

    def kill(self):
        """Call kill() on the transport.

        Raises ProcessLookupError if a return code is already set.
        """
        self._check_alive()
        self._transport.kill()

    @tasks.coroutine
    def _feed_stdin(self, input):
        """Write *input* to stdin, wait for it to drain, then close stdin."""
        self.stdin.write(input)
        yield from self.stdin.drain()
        self.stdin.close()

    @tasks.coroutine
    def _noop(self):
        """Placeholder coroutine for a stream that needs no work."""
        return None

    @tasks.coroutine
    def _read_stream(self, fd):
        """Read stdout (fd 1) or stderr (fd 2) to the end and return the data.

        The pipe transport for *fd* is closed once the read completes.
        """
        transport = self._transport.get_pipe_transport(fd)
        if fd == 2:
            stream = self.stderr
        else:
            assert fd == 1
            stream = self.stdout
        output = yield from stream.read()
        transport.close()
        return output

    @tasks.coroutine
    def communicate(self, input=None):
        """Feed *input* to stdin, collect the output and wait for the exit.

        When *input* is not None and a stdin stream exists, *input* is
        written and stdin is closed afterwards -- also for empty input, so
        the child still sees end-of-file.  *input* is ignored when there is
        no stdin stream, and stdin is left untouched when *input* is None.

        Returns a ``(stdout, stderr)`` tuple; an element is None when the
        corresponding stream does not exist.
        """
        # Test against None rather than truthiness: empty input must still
        # close stdin.
        if input is not None and self.stdin is not None:
            stdin = self._feed_stdin(input)
        else:
            stdin = self._noop()
        if self.stdout is not None:
            stdout = self._read_stream(1)
        else:
            stdout = self._noop()
        if self.stderr is not None:
            stderr = self._read_stream(2)
        else:
            stderr = self._noop()
        # Run the three operations concurrently on the loop this Process
        # was created with; the stdin coroutine yields nothing useful.
        _, stdout, stderr = yield from tasks.gather(stdin, stdout, stderr,
                                                    loop=self._loop)
        yield from self.wait()
        return (stdout, stderr)
169
170
@tasks.coroutine
def create_subprocess_shell(cmd, stdin=None, stdout=None, stderr=None,
                            loop=None, limit=streams._DEFAULT_LIMIT, **kwds):
    """Start *cmd* via ``loop.subprocess_shell()`` and return a Process.

    *stdin*, *stdout*, *stderr* and any extra keyword arguments are
    forwarded unchanged to ``loop.subprocess_shell()``.  *loop* defaults to
    ``events.get_event_loop()``.  *limit* is passed to the
    SubprocessStreamProtocol created for the subprocess.
    """
    if loop is None:
        loop = events.get_event_loop()

    def make_protocol():
        return SubprocessStreamProtocol(limit=limit, loop=loop)

    transport, protocol = yield from loop.subprocess_shell(
        make_protocol, cmd,
        stdin=stdin, stdout=stdout, stderr=stderr,
        **kwds)
    # The protocol resolves ``waiter`` at the end of connection_made(), i.e.
    # once its stream attributes are set; Process copies them on creation.
    yield from protocol.waiter
    return Process(transport, protocol, loop)
184
@tasks.coroutine
def create_subprocess_exec(*args, stdin=None, stdout=None, stderr=None,
                           loop=None, limit=streams._DEFAULT_LIMIT, **kwds):
    """Start a subprocess via ``loop.subprocess_exec()``; return a Process.

    The positional *args*, the *stdin*, *stdout*, *stderr* values and any
    extra keyword arguments are forwarded unchanged to
    ``loop.subprocess_exec()``.  *loop* defaults to
    ``events.get_event_loop()``.  *limit* is passed to the
    SubprocessStreamProtocol created for the subprocess.
    """
    if loop is None:
        loop = events.get_event_loop()

    def make_protocol():
        return SubprocessStreamProtocol(limit=limit, loop=loop)

    transport, protocol = yield from loop.subprocess_exec(
        make_protocol, *args,
        stdin=stdin, stdout=stdout, stderr=stderr,
        **kwds)
    # The protocol resolves ``waiter`` at the end of connection_made(), i.e.
    # once its stream attributes are set; Process copies them on creation.
    yield from protocol.waiter
    return Process(transport, protocol, loop)
197 return Process(transport, protocol, loop)