blob: a5852d0e6bf0e523bbdc27375e8fa0be15be52e3 [file] [log] [blame]
Paul Duffine2363012015-11-30 16:20:41 +00001/*
2 * Copyright (C) 2013 Google Inc.
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
5 * in compliance with the License. You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software distributed under the License
10 * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
11 * or implied. See the License for the specific language governing permissions and limitations under
12 * the License.
13 */
14
15package com.google.caliper.runner;
16
17import static com.google.common.base.Preconditions.checkNotNull;
18import static com.google.common.base.Preconditions.checkState;
19
20import com.google.caliper.bridge.LogMessage;
21import com.google.caliper.bridge.OpenedSocket;
22import com.google.caliper.bridge.StopMeasurementLogMessage;
23import com.google.caliper.model.Measurement;
24import com.google.caliper.runner.StreamService.StreamItem.Kind;
25import com.google.caliper.util.Parser;
26import com.google.common.base.MoreObjects;
27import com.google.common.base.MoreObjects.ToStringHelper;
28import com.google.common.collect.Queues;
29import com.google.common.io.Closeables;
30import com.google.common.io.LineReader;
31import com.google.common.util.concurrent.AbstractService;
32import com.google.common.util.concurrent.ListenableFuture;
33import com.google.common.util.concurrent.ListeningExecutorService;
34import com.google.common.util.concurrent.MoreExecutors;
35import com.google.common.util.concurrent.Service; // for javadoc
36import com.google.common.util.concurrent.Service.State; // for javadoc
37import com.google.common.util.concurrent.ThreadFactoryBuilder;
38import com.google.common.util.concurrent.Uninterruptibles;
39
40import java.io.IOException;
41import java.io.InputStreamReader;
42import java.io.Reader;
43import java.io.Serializable;
44import java.nio.charset.Charset;
45import java.text.ParseException;
46import java.util.concurrent.BlockingQueue;
47import java.util.concurrent.Callable;
48import java.util.concurrent.ExecutionException;
49import java.util.concurrent.Executors;
50import java.util.concurrent.TimeUnit;
51import java.util.concurrent.atomic.AtomicInteger;
52import java.util.logging.Logger;
53
54import javax.annotation.Nullable;
55import javax.inject.Inject;
56
/**
 * A {@link Service} that establishes a connection over a socket to a process and then allows
 * multiplexed access to the process's line-oriented output over the socket and the standard
 * process streams (stdout and stderr) as well as allowing data to be written over the socket.
 *
 * <p>The {@linkplain State states} of this service are as follows:
 * <ul>
 *   <li>{@linkplain State#NEW NEW} : Idle state, no reading or writing is allowed.
 *   <li>{@linkplain State#STARTING STARTING} : Streams are being opened.
 *   <li>{@linkplain State#RUNNING RUNNING} : At least one stream is still open or the writer has
 *       not been closed yet.
 *   <li>{@linkplain State#STOPPING STOPPING} : All streams have closed but some threads may still
 *       be running.
 *   <li>{@linkplain State#TERMINATED TERMINATED} : Idle state, all streams are closed.
 *   <li>{@linkplain State#FAILED FAILED} : The service will transition to failed if it encounters
 *       any errors while reading from or writing to the streams. Service failure will also cause
 *       the worker process to be forcibly shut down, and {@link #readItem(long, TimeUnit)},
 *       {@link #closeWriter()} and {@link #sendMessage(Serializable)} will start throwing
 *       IllegalStateExceptions.
 * </ul>
 */
