blob: b78f816d4c42a8d78614a96c75284ff452f5e158 [file] [log] [blame]
Guido van Rossum0016e1d2013-10-30 14:56:49 -07001import collections
2import subprocess
3
4from . import protocols
5from . import tasks
6from . import transports
7
8
class BaseSubprocessTransport(transports.SubprocessTransport):
    """Common machinery for subprocess transports.

    Subclasses supply ``_start()``, which must create the child process and
    store it in ``self._proc``.  This class tracks the pipes that were
    requested with ``subprocess.PIPE``, forwards pipe and process events to
    the user protocol, and reports ``connection_lost()`` once the process
    has exited and every tracked pipe has been disconnected.
    """

    def __init__(self, loop, protocol, args, shell,
                 stdin, stdout, stderr, bufsize,
                 extra=None, **kwargs):
        """Record the requested pipes and start the child via ``_start()``.

        ``args``, ``shell``, ``stdin``, ``stdout``, ``stderr``, ``bufsize``
        and any extra keyword arguments are passed to ``_start()`` unchanged.
        """
        super().__init__(extra)
        self._protocol = protocol
        self._loop = loop

        # Maps fd number (0, 1, 2) to the pipe protocol object.  A slot is
        # created with None for every stream requested as a PIPE and is
        # filled in by _post_init() once the pipe is connected.
        self._pipes = {}
        if stdin == subprocess.PIPE:
            self._pipes[0] = None
        if stdout == subprocess.PIPE:
            self._pipes[1] = None
        if stderr == subprocess.PIPE:
            self._pipes[2] = None
        # Protocol callbacks queued by _call() until _post_init() has
        # scheduled connection_made(); set to None afterwards.
        self._pending_calls = collections.deque()
        self._finished = False
        self._returncode = None
        self._start(args=args, shell=shell, stdin=stdin, stdout=stdout,
                    stderr=stderr, bufsize=bufsize, **kwargs)
        self._extra['subprocess'] = self._proc

    def _start(self, args, shell, stdin, stdout, stderr, bufsize, **kwargs):
        """Spawn the child process and store it in ``self._proc``.

        Must be overridden by subclasses.
        """
        raise NotImplementedError

    def _make_write_subprocess_pipe_proto(self, fd):
        """Hook for subclasses; not implemented in the base class."""
        raise NotImplementedError

    def _make_read_subprocess_pipe_proto(self, fd):
        """Hook for subclasses; not implemented in the base class."""
        raise NotImplementedError

    def close(self):
        """Close every connected pipe and terminate a still-running child."""
        for proto in self._pipes.values():
            if proto is None:
                # The pipe was requested but _post_init() has not connected
                # it yet, so there is no pipe transport to close.
                continue
            proto.pipe.close()
        if self._returncode is None:
            self.terminate()

    def get_pid(self):
        """Return the pid of the child process."""
        return self._proc.pid

    def get_returncode(self):
        """Return the exit code, or None if no exit has been recorded yet."""
        return self._returncode

    def get_pipe_transport(self, fd):
        """Return the pipe transport for ``fd``.

        Returns None when ``fd`` is not a tracked pipe or when the pipe has
        not been connected yet.
        """
        proto = self._pipes.get(fd)
        if proto is None:
            return None
        return proto.pipe

    def send_signal(self, signal):
        """Forward ``signal`` to the child process object."""
        self._proc.send_signal(signal)

    def terminate(self):
        """Call ``terminate()`` on the child process object."""
        self._proc.terminate()

    def kill(self):
        """Call ``kill()`` on the child process object."""
        self._proc.kill()

    @tasks.coroutine
    def _post_init(self):
        """Connect the child's pipes, then deliver queued protocol callbacks.

        For each of stdin/stdout/stderr that the process object exposes, a
        pipe protocol is connected through the event loop and stored in
        ``self._pipes``.  Afterwards ``connection_made()`` is scheduled,
        followed by every callback that was queued in the meantime.
        """
        proc = self._proc
        loop = self._loop
        if proc.stdin is not None:
            _, pipe = yield from loop.connect_write_pipe(
                lambda: WriteSubprocessPipeProto(self, 0),
                proc.stdin)
            self._pipes[0] = pipe
        if proc.stdout is not None:
            _, pipe = yield from loop.connect_read_pipe(
                lambda: ReadSubprocessPipeProto(self, 1),
                proc.stdout)
            self._pipes[1] = pipe
        if proc.stderr is not None:
            _, pipe = yield from loop.connect_read_pipe(
                lambda: ReadSubprocessPipeProto(self, 2),
                proc.stderr)
            self._pipes[2] = pipe

        assert self._pending_calls is not None

        # connection_made() is scheduled first so the queued callbacks are
        # scheduled after it.
        self._loop.call_soon(self._protocol.connection_made, self)
        for callback, data in self._pending_calls:
            self._loop.call_soon(callback, *data)
        # From now on _call() schedules callbacks directly.
        self._pending_calls = None

    def _call(self, cb, *data):
        """Schedule ``cb(*data)``, queueing it while ``_post_init()`` is pending."""
        if self._pending_calls is not None:
            self._pending_calls.append((cb, data))
        else:
            self._loop.call_soon(cb, *data)

    def _pipe_connection_lost(self, fd, exc):
        """Report a lost pipe to the protocol and check for completion."""
        self._call(self._protocol.pipe_connection_lost, fd, exc)
        self._try_finish()

    def _pipe_data_received(self, fd, data):
        """Forward data read from pipe ``fd`` to the protocol."""
        self._call(self._protocol.pipe_data_received, fd, data)

    def _process_exited(self, returncode):
        """Record the exit code, notify the protocol, check for completion."""
        assert returncode is not None, returncode
        assert self._returncode is None, self._returncode
        self._returncode = returncode
        self._call(self._protocol.process_exited)
        self._try_finish()

    def _try_finish(self):
        """Schedule ``connection_lost`` once the process and all pipes are done."""
        assert not self._finished
        if self._returncode is None:
            return
        # A None entry is a pipe that has not been connected yet; it counts
        # as "not finished".
        if all(p is not None and p.disconnected
               for p in self._pipes.values()):
            self._finished = True
            self._loop.call_soon(self._call_connection_lost, None)

    def _call_connection_lost(self, exc):
        """Notify the protocol, then drop the process/protocol/loop references."""
        try:
            self._protocol.connection_lost(exc)
        finally:
            self._proc = None
            self._protocol = None
            self._loop = None
