blob: d15fb159d90a4c1f715c525c95c931e97caebc40 [file] [log] [blame]
Guido van Rossum0016e1d2013-10-30 14:56:49 -07001import collections
2import subprocess
3
4from . import protocols
5from . import tasks
6from . import transports
7
8
# File descriptor numbers of the three standard streams.  They are used as
# keys into the transport's pipe table and are passed to the pipe protocols.
STDIN, STDOUT, STDERR = 0, 1, 2
12
13
class BaseSubprocessTransport(transports.SubprocessTransport):
    """Platform-independent part of a subprocess transport.

    Subclasses implement _start() to spawn the child; it must store the
    process object in ``self._proc`` because __init__ reads that attribute
    right after calling it.

    Callbacks for the user protocol are queued in ``_pending_calls`` until
    every requested pipe is connected, so that ``connection_made`` is
    always scheduled before any pipe or process event.
    """

    def __init__(self, loop, protocol, args, shell,
                 stdin, stdout, stderr, bufsize,
                 extra=None, **kwargs):
        """Record the requested pipes and start the child process.

        loop: event loop used to schedule callbacks and connect pipes.
        protocol: user protocol receiving the subprocess events.
        args, shell, stdin, stdout, stderr, bufsize, **kwargs: forwarded
            unchanged to _start().
        extra: passed to the base transport constructor.
        """
        super().__init__(extra)
        self._protocol = protocol
        self._loop = loop

        # One entry per stream requested as a pipe.  The value stays None
        # until the matching pipe protocol registers itself.
        self._pipes = {}
        if stdin == subprocess.PIPE:
            self._pipes[STDIN] = None
        if stdout == subprocess.PIPE:
            self._pipes[STDOUT] = None
        if stderr == subprocess.PIPE:
            self._pipes[STDERR] = None
        # Protocol callbacks deferred until connection_made is scheduled;
        # set to None once they have been flushed.
        self._pending_calls = collections.deque()
        self._finished = False
        self._returncode = None
        self._start(args=args, shell=shell, stdin=stdin, stdout=stdout,
                    stderr=stderr, bufsize=bufsize, **kwargs)
        self._extra['subprocess'] = self._proc

    def _start(self, args, shell, stdin, stdout, stderr, bufsize, **kwargs):
        """Spawn the child process and set ``self._proc`` (subclass hook)."""
        raise NotImplementedError

    def _make_write_subprocess_pipe_proto(self, fd):
        """Subclass hook; not called anywhere in this class."""
        raise NotImplementedError

    def _make_read_subprocess_pipe_proto(self, fd):
        """Subclass hook; not called anywhere in this class."""
        raise NotImplementedError

    def close(self):
        """Close the connected pipe transports and terminate the child.

        Pipes that are not connected yet are skipped instead of raising
        AttributeError, so the child is still terminated.
        """
        for proto in self._pipes.values():
            # An entry is None until its pipe protocol is created, and
            # proto.pipe is None until connection_made() has run.
            if proto is None or proto.pipe is None:
                continue
            proto.pipe.close()
        if self._returncode is None:
            try:
                self.terminate()
            except ProcessLookupError:
                # The child is already gone but _process_exited() has not
                # been delivered yet; there is nothing left to terminate.
                pass

    def get_pid(self):
        """Return the process id of the child."""
        return self._proc.pid

    def get_returncode(self):
        """Return the exit status, or None while it is not known yet."""
        return self._returncode

    def get_pipe_transport(self, fd):
        """Return the pipe transport for *fd*.

        Returns None when *fd* was not requested as a pipe, or when its
        pipe protocol has not been created yet.
        """
        proto = self._pipes.get(fd)
        if proto is None:
            return None
        return proto.pipe

    def send_signal(self, signal):
        """Send *signal* to the child process."""
        self._proc.send_signal(signal)

    def terminate(self):
        """Ask the child process to terminate."""
        self._proc.terminate()

    def kill(self):
        """Kill the child process."""
        self._proc.kill()

    @tasks.coroutine
    def _post_init(self):
        """Connect a pipe protocol to each standard stream of the child.

        When no pipes were requested, connection_made is scheduled right
        away; otherwise the last pipe to connect triggers it.
        """
        proc = self._proc
        loop = self._loop
        # The protocols register themselves in self._pipes, so the
        # (transport, protocol) pairs returned here are not needed.
        if proc.stdin is not None:
            yield from loop.connect_write_pipe(
                lambda: WriteSubprocessPipeProto(self, STDIN),
                proc.stdin)
        if proc.stdout is not None:
            yield from loop.connect_read_pipe(
                lambda: ReadSubprocessPipeProto(self, STDOUT),
                proc.stdout)
        if proc.stderr is not None:
            yield from loop.connect_read_pipe(
                lambda: ReadSubprocessPipeProto(self, STDERR),
                proc.stderr)
        if not self._pipes:
            self._try_connected()

    def _call(self, cb, *data):
        """Schedule ``cb(*data)``, queueing it until connection_made."""
        if self._pending_calls is not None:
            self._pending_calls.append((cb, data))
        else:
            self._loop.call_soon(cb, *data)

    def _try_connected(self):
        """Schedule connection_made once every requested pipe is connected.

        The queued callbacks are scheduled after it, in arrival order, and
        queueing is switched off for later calls.
        """
        assert self._pending_calls is not None
        if all(p is not None and p.connected for p in self._pipes.values()):
            self._loop.call_soon(self._protocol.connection_made, self)
            for callback, data in self._pending_calls:
                self._loop.call_soon(callback, *data)
            self._pending_calls = None

    def _pipe_connection_lost(self, fd, exc):
        """Report a lost pipe to the protocol and check for completion."""
        self._call(self._protocol.pipe_connection_lost, fd, exc)
        self._try_finish()

    def _pipe_data_received(self, fd, data):
        """Forward data read from pipe *fd* to the protocol."""
        self._call(self._protocol.pipe_data_received, fd, data)

    def _process_exited(self, returncode):
        """Record the exit status and report the exit to the protocol."""
        assert returncode is not None, returncode
        assert self._returncode is None, self._returncode
        self._returncode = returncode
        self._loop._subprocess_closed(self)
        self._call(self._protocol.process_exited)
        self._try_finish()

    def _try_finish(self):
        """Schedule connection_lost once the child has exited and every
        requested pipe is disconnected."""
        assert not self._finished
        if self._returncode is None:
            return
        if all(p is not None and p.disconnected
               for p in self._pipes.values()):
            self._finished = True
            self._loop.call_soon(self._call_connection_lost, None)

    def _call_connection_lost(self, exc):
        """Call the protocol's connection_lost and drop all references."""
        try:
            self._protocol.connection_lost(exc)
        finally:
            # Cleared even if the protocol callback raises.
            self._proc = None
            self._protocol = None
            self._loop = None
