blob: c17c5c9a5ef9c703ac33877d7be84839d71d7eb5 [file] [log] [blame]
Shinichiro Hamajicedc5c82015-05-13 17:03:20 +09001package main
2
3import (
4 "bufio"
5 "encoding/binary"
6 "fmt"
7 "io"
8 "os/exec"
9 "path/filepath"
10)
11
// btoi converts a bool to an int: 1 for true, 0 for false.
func btoi(b bool) int {
	if b {
		return 1
	}
	return 0
}
19
// sendMsg writes all of data to w, retrying after partial writes.
//
// An io.EOF returned by w stops the transfer silently; any other write
// error causes a panic. A Write call that reports no error yet makes no
// progress panics with io.ErrShortWrite, so a misbehaving writer cannot
// make this loop spin forever.
func sendMsg(w io.Writer, data []byte) {
	for len(data) > 0 {
		n, err := w.Write(data)
		if err == io.EOF {
			return
		}
		if err != nil {
			panic(err)
		}
		if n <= 0 {
			// No bytes accepted and no error reported: retrying would
			// loop indefinitely on the same data.
			panic(io.ErrShortWrite)
		}
		data = data[n:]
	}
}
32
// sendInt writes i to w as a 4-byte little-endian signed integer.
//
// It panics if i cannot be represented in 32 bits, since a truncated
// value would silently corrupt the stream. Write errors are handled the
// same way sendMsg handles them: io.EOF is ignored, anything else panics.
func sendInt(w io.Writer, i int) {
	v := int32(i)
	if int(v) != i {
		panic(fmt.Errorf("sendInt: value %d does not fit in 32 bits", i))
	}
	if err := binary.Write(w, binary.LittleEndian, v); err != nil && err != io.EOF {
		panic(err)
	}
}
37
38func sendString(w io.Writer, s string) {
39 sendInt(w, len(s))
40 sendMsg(w, []byte(s))
41}
42
43func sendRunners(w io.Writer, runners []runner) {
44 sendInt(w, len(runners))
45 for _, r := range runners {
46 sendString(w, r.output)
47 sendString(w, r.cmd)
48 sendString(w, r.shell)
49 sendInt(w, btoi(r.echo))
50 sendInt(w, btoi(r.ignoreError))
51 }
52}
53
// ParaResult is one result record decoded by recvResult. Fields are
// listed in the order they arrive on the wire.
type ParaResult struct {
	// output, stdout and stderr are received as length-prefixed strings.
	// NOTE(review): output presumably matches the runner's output field
	// that was sent for the job; confirm against the para helper.
	output, stdout, stderr string

	// status and signal are received as 32-bit integers.
	// NOTE(review): presumably the exit status and the terminating
	// signal of the executed command; confirm against the para helper.
	status, signal int
}
61
// recvInt reads a 4-byte little-endian signed integer from r.
//
// On failure it returns 0 together with the read error: io.EOF when the
// stream ends before any byte is read, io.ErrUnexpectedEOF when it ends
// part-way through the four bytes.
func recvInt(r *bufio.Reader) (int, error) {
	var raw [4]byte
	if _, err := io.ReadFull(r, raw[:]); err != nil {
		return 0, err
	}
	decoded := int32(binary.LittleEndian.Uint32(raw[:]))
	return int(decoded), nil
}
67
68func recvString(r *bufio.Reader) (string, error) {
69 l, err := recvInt(r)
70 if err != nil {
71 return "", err
72 }
73 buf := make([]byte, l)
74 read := 0
75 for read < len(buf) {
76 r, err := r.Read(buf[read:])
77 if err != nil {
78 return "", err
79 }
80 read += r
81 }
82 return string(buf), nil
83}
84
85func recvResult(r *bufio.Reader) (*ParaResult, error) {
86 output, err := recvString(r)
87 if err != nil {
88 return nil, err
89 }
90 stdout, err := recvString(r)
91 if err != nil {
92 return nil, err
93 }
94 stderr, err := recvString(r)
95 if err != nil {
96 return nil, err
97 }
98 status, err := recvInt(r)
99 if err != nil {
100 return nil, err
101 }
Shinichiro Hamajia6808422015-05-13 18:00:50 +0900102 signal, err := recvInt(r)
103 if err != nil {
104 return nil, err
105 }
Shinichiro Hamajicedc5c82015-05-13 17:03:20 +0900106 return &ParaResult{
107 output: output,
108 stdout: stdout,
109 stderr: stderr,
110 status: status,
Shinichiro Hamajia6808422015-05-13 18:00:50 +0900111 signal: signal,
Shinichiro Hamajicedc5c82015-05-13 17:03:20 +0900112 }, nil
113}
114
115type ParaWorker struct {
116 para *exec.Cmd
117 paraChan chan *ParaResult
118 stdin io.WriteCloser
119 stdout *bufio.Reader
120 doneChan chan bool
121}
122
123func NewParaWorker(paraChan chan *ParaResult) *ParaWorker {
124 bin := filepath.Join(katiDir, "para")
125 para := exec.Command(bin, fmt.Sprintf("-j%d", jobsFlag), "--kati")
126 stdin, err := para.StdinPipe()
127 if err != nil {
128 panic(err)
129 }
130 stdout, err := para.StdoutPipe()
131 if err != nil {
132 panic(err)
133 }
134 err = para.Start()
135 if err != nil {
136 panic(err)
137 }
138 return &ParaWorker{
139 para: para,
140 paraChan: paraChan,
141 stdin: stdin,
142 stdout: bufio.NewReader(stdout),
143 doneChan: make(chan bool),
144 }
145}
146
147func (para *ParaWorker) Run() {
148 for {
149 r, err := recvResult(para.stdout)
150 if err == io.EOF {
151 break
152 }
153 if err != nil {
154 panic(err)
155 }
156 para.paraChan <- r
157 }
158 para.para.Process.Kill()
159 para.para.Process.Wait()
160 para.doneChan <- true
161}
162
163func (para *ParaWorker) Wait() {
Shinichiro Hamajicedc5c82015-05-13 17:03:20 +0900164 para.stdin.Close()
165 <-para.doneChan
166}
167
168func (para *ParaWorker) RunCommand(runners []runner) {
169 sendRunners(para.stdin, runners)
170}