131
132
class WriteSubprocessPipeProto(protocols.BaseProtocol):
    """Protocol attached to one pipe of a subprocess transport.

    Attributes:
        proc: the owning subprocess transport that receives the events.
        fd: the file-descriptor number this pipe stands for.
        pipe: the pipe transport, set by ``connection_made()``.
        disconnected: True once ``connection_lost()`` has been called.
    """

    def __init__(self, proc, fd):
        self.proc, self.fd = proc, fd
        self.pipe, self.disconnected = None, False

    def connection_made(self, transport):
        """Remember the pipe transport that was just connected."""
        self.pipe = transport

    def connection_lost(self, exc):
        """Mark the pipe as disconnected and tell the owning transport."""
        # The flag is set first: the owner's completion check reads it.
        self.disconnected = True
        owner = self.proc
        owner._pipe_connection_lost(self.fd, exc)

    def pause_writing(self):
        """Relay flow control to the protocol of the owning transport."""
        user_protocol = self.proc._protocol
        user_protocol.pause_writing()

    def resume_writing(self):
        """Relay flow control to the protocol of the owning transport."""
        user_protocol = self.proc._protocol
        user_protocol.resume_writing()
Guido van Rossum0016e1d2013-10-30 14:56:49 -0700153
154
class ReadSubprocessPipeProto(WriteSubprocessPipeProto,
                              protocols.Protocol):
    """Pipe protocol that also relays incoming data to the owning transport."""

    def data_received(self, data):
        """Hand ``data`` to the owning transport, tagged with this pipe's fd."""
        owner = self.proc
        owner._pipe_data_received(self.fd, data)