137
138
class WriteSubprocessPipeProto(protocols.BaseProtocol):
    """Protocol for one pipe of a subprocess transport.

    It tracks whether the pipe is connected or disconnected and relays
    both events to the owning subprocess transport.
    """

    # Pipe transport; remains None until connection_made() is called.
    pipe = None

    def __init__(self, proc, fd):
        """Remember the owning transport and register under *fd*.

        proc: the subprocess transport that owns this pipe.
        fd: key under which this protocol is stored in ``proc._pipes``.
        """
        self.connected = False
        self.disconnected = False
        self.fd = fd
        self.proc = proc
        proc._pipes[fd] = self

    def connection_made(self, transport):
        """Store the pipe transport and notify the owning transport."""
        self.pipe = transport
        self.connected = True
        self.proc._try_connected()

    def connection_lost(self, exc):
        """Mark the pipe as disconnected and report it to the owner."""
        self.disconnected = True
        owner = self.proc
        owner._pipe_connection_lost(self.fd, exc)

    def eof_received(self):
        """Ignore end-of-file; nothing is returned."""
        return None
160
161
class ReadSubprocessPipeProto(WriteSubprocessPipeProto,
                              protocols.Protocol):
    """Pipe protocol that also forwards received data to its owner."""

    def data_received(self, data):
        """Hand *data* to the owning transport together with this fd."""
        owner = self.proc
        owner._pipe_data_received(self.fd, data)