/*
 * Copyright (C) 2013 Google Inc.
 *
 * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
 * in compliance with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software distributed under the License
 * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
 * or implied. See the License for the specific language governing permissions and limitations under
 * the License.
 */

package com.google.caliper.runner;

import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkState;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotSame;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;

import com.google.caliper.bridge.LogMessage;
import com.google.caliper.bridge.OpenedSocket;
import com.google.caliper.runner.FakeWorkers.DummyLogMessage;
import com.google.caliper.runner.StreamService.StreamItem;
import com.google.caliper.runner.StreamService.StreamItem.Kind;
import com.google.caliper.util.Parser;
import com.google.common.collect.Sets;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListenableFutureTask;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.common.util.concurrent.Service.Listener;
import com.google.common.util.concurrent.Service.State;

import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;

import java.io.File;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.PrintWriter;
import java.io.StringWriter;
import java.net.ServerSocket;
import java.net.SocketException;
import java.text.ParseException;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

57/**
58 * Tests for {@link StreamService}.
59 */
60@RunWith(JUnit4.class)
61
62public class StreamServiceTest {
63
64 private ServerSocket serverSocket;
65 private final StringWriter writer = new StringWriter();
66 private final PrintWriter stdout = new PrintWriter(writer, true);
67 private final Parser<LogMessage> parser = new Parser<LogMessage>() {
68 @Override public LogMessage parse(final CharSequence text) throws ParseException {
69 return new DummyLogMessage(text.toString());
70 }
71 };
72
73 private StreamService service;
74 private final CountDownLatch terminalLatch = new CountDownLatch(1);
75 private static final int TRIAL_NUMBER = 3;
76
77 @Before public void setUp() throws IOException {
78 serverSocket = new ServerSocket(0);
79 }
80
81 @After public void closeSocket() throws IOException {
82 serverSocket.close();
83 }
84
85 @After public void stopService() {
86 if (service != null && service.state() != State.FAILED && service.state() != State.TERMINATED) {
87 service.stopAsync().awaitTerminated();
88 }
89 }
90
91 @Test public void testReadOutput() throws Exception {
92 makeService(FakeWorkers.PrintClient.class, "foo", "bar");
93 service.startAsync().awaitRunning();
94 StreamItem item1 = readItem();
95 assertEquals(Kind.DATA, item1.kind());
96 Set<String> lines = Sets.newHashSet();
97 lines.add(item1.content().toString());
98 StreamItem item2 = readItem();
99 assertEquals(Kind.DATA, item2.kind());
100 lines.add(item2.content().toString());
101 assertEquals(Sets.newHashSet("foo", "bar"), lines);
102 assertEquals(State.RUNNING, service.state());
103 StreamItem item3 = readItem();
104 assertEquals(Kind.EOF, item3.kind());
105 awaitStopped(100, TimeUnit.MILLISECONDS);
106 assertTerminated();
107 }
108
109 @Test public void failingProcess() throws Exception {
110 makeService(FakeWorkers.Exit.class, "1");
111 service.startAsync().awaitRunning();
112 assertEquals(Kind.EOF, readItem().kind());
113 awaitStopped(100, TimeUnit.MILLISECONDS);
114 assertEquals(State.FAILED, service.state());
115 }
116
117 @Test public void processDoesntExit() throws Exception {
118 // close all fds and then sleep
119 makeService(FakeWorkers.CloseAndSleep.class);
120 service.startAsync().awaitRunning();
121 assertEquals(Kind.EOF, readItem().kind());
122 awaitStopped(200, TimeUnit.MILLISECONDS); // we
123 assertEquals(State.FAILED, service.state());
124 }
125
126 @Test public void testSocketInputOutput() throws Exception {
127 int localport = serverSocket.getLocalPort();
128 // read from the socket and echo it back
129 makeService(FakeWorkers.SocketEchoClient.class, Integer.toString(localport));
130
131 service.startAsync().awaitRunning();
132 assertEquals(new DummyLogMessage("start"), readItem().content());
133 service.sendMessage(new DummyLogMessage("hello socket world"));
134 assertEquals(new DummyLogMessage("hello socket world"), readItem().content());
135 service.closeWriter();
136 assertEquals(State.RUNNING, service.state());
137 StreamItem nextItem = readItem();
138 assertEquals("Expected EOF " + nextItem, Kind.EOF, nextItem.kind());
139 awaitStopped(100, TimeUnit.MILLISECONDS);
140 assertTerminated();
141 }
142
143 @Test public void testSocketClosesBeforeProcess() throws Exception {
144 int localport = serverSocket.getLocalPort();
145 // read from the socket and echo it back
146 makeService(FakeWorkers.SocketEchoClient.class, Integer.toString(localport), "foo");
147 service.startAsync().awaitRunning();
148 assertEquals(new DummyLogMessage("start"), readItem().content());
149 service.sendMessage(new DummyLogMessage("hello socket world"));
150 assertEquals(new DummyLogMessage("hello socket world"), readItem().content());
151 service.closeWriter();
152
153 assertEquals("foo", readItem().content().toString());
154
155 assertEquals(State.RUNNING, service.state());
156 assertEquals(Kind.EOF, readItem().kind());
157 awaitStopped(100, TimeUnit.MILLISECONDS);
158 assertTerminated();
159 }
160
161 @Test public void failsToAcceptConnection() throws Exception {
162 serverSocket.close(); // This will force serverSocket.accept to throw a SocketException
163 makeService(FakeWorkers.Sleeper.class, Long.toString(TimeUnit.MINUTES.toMillis(10)));
164 try {
165 service.startAsync().awaitRunning();
166 fail();
167 } catch (IllegalStateException expected) {}
168 assertEquals(SocketException.class, service.failureCause().getClass());
169 }
170
171 /** Reads an item, asserting that there was no timeout. */
172 private StreamItem readItem() throws InterruptedException {
173 StreamItem item = service.readItem(10, TimeUnit.SECONDS);
174 assertNotSame("Timed out while reading item from worker", Kind.TIMEOUT, item.kind());
175 return item;
176 }
177
178 /**
179 * Wait for the service to reach a terminal state without calling stop.
180 */
181 private void awaitStopped(long time, TimeUnit unit) throws InterruptedException {
182 assertTrue(terminalLatch.await(time, unit));
183 }
184
185 private void assertTerminated() {
186 State state = service.state();
187 if (state != State.TERMINATED) {
188 if (state == State.FAILED) {
189 throw new AssertionError(service.failureCause());
190 }
191 fail("Expected service to be terminated but was: " + state);
192 }
193 }
194
195 @SuppressWarnings("resource")
196 private void makeService(Class<?> main, String ...args) {
197 checkState(service == null, "You can only make one StreamService per test");
198 UUID trialId = UUID.randomUUID();
199 TrialOutputLogger trialOutput = new TrialOutputLogger(new TrialOutputFactory() {
200 @Override public FileAndWriter getTrialOutputFile(int trialNumber)
201 throws FileNotFoundException {
202 checkArgument(trialNumber == TRIAL_NUMBER);
203 return new FileAndWriter(new File("/tmp/not-a-file"), stdout);
204 }
205
206 @Override public void persistFile(File f) {
207 throw new UnsupportedOperationException();
208 }
209
210 }, TRIAL_NUMBER, trialId, null /* experiment */);
211 try {
212 // normally the TrialRunLoop opens/closes the logger
213 trialOutput.open();
214 } catch (IOException e) {
215 throw new RuntimeException(e);
216 }
217 service = new StreamService(
218 new WorkerProcess(FakeWorkers.createProcessBuilder(main, args),
219 trialId,
220 getSocketFuture(),
221 new RuntimeShutdownHookRegistrar()),
222 parser,
223 trialOutput);
224 service.addListener(new Listener() {
225 @Override public void starting() {}
226 @Override public void running() {}
227 @Override public void stopping(State from) {}
228 @Override public void terminated(State from) {
229 terminalLatch.countDown();
230 }
231 @Override public void failed(State from, Throwable failure) {
232 terminalLatch.countDown();
233 }
234 }, MoreExecutors.directExecutor());
235 }
236
237 private ListenableFuture<OpenedSocket> getSocketFuture() {
238 ListenableFutureTask<OpenedSocket> openSocketTask = ListenableFutureTask.create(
239 new Callable<OpenedSocket>() {
240 @Override
241 public OpenedSocket call() throws Exception {
242 return OpenedSocket.fromSocket(serverSocket.accept());
243 }
244 });
245 // N.B. this thread will block on serverSocket.accept until a connection is accepted or the
246 // socket is closed, so no matter what this thread will die with the test.
247 Thread opener = new Thread(openSocketTask, "SocketOpener");
248 opener.setDaemon(true);
249 opener.start();
250 return openSocketTask;
251 }
252}