blob: 414e02383e4ea1a42dab8edf3b3035718a77bd76 [file] [log] [blame]
Victor Stinner915bcb02014-02-01 22:49:59 +01001__all__ = ['create_subprocess_exec', 'create_subprocess_shell']
2
3import collections
4import subprocess
5
6from . import events
7from . import futures
8from . import protocols
9from . import streams
10from . import tasks
11
12
13PIPE = subprocess.PIPE
14STDOUT = subprocess.STDOUT
15DEVNULL = subprocess.DEVNULL
16
17
18class SubprocessStreamProtocol(streams.FlowControlMixin,
19 protocols.SubprocessProtocol):
20 """Like StreamReaderProtocol, but for a subprocess."""
21
22 def __init__(self, limit, loop):
23 super().__init__(loop=loop)
24 self._limit = limit
25 self.stdin = self.stdout = self.stderr = None
26 self.waiter = futures.Future(loop=loop)
27 self._waiters = collections.deque()
28 self._transport = None
29
30 def connection_made(self, transport):
31 self._transport = transport
32 if transport.get_pipe_transport(1):
33 self.stdout = streams.StreamReader(limit=self._limit,
34 loop=self._loop)
35 if transport.get_pipe_transport(2):
36 self.stderr = streams.StreamReader(limit=self._limit,
37 loop=self._loop)
38 stdin = transport.get_pipe_transport(0)
39 if stdin is not None:
40 self.stdin = streams.StreamWriter(stdin,
41 protocol=self,
42 reader=None,
43 loop=self._loop)
44 self.waiter.set_result(None)
45
46 def pipe_data_received(self, fd, data):
47 if fd == 1:
48 reader = self.stdout
49 elif fd == 2:
50 reader = self.stderr
51 else:
52 reader = None
53 if reader is not None:
54 reader.feed_data(data)
55
56 def pipe_connection_lost(self, fd, exc):
57 if fd == 0:
58 pipe = self.stdin
59 if pipe is not None:
60 pipe.close()
61 self.connection_lost(exc)
62 return
63 if fd == 1:
64 reader = self.stdout
65 elif fd == 2:
66 reader = self.stderr
67 else:
68 reader = None
69 if reader != None:
70 if exc is None:
71 reader.feed_eof()
72 else:
73 reader.set_exception(exc)
74
75 def process_exited(self):
76 # wake up futures waiting for wait()
77 returncode = self._transport.get_returncode()
78 while self._waiters:
79 waiter = self._waiters.popleft()
80 waiter.set_result(returncode)
81
82
83class Process:
84 def __init__(self, transport, protocol, loop):
85 self._transport = transport
86 self._protocol = protocol
87 self._loop = loop
88 self.stdin = protocol.stdin
89 self.stdout = protocol.stdout
90 self.stderr = protocol.stderr
91 self.pid = transport.get_pid()
92
93 @property
94 def returncode(self):
95 return self._transport.get_returncode()
96
97 @tasks.coroutine
98 def wait(self):
99 """Wait until the process exit and return the process return code."""
100 returncode = self._transport.get_returncode()
101 if returncode is not None:
102 return returncode
103
104 waiter = futures.Future(loop=self._loop)
105 self._protocol._waiters.append(waiter)
106 yield from waiter
107 return waiter.result()
108
Victor Stinner915bcb02014-02-01 22:49:59 +0100109 def _check_alive(self):
110 if self._transport.get_returncode() is not None:
111 raise ProcessLookupError()
112
113 def send_signal(self, signal):
114 self._check_alive()
115 self._transport.send_signal(signal)
116
117 def terminate(self):
118 self._check_alive()
119 self._transport.terminate()
120
121 def kill(self):
122 self._check_alive()
123 self._transport.kill()
124
125 @tasks.coroutine
126 def _feed_stdin(self, input):
127 self.stdin.write(input)
128 yield from self.stdin.drain()
129 self.stdin.close()
130
131 @tasks.coroutine
132 def _noop(self):
133 return None
134
135 @tasks.coroutine
136 def _read_stream(self, fd):
137 transport = self._transport.get_pipe_transport(fd)
138 if fd == 2:
139 stream = self.stderr
140 else:
141 assert fd == 1
142 stream = self.stdout
143 output = yield from stream.read()
144 transport.close()
145 return output
146
147 @tasks.coroutine
148 def communicate(self, input=None):
Victor Stinner915bcb02014-02-01 22:49:59 +0100149 if input:
150 stdin = self._feed_stdin(input)
151 else:
152 stdin = self._noop()
153 if self.stdout is not None:
154 stdout = self._read_stream(1)
155 else:
156 stdout = self._noop()
157 if self.stderr is not None:
158 stderr = self._read_stream(2)
159 else:
160 stderr = self._noop()
161 stdin, stdout, stderr = yield from tasks.gather(stdin, stdout, stderr,
Victor Stinner8d0230b2014-02-20 10:12:59 +0100162 loop=self._loop)
Victor Stinner915bcb02014-02-01 22:49:59 +0100163 yield from self.wait()
164 return (stdout, stderr)
165
166
167@tasks.coroutine
168def create_subprocess_shell(cmd, stdin=None, stdout=None, stderr=None,
169 loop=None, limit=streams._DEFAULT_LIMIT, **kwds):
170 if loop is None:
171 loop = events.get_event_loop()
172 protocol_factory = lambda: SubprocessStreamProtocol(limit=limit,
173 loop=loop)
174 transport, protocol = yield from loop.subprocess_shell(
175 protocol_factory,
176 cmd, stdin=stdin, stdout=stdout,
177 stderr=stderr, **kwds)
178 yield from protocol.waiter
179 return Process(transport, protocol, loop)
180
181@tasks.coroutine
Yury Selivanov57797522014-02-18 22:56:15 -0500182def create_subprocess_exec(program, *args, stdin=None, stdout=None,
183 stderr=None, loop=None,
184 limit=streams._DEFAULT_LIMIT, **kwds):
Victor Stinner915bcb02014-02-01 22:49:59 +0100185 if loop is None:
186 loop = events.get_event_loop()
187 protocol_factory = lambda: SubprocessStreamProtocol(limit=limit,
188 loop=loop)
189 transport, protocol = yield from loop.subprocess_exec(
190 protocol_factory,
Victor Stinner20e07432014-02-11 11:44:56 +0100191 program, *args,
192 stdin=stdin, stdout=stdout,
Victor Stinner915bcb02014-02-01 22:49:59 +0100193 stderr=stderr, **kwds)
194 yield from protocol.waiter
195 return Process(transport, protocol, loop)