blob: cc4f8cb7592d580f27a1fbdd473cccade7d4834b [file] [log] [blame]
Guido van Rossum0016e1d2013-10-30 14:56:49 -07001import collections
2import subprocess
3
4from . import protocols
5from . import tasks
6from . import transports
7
8
9STDIN = 0
10STDOUT = 1
11STDERR = 2
12
13
14class BaseSubprocessTransport(transports.SubprocessTransport):
15
16 def __init__(self, loop, protocol, args, shell,
17 stdin, stdout, stderr, bufsize,
18 extra=None, **kwargs):
19 super().__init__(extra)
20 self._protocol = protocol
21 self._loop = loop
22
23 self._pipes = {}
24 if stdin == subprocess.PIPE:
25 self._pipes[STDIN] = None
26 if stdout == subprocess.PIPE:
27 self._pipes[STDOUT] = None
28 if stderr == subprocess.PIPE:
29 self._pipes[STDERR] = None
30 self._pending_calls = collections.deque()
31 self._finished = False
32 self._returncode = None
33 self._start(args=args, shell=shell, stdin=stdin, stdout=stdout,
34 stderr=stderr, bufsize=bufsize, **kwargs)
35 self._extra['subprocess'] = self._proc
36
37 def _start(self, args, shell, stdin, stdout, stderr, bufsize, **kwargs):
38 raise NotImplementedError
39
40 def _make_write_subprocess_pipe_proto(self, fd):
41 raise NotImplementedError
42
43 def _make_read_subprocess_pipe_proto(self, fd):
44 raise NotImplementedError
45
46 def close(self):
47 for proto in self._pipes.values():
48 proto.pipe.close()
49 if self._returncode is None:
50 self.terminate()
51
52 def get_pid(self):
53 return self._proc.pid
54
55 def get_returncode(self):
56 return self._returncode
57
58 def get_pipe_transport(self, fd):
59 if fd in self._pipes:
60 return self._pipes[fd].pipe
61 else:
62 return None
63
64 def send_signal(self, signal):
65 self._proc.send_signal(signal)
66
67 def terminate(self):
68 self._proc.terminate()
69
70 def kill(self):
71 self._proc.kill()
72
73 @tasks.coroutine
74 def _post_init(self):
75 proc = self._proc
76 loop = self._loop
77 if proc.stdin is not None:
Victor Stinneraaabc4f2014-01-29 14:22:56 -080078 _, pipe = yield from loop.connect_write_pipe(
Guido van Rossum0016e1d2013-10-30 14:56:49 -070079 lambda: WriteSubprocessPipeProto(self, STDIN),
80 proc.stdin)
Victor Stinneraaabc4f2014-01-29 14:22:56 -080081 self._pipes[STDIN] = pipe
Guido van Rossum0016e1d2013-10-30 14:56:49 -070082 if proc.stdout is not None:
Victor Stinneraaabc4f2014-01-29 14:22:56 -080083 _, pipe = yield from loop.connect_read_pipe(
Guido van Rossum0016e1d2013-10-30 14:56:49 -070084 lambda: ReadSubprocessPipeProto(self, STDOUT),
85 proc.stdout)
Victor Stinneraaabc4f2014-01-29 14:22:56 -080086 self._pipes[STDOUT] = pipe
Guido van Rossum0016e1d2013-10-30 14:56:49 -070087 if proc.stderr is not None:
Victor Stinneraaabc4f2014-01-29 14:22:56 -080088 _, pipe = yield from loop.connect_read_pipe(
Guido van Rossum0016e1d2013-10-30 14:56:49 -070089 lambda: ReadSubprocessPipeProto(self, STDERR),
90 proc.stderr)
Victor Stinneraaabc4f2014-01-29 14:22:56 -080091 self._pipes[STDERR] = pipe
92
93 assert self._pending_calls is not None
94
95 self._loop.call_soon(self._protocol.connection_made, self)
96 for callback, data in self._pending_calls:
97 self._loop.call_soon(callback, *data)
98 self._pending_calls = None
Guido van Rossum0016e1d2013-10-30 14:56:49 -070099
100 def _call(self, cb, *data):
101 if self._pending_calls is not None:
102 self._pending_calls.append((cb, data))
103 else:
104 self._loop.call_soon(cb, *data)
105
Guido van Rossum0016e1d2013-10-30 14:56:49 -0700106 def _pipe_connection_lost(self, fd, exc):
107 self._call(self._protocol.pipe_connection_lost, fd, exc)
108 self._try_finish()
109
110 def _pipe_data_received(self, fd, data):
111 self._call(self._protocol.pipe_data_received, fd, data)
112
113 def _process_exited(self, returncode):
114 assert returncode is not None, returncode
115 assert self._returncode is None, self._returncode
116 self._returncode = returncode
Guido van Rossum0016e1d2013-10-30 14:56:49 -0700117 self._call(self._protocol.process_exited)
118 self._try_finish()
119
120 def _try_finish(self):
121 assert not self._finished
122 if self._returncode is None:
123 return
124 if all(p is not None and p.disconnected
125 for p in self._pipes.values()):
126 self._finished = True
127 self._loop.call_soon(self._call_connection_lost, None)
128
129 def _call_connection_lost(self, exc):
130 try:
131 self._protocol.connection_lost(exc)
132 finally:
133 self._proc = None
134 self._protocol = None
135 self._loop = None
136
137
138class WriteSubprocessPipeProto(protocols.BaseProtocol):
Guido van Rossum0016e1d2013-10-30 14:56:49 -0700139
140 def __init__(self, proc, fd):
141 self.proc = proc
142 self.fd = fd
Victor Stinneraaabc4f2014-01-29 14:22:56 -0800143 self.pipe = None
Guido van Rossum0016e1d2013-10-30 14:56:49 -0700144 self.disconnected = False
Guido van Rossum0016e1d2013-10-30 14:56:49 -0700145
146 def connection_made(self, transport):
Guido van Rossum0016e1d2013-10-30 14:56:49 -0700147 self.pipe = transport
Guido van Rossum0016e1d2013-10-30 14:56:49 -0700148
149 def connection_lost(self, exc):
150 self.disconnected = True
151 self.proc._pipe_connection_lost(self.fd, exc)
152
153 def eof_received(self):
154 pass
155
156
157class ReadSubprocessPipeProto(WriteSubprocessPipeProto,
158 protocols.Protocol):
159
160 def data_received(self, data):
161 self.proc._pipe_data_received(self.fd, data)