78@TrialScoped final class StreamService extends AbstractService {
79 /** How long to wait for a process that should be exiting to actually exit. */
80 private static final int SHUTDOWN_WAIT_MILLIS = 10;
81
82 private static final Logger logger = Logger.getLogger(StreamService.class.getName());
83 private static final StreamItem TIMEOUT_ITEM = new StreamItem(Kind.TIMEOUT, null);
84
85 /** The final item that will be sent down the stream. */
86 static final StreamItem EOF_ITEM = new StreamItem(Kind.EOF, null);
87
88 private final ListeningExecutorService streamExecutor = MoreExecutors.listeningDecorator(
89 Executors.newCachedThreadPool(new ThreadFactoryBuilder().setDaemon(true).build()));
90 private final BlockingQueue<StreamItem> outputQueue = Queues.newLinkedBlockingQueue();
91 private final WorkerProcess worker;
92 private volatile Process process;
93 private final Parser<LogMessage> logMessageParser;
94 private final TrialOutputLogger trialOutput;
95
96 /**
97 * This represents the number of open streams from the users perspective. i.e. can you still
98 * write to the socket and read items.
99 *
100 * <p>This is decremented when either the socket is closed for writing or the EOF_ITEM has been
101 * read by the user.
102 */
103 private final AtomicInteger openStreams = new AtomicInteger();
104
105 /**
106 * Used to track how many read streams are open so we can correctly set the EOF_ITEM onto the
107 * queue.
108 */
109 private final AtomicInteger runningReadStreams = new AtomicInteger();
110 private OpenedSocket.Writer socketWriter;
111
/**
 * Creates a service for a single worker; nothing is started until the service itself is
 * started.
 *
 * @param worker used by doStart() to launch the process and to obtain the socket future
 * @param logMessageParser turns each line read from stdout/stderr into a {@link LogMessage}
 * @param trialOutput receives every raw line read from the worker's streams
 */
@Inject
StreamService(
    WorkerProcess worker, Parser<LogMessage> logMessageParser, TrialOutputLogger trialOutput) {
  this.trialOutput = trialOutput;
  this.logMessageParser = logMessageParser;
  this.worker = worker;
}
119
120 @Override protected void doStart() {
121 try {
122 // TODO(lukes): write the commandline to the trial output file?
123 process = worker.startWorker();
124 } catch (IOException e) {
125 notifyFailed(e);
126 return;
127 }
128 // Failsafe kill the process and the executor service.
129 // If the process has already exited cleanly, this will be a no-op.
130 addListener(new Listener() {
131 @Override public void starting() {}
132 @Override public void running() {}
133 @Override public void stopping(State from) {}
134 @Override public void terminated(State from) {
135 cleanup();
136 }
137 @Override public void failed(State from, Throwable failure) {
138 cleanup();
139 }
140
141 void cleanup() {
142 streamExecutor.shutdown();
143 process.destroy();
144 try {
145 streamExecutor.awaitTermination(10, TimeUnit.MILLISECONDS);
146 } catch (InterruptedException e) {
147 Thread.currentThread().interrupt();
148 }
149 streamExecutor.shutdownNow();
150 }
151 }, MoreExecutors.directExecutor());
152 // You may be thinking as you read this "Yo dawg, what if IOExceptions rain from the sky?"
153 // If a stream we are reading from throws an IOException then we fail the entire Service. This
154 // will cause the worker to be killed (if its not dead already) and the various StreamReaders to
155 // be interrupted (eventually).
156
157 // use the default charset because worker streams will use the default for output
158 Charset processCharset = Charset.defaultCharset();
159 runningReadStreams.addAndGet(2);
160 openStreams.addAndGet(1);
161 streamExecutor.submit(
162 threadRenaming("worker-stderr",
163 new StreamReader("stderr",
164 new InputStreamReader(process.getErrorStream(), processCharset))));
165 streamExecutor.submit(
166 threadRenaming("worker-stdout",
167 new StreamReader("stdout",
168 new InputStreamReader(process.getInputStream(), processCharset))));
169 worker.socketFuture().addListener(
170 new Runnable() {
171 @Override public void run() {
172 try {
173 OpenedSocket openedSocket =
174 Uninterruptibles.getUninterruptibly(worker.socketFuture());
175 logger.fine("successfully opened the pipe from the worker");
176 socketWriter = openedSocket.writer();
177 runningReadStreams.addAndGet(1);
178 openStreams.addAndGet(1);
179 streamExecutor.submit(threadRenaming("worker-socket",
180 new SocketStreamReader(openedSocket.reader())));
181 } catch (ExecutionException e) {
182 notifyFailed(e.getCause());
183 }
184 }
185 },
186 MoreExecutors.directExecutor());
187 notifyStarted();
188 }
189
190 /**
191 * Reads a {@link StreamItem} from one of the streams waiting for one to become available if
192 * necessary.
193 */
194 StreamItem readItem(long timeout, TimeUnit unit) throws InterruptedException {
195 checkState(isRunning(), "Cannot read items from a %s StreamService", state());
196 StreamItem line = outputQueue.poll(timeout, unit);
197 if (line == EOF_ITEM) {
198 closeStream();
199 }
200 return (line == null) ? TIMEOUT_ITEM : line;
201 }
202
203 /**
204 * Write a line of data to the worker process over the socket.
205 *
206 * <p>N.B. Writing data via {@link #sendMessage(Serializable)} is only valid once the underlying
207 * socket has been opened. This should be fine assuming that socket writes are only in response
208 * to socket reads (which is currently the case), so there is no way that a write could happen
209 * prior to the socket being opened.
210 */
211 void sendMessage(Serializable message) throws IOException {
212 checkState(isRunning(), "Cannot read items from a %s StreamService", state());
213 checkState(socketWriter != null, "Attempted to write to the socket before it was opened.");
214 try {
215 socketWriter.write(message);
216 // We need to flush since this is a back and forth lockstep protocol, buffering can cause
217 // deadlock!
218 socketWriter.flush();
219 } catch (IOException e) {
220 Closeables.close(socketWriter, true);
221 notifyFailed(e);
222 throw e;
223 }
224 }
225
226 /** Closes the socket writer. */
227 void closeWriter() throws IOException {
228 checkState(isRunning(), "Cannot read items from a %s StreamService", state());
229 checkState(socketWriter != null, "Attempted to close the socket before it was opened.");
230 try {
231 socketWriter.close();
232 } catch (IOException e) {
233 notifyFailed(e);
234 throw e;
235 }
236 closeStream();
237 }
238
239 @Override protected void doStop() {
240 if (openStreams.get() > 0) {
241 // This means stop was called on us externally and we are still reading/writing, just log a
242 // warning and do nothing
243 logger.warning("Attempting to stop the stream service with streams still open");
244 }
245 final ListenableFuture<Integer> processFuture = streamExecutor.submit(new Callable<Integer>() {
246 @Override public Integer call() throws Exception {
247 return process.waitFor();
248 }
249 });
250 // Experimentally, even with well behaved processes there is some time between when all streams
251 // are closed as part of process shutdown and when the process has exited. So to not fail
252 // flakily when shutting down normally we need to do a timed wait
253 streamExecutor.submit(new Callable<Void>() {
254 @Override public Void call() throws Exception {
255 boolean threw = true;
256 try {
257 if (processFuture.get(SHUTDOWN_WAIT_MILLIS, TimeUnit.MILLISECONDS) == 0) {
258 notifyStopped();
259 } else {
260 notifyFailed(
261 new Exception("Process failed to stop cleanly. Exit code: " + process.waitFor()));
262 }
263 threw = false;
264 } finally {
265 processFuture.cancel(true); // we don't need it anymore
266 if (threw) {
267 process.destroy();
268 notifyFailed(
269 new Exception("Process failed to stop cleanly and was forcibly killed. Exit code: "
270 + process.waitFor()));
271 }
272 }
273 return null;
274 }
275 });
276 }
277
278 private void closeStream() {
279 if (openStreams.decrementAndGet() == 0) {
280 stopAsync();
281 }
282 }
283
284 private void closeReadStream() {
285 if (runningReadStreams.decrementAndGet() == 0) {
286 outputQueue.add(EOF_ITEM);
287 }
288 }
289
290 /** An item read from one of the streams. */
291 static class StreamItem {
292 enum Kind {
293 /** This indicates that it is the last item. */
294 EOF,
295 /** This indicates that reading the item timed out. */
296 TIMEOUT,
297 /** This indicates that this item has content. */
298 DATA;
299 }
300
301 @Nullable private final LogMessage logMessage;
302 private final Kind kind;
303
304 private StreamItem(LogMessage line) {
305 this(Kind.DATA, checkNotNull(line));
306 }
307
308 private StreamItem(Kind state, @Nullable LogMessage logMessage) {
309 this.logMessage = logMessage;
310 this.kind = state;
311 }
312
313 /** Returns the content. This is only valid if {@link #kind()} return {@link Kind#DATA}. */
314 LogMessage content() {
315 checkState(kind == Kind.DATA, "Only data lines have content: %s", this);
316 return logMessage;
317 }
318
319 Kind kind() {
320 return kind;
321 }
322
323 @Override public String toString() {
324 ToStringHelper helper = MoreObjects.toStringHelper(StreamItem.class);
325 if (kind == Kind.DATA) {
326 helper.addValue(logMessage);
327 } else {
328 helper.addValue(kind);
329 }
330 return helper.toString();
331 }
332 }
333
334 /** Returns a callable that renames the the thread that the given callable runs in. */
335 private static <T> Callable<T> threadRenaming(final String name, final Callable<T> callable) {
336 checkNotNull(name);
337 checkNotNull(callable);
338 return new Callable<T>() {
339 @Override public T call() throws Exception {
340 Thread currentThread = Thread.currentThread();
341 String oldName = currentThread.getName();
342 currentThread.setName(name);
343 try {
344 return callable.call();
345 } finally {
346 currentThread.setName(oldName);
347 }
348 }
349 };
350 }
351
352 /**
353 * A background task that reads lines of text from a {@link Reader} and puts them onto a
354 * {@link BlockingQueue}.
355 */
356 private final class StreamReader implements Callable<Void> {
357 final Reader reader;
358 final String streamName;
359
360 StreamReader(String streamName, Reader reader) {
361 this.streamName = streamName;
362 this.reader = reader;
363 }
364
365 @Override public Void call() throws IOException, InterruptedException, ParseException {
366 LineReader lineReader = new LineReader(reader);
367 boolean threw = true;
368 try {
369 String line;
370 while ((line = lineReader.readLine()) != null) {
371 trialOutput.log(streamName, line);
372 LogMessage logMessage = logMessageParser.parse(line);
373 if (logMessage != null) {
374 outputQueue.put(new StreamItem(logMessage));
375 }
376 }
377 threw = false;
378 } catch (Exception e) {
379 notifyFailed(e);
380 } finally {
381 closeReadStream();
382 Closeables.close(reader, threw);
383 }
384 return null;
385 }
386 }
387
388 /**
389 * A background task that reads lines of text from a {@link OpenedSocket.Reader} and puts them
390 * onto a {@link BlockingQueue}.
391 */
392 private final class SocketStreamReader implements Callable<Void> {
393 final OpenedSocket.Reader reader;
394
395 SocketStreamReader(OpenedSocket.Reader reader) {
396 this.reader = reader;
397 }
398
399 @Override public Void call() throws IOException, InterruptedException, ParseException {
400 boolean threw = true;
401 try {
402 Object obj;
403 while ((obj = reader.read()) != null) {
404 if (obj instanceof String) {
405 log(obj.toString());
406 continue;
407 }
408 LogMessage message = (LogMessage) obj;
409 if (message instanceof StopMeasurementLogMessage) {
410 // TODO(lukes): how useful are these messages? They seem like leftover debugging info
411 for (Measurement measurement : ((StopMeasurementLogMessage) message).measurements()) {
412 log(String.format("I got a result! %s: %f%s%n",
413 measurement.description(),
414 measurement.value().magnitude() / measurement.weight(),
415 measurement.value().unit()));
416 }
417 }
418 outputQueue.put(new StreamItem(message));
419 }
420 threw = false;
421 } catch (Exception e) {
422 notifyFailed(e);
423 } finally {
424 closeReadStream();
425 Closeables.close(reader, threw);
426 }
427 return null;
428 }
429
430 private void log(String text) {
431 trialOutput.log("socket", text);
432 }
433 }
434}