blob: 2cd6de6d6fd63fea1e001a0bc4d4f8044acfce0f [file] [log] [blame]
Victor Stinner915bcb02014-02-01 22:49:59 +01001__all__ = ['create_subprocess_exec', 'create_subprocess_shell']
2
3import collections
4import subprocess
5
6from . import events
7from . import futures
8from . import protocols
9from . import streams
10from . import tasks
Victor Stinnerf951d282014-06-29 00:46:45 +020011from .coroutines import coroutine
Victor Stinner915bcb02014-02-01 22:49:59 +010012
13
14PIPE = subprocess.PIPE
15STDOUT = subprocess.STDOUT
16DEVNULL = subprocess.DEVNULL
17
18
19class SubprocessStreamProtocol(streams.FlowControlMixin,
20 protocols.SubprocessProtocol):
21 """Like StreamReaderProtocol, but for a subprocess."""
22
23 def __init__(self, limit, loop):
24 super().__init__(loop=loop)
25 self._limit = limit
26 self.stdin = self.stdout = self.stderr = None
27 self.waiter = futures.Future(loop=loop)
28 self._waiters = collections.deque()
29 self._transport = None
30
31 def connection_made(self, transport):
32 self._transport = transport
33 if transport.get_pipe_transport(1):
34 self.stdout = streams.StreamReader(limit=self._limit,
35 loop=self._loop)
36 if transport.get_pipe_transport(2):
37 self.stderr = streams.StreamReader(limit=self._limit,
38 loop=self._loop)
39 stdin = transport.get_pipe_transport(0)
40 if stdin is not None:
41 self.stdin = streams.StreamWriter(stdin,
42 protocol=self,
43 reader=None,
44 loop=self._loop)
45 self.waiter.set_result(None)
46
47 def pipe_data_received(self, fd, data):
48 if fd == 1:
49 reader = self.stdout
50 elif fd == 2:
51 reader = self.stderr
52 else:
53 reader = None
54 if reader is not None:
55 reader.feed_data(data)
56
57 def pipe_connection_lost(self, fd, exc):
58 if fd == 0:
59 pipe = self.stdin
60 if pipe is not None:
61 pipe.close()
62 self.connection_lost(exc)
63 return
64 if fd == 1:
65 reader = self.stdout
66 elif fd == 2:
67 reader = self.stderr
68 else:
69 reader = None
70 if reader != None:
71 if exc is None:
72 reader.feed_eof()
73 else:
74 reader.set_exception(exc)
75
76 def process_exited(self):
77 # wake up futures waiting for wait()
78 returncode = self._transport.get_returncode()
79 while self._waiters:
80 waiter = self._waiters.popleft()
81 waiter.set_result(returncode)
82
83
84class Process:
85 def __init__(self, transport, protocol, loop):
86 self._transport = transport
87 self._protocol = protocol
88 self._loop = loop
89 self.stdin = protocol.stdin
90 self.stdout = protocol.stdout
91 self.stderr = protocol.stderr
92 self.pid = transport.get_pid()
93
94 @property
95 def returncode(self):
96 return self._transport.get_returncode()
97
Victor Stinnerf951d282014-06-29 00:46:45 +020098 @coroutine
Victor Stinner915bcb02014-02-01 22:49:59 +010099 def wait(self):
100 """Wait until the process exit and return the process return code."""
101 returncode = self._transport.get_returncode()
102 if returncode is not None:
103 return returncode
104
105 waiter = futures.Future(loop=self._loop)
106 self._protocol._waiters.append(waiter)
107 yield from waiter
108 return waiter.result()
109
Victor Stinner915bcb02014-02-01 22:49:59 +0100110 def _check_alive(self):
111 if self._transport.get_returncode() is not None:
112 raise ProcessLookupError()
113
114 def send_signal(self, signal):
115 self._check_alive()
116 self._transport.send_signal(signal)
117
118 def terminate(self):
119 self._check_alive()
120 self._transport.terminate()
121
122 def kill(self):
123 self._check_alive()
124 self._transport.kill()
125
Victor Stinnerf951d282014-06-29 00:46:45 +0200126 @coroutine
Victor Stinner915bcb02014-02-01 22:49:59 +0100127 def _feed_stdin(self, input):
128 self.stdin.write(input)
129 yield from self.stdin.drain()
130 self.stdin.close()
131
Victor Stinnerf951d282014-06-29 00:46:45 +0200132 @coroutine
Victor Stinner915bcb02014-02-01 22:49:59 +0100133 def _noop(self):
134 return None
135
Victor Stinnerf951d282014-06-29 00:46:45 +0200136 @coroutine
Victor Stinner915bcb02014-02-01 22:49:59 +0100137 def _read_stream(self, fd):
138 transport = self._transport.get_pipe_transport(fd)
139 if fd == 2:
140 stream = self.stderr
141 else:
142 assert fd == 1
143 stream = self.stdout
144 output = yield from stream.read()
145 transport.close()
146 return output
147
Victor Stinnerf951d282014-06-29 00:46:45 +0200148 @coroutine
Victor Stinner915bcb02014-02-01 22:49:59 +0100149 def communicate(self, input=None):
Victor Stinner915bcb02014-02-01 22:49:59 +0100150 if input:
151 stdin = self._feed_stdin(input)
152 else:
153 stdin = self._noop()
154 if self.stdout is not None:
155 stdout = self._read_stream(1)
156 else:
157 stdout = self._noop()
158 if self.stderr is not None:
159 stderr = self._read_stream(2)
160 else:
161 stderr = self._noop()
162 stdin, stdout, stderr = yield from tasks.gather(stdin, stdout, stderr,
Victor Stinner8d0230b2014-02-20 10:12:59 +0100163 loop=self._loop)
Victor Stinner915bcb02014-02-01 22:49:59 +0100164 yield from self.wait()
165 return (stdout, stderr)
166
167
Victor Stinnerf951d282014-06-29 00:46:45 +0200168@coroutine
Victor Stinner915bcb02014-02-01 22:49:59 +0100169def create_subprocess_shell(cmd, stdin=None, stdout=None, stderr=None,
170 loop=None, limit=streams._DEFAULT_LIMIT, **kwds):
171 if loop is None:
172 loop = events.get_event_loop()
173 protocol_factory = lambda: SubprocessStreamProtocol(limit=limit,
174 loop=loop)
175 transport, protocol = yield from loop.subprocess_shell(
176 protocol_factory,
177 cmd, stdin=stdin, stdout=stdout,
178 stderr=stderr, **kwds)
179 yield from protocol.waiter
180 return Process(transport, protocol, loop)
181
Victor Stinnerf951d282014-06-29 00:46:45 +0200182@coroutine
Yury Selivanov57797522014-02-18 22:56:15 -0500183def create_subprocess_exec(program, *args, stdin=None, stdout=None,
184 stderr=None, loop=None,
185 limit=streams._DEFAULT_LIMIT, **kwds):
Victor Stinner915bcb02014-02-01 22:49:59 +0100186 if loop is None:
187 loop = events.get_event_loop()
188 protocol_factory = lambda: SubprocessStreamProtocol(limit=limit,
189 loop=loop)
190 transport, protocol = yield from loop.subprocess_exec(
191 protocol_factory,
Victor Stinner20e07432014-02-11 11:44:56 +0100192 program, *args,
193 stdin=stdin, stdout=stdout,
Victor Stinner915bcb02014-02-01 22:49:59 +0100194 stderr=stderr, **kwds)
195 yield from protocol.waiter
196 return Process(transport, protocol, loop)