blob: a5852d0e6bf0e523bbdc27375e8fa0be15be52e3 [file] [log] [blame]
/*
* Copyright (C) 2013 Google Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
* in compliance with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
*/
package com.google.caliper.runner;
import static com.google.common.base.Preconditions.checkNotNull;
import static com.google.common.base.Preconditions.checkState;
import com.google.caliper.bridge.LogMessage;
import com.google.caliper.bridge.OpenedSocket;
import com.google.caliper.bridge.StopMeasurementLogMessage;
import com.google.caliper.model.Measurement;
import com.google.caliper.runner.StreamService.StreamItem.Kind;
import com.google.caliper.util.Parser;
import com.google.common.base.MoreObjects;
import com.google.common.base.MoreObjects.ToStringHelper;
import com.google.common.collect.Queues;
import com.google.common.io.Closeables;
import com.google.common.io.LineReader;
import com.google.common.util.concurrent.AbstractService;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.common.util.concurrent.Service; // for javadoc
import com.google.common.util.concurrent.Service.State; // for javadoc
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import com.google.common.util.concurrent.Uninterruptibles;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.Reader;
import java.io.Serializable;
import java.nio.charset.Charset;
import java.text.ParseException;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.logging.Logger;
import javax.annotation.Nullable;
import javax.inject.Inject;
/**
* A {@link Service} that establishes a connection over a socket to a process and then allows
* multiplexed access to the processes' line oriented output over the socket and the standard
* process streams (stdout and stderr) as well as allowing data to be written over the socket.
*
* <p>The {@linkplain State states} of this service are as follows:
* <ul>
* <li>{@linkplain State#NEW NEW} : Idle state, no reading or writing is allowed.
* <li>{@linkplain State#STARTING STARTING} : Streams are being opened
* <li>{@linkplain State#RUNNING RUNNING} : At least one stream is still open or the writer has
* not been closed yet.
* <li>{@linkplain State#STOPPING STOPPING} : All streams have closed but some threads may still
* be running.
* <li>{@linkplain State#TERMINATED TERMINATED} : Idle state, all streams are closed
* <li>{@linkplain State#FAILED FAILED} : The service will transition to failed if it encounters
* any errors while reading from or writing to the streams, service failure will also cause
* the worker process to be forcibly shutdown and {@link #readItem(long, TimeUnit)},
* {@link #closeWriter()} and {@link #sendMessage(Serializable)} will start throwing
* IllegalStateExceptions.
* </ul>
*/
@TrialScoped final class StreamService extends AbstractService {
/** How long to wait for a process that should be exiting to actually exit. */
private static final int SHUTDOWN_WAIT_MILLIS = 10;
private static final Logger logger = Logger.getLogger(StreamService.class.getName());
private static final StreamItem TIMEOUT_ITEM = new StreamItem(Kind.TIMEOUT, null);
/** The final item that will be sent down the stream. */
static final StreamItem EOF_ITEM = new StreamItem(Kind.EOF, null);
private final ListeningExecutorService streamExecutor = MoreExecutors.listeningDecorator(
Executors.newCachedThreadPool(new ThreadFactoryBuilder().setDaemon(true).build()));
private final BlockingQueue<StreamItem> outputQueue = Queues.newLinkedBlockingQueue();
private final WorkerProcess worker;
private volatile Process process;
private final Parser<LogMessage> logMessageParser;
private final TrialOutputLogger trialOutput;
/**
* This represents the number of open streams from the users perspective. i.e. can you still
* write to the socket and read items.
*
* <p>This is decremented when either the socket is closed for writing or the EOF_ITEM has been
* read by the user.
*/
private final AtomicInteger openStreams = new AtomicInteger();
/**
* Used to track how many read streams are open so we can correctly set the EOF_ITEM onto the
* queue.
*/
private final AtomicInteger runningReadStreams = new AtomicInteger();
private OpenedSocket.Writer socketWriter;
@Inject StreamService(WorkerProcess worker,
Parser<LogMessage> logMessageParser,
TrialOutputLogger trialOutput) {
this.worker = worker;
this.logMessageParser = logMessageParser;
this.trialOutput = trialOutput;
}
@Override protected void doStart() {
try {
// TODO(lukes): write the commandline to the trial output file?
process = worker.startWorker();
} catch (IOException e) {
notifyFailed(e);
return;
}
// Failsafe kill the process and the executor service.
// If the process has already exited cleanly, this will be a no-op.
addListener(new Listener() {
@Override public void starting() {}
@Override public void running() {}
@Override public void stopping(State from) {}
@Override public void terminated(State from) {
cleanup();
}
@Override public void failed(State from, Throwable failure) {
cleanup();
}
void cleanup() {
streamExecutor.shutdown();
process.destroy();
try {
streamExecutor.awaitTermination(10, TimeUnit.MILLISECONDS);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
streamExecutor.shutdownNow();
}
}, MoreExecutors.directExecutor());
// You may be thinking as you read this "Yo dawg, what if IOExceptions rain from the sky?"
// If a stream we are reading from throws an IOException then we fail the entire Service. This
// will cause the worker to be killed (if its not dead already) and the various StreamReaders to
// be interrupted (eventually).
// use the default charset because worker streams will use the default for output
Charset processCharset = Charset.defaultCharset();
runningReadStreams.addAndGet(2);
openStreams.addAndGet(1);
streamExecutor.submit(
threadRenaming("worker-stderr",
new StreamReader("stderr",
new InputStreamReader(process.getErrorStream(), processCharset))));
streamExecutor.submit(
threadRenaming("worker-stdout",
new StreamReader("stdout",
new InputStreamReader(process.getInputStream(), processCharset))));
worker.socketFuture().addListener(
new Runnable() {
@Override public void run() {
try {
OpenedSocket openedSocket =
Uninterruptibles.getUninterruptibly(worker.socketFuture());
logger.fine("successfully opened the pipe from the worker");
socketWriter = openedSocket.writer();
runningReadStreams.addAndGet(1);
openStreams.addAndGet(1);
streamExecutor.submit(threadRenaming("worker-socket",
new SocketStreamReader(openedSocket.reader())));
} catch (ExecutionException e) {
notifyFailed(e.getCause());
}
}
},
MoreExecutors.directExecutor());
notifyStarted();
}
/**
* Reads a {@link StreamItem} from one of the streams waiting for one to become available if
* necessary.
*/
StreamItem readItem(long timeout, TimeUnit unit) throws InterruptedException {
checkState(isRunning(), "Cannot read items from a %s StreamService", state());
StreamItem line = outputQueue.poll(timeout, unit);
if (line == EOF_ITEM) {
closeStream();
}
return (line == null) ? TIMEOUT_ITEM : line;
}
/**
* Write a line of data to the worker process over the socket.
*
* <p>N.B. Writing data via {@link #sendMessage(Serializable)} is only valid once the underlying
* socket has been opened. This should be fine assuming that socket writes are only in response
* to socket reads (which is currently the case), so there is no way that a write could happen
* prior to the socket being opened.
*/
void sendMessage(Serializable message) throws IOException {
checkState(isRunning(), "Cannot read items from a %s StreamService", state());
checkState(socketWriter != null, "Attempted to write to the socket before it was opened.");
try {
socketWriter.write(message);
// We need to flush since this is a back and forth lockstep protocol, buffering can cause
// deadlock!
socketWriter.flush();
} catch (IOException e) {
Closeables.close(socketWriter, true);
notifyFailed(e);
throw e;
}
}
/** Closes the socket writer. */
void closeWriter() throws IOException {
checkState(isRunning(), "Cannot read items from a %s StreamService", state());
checkState(socketWriter != null, "Attempted to close the socket before it was opened.");
try {
socketWriter.close();
} catch (IOException e) {
notifyFailed(e);
throw e;
}
closeStream();
}
@Override protected void doStop() {
if (openStreams.get() > 0) {
// This means stop was called on us externally and we are still reading/writing, just log a
// warning and do nothing
logger.warning("Attempting to stop the stream service with streams still open");
}
final ListenableFuture<Integer> processFuture = streamExecutor.submit(new Callable<Integer>() {
@Override public Integer call() throws Exception {
return process.waitFor();
}
});
// Experimentally, even with well behaved processes there is some time between when all streams
// are closed as part of process shutdown and when the process has exited. So to not fail
// flakily when shutting down normally we need to do a timed wait
streamExecutor.submit(new Callable<Void>() {
@Override public Void call() throws Exception {
boolean threw = true;
try {
if (processFuture.get(SHUTDOWN_WAIT_MILLIS, TimeUnit.MILLISECONDS) == 0) {
notifyStopped();
} else {
notifyFailed(
new Exception("Process failed to stop cleanly. Exit code: " + process.waitFor()));
}
threw = false;
} finally {
processFuture.cancel(true); // we don't need it anymore
if (threw) {
process.destroy();
notifyFailed(
new Exception("Process failed to stop cleanly and was forcibly killed. Exit code: "
+ process.waitFor()));
}
}
return null;
}
});
}
private void closeStream() {
if (openStreams.decrementAndGet() == 0) {
stopAsync();
}
}
private void closeReadStream() {
if (runningReadStreams.decrementAndGet() == 0) {
outputQueue.add(EOF_ITEM);
}
}
/** An item read from one of the streams. */
static class StreamItem {
enum Kind {
/** This indicates that it is the last item. */
EOF,
/** This indicates that reading the item timed out. */
TIMEOUT,
/** This indicates that this item has content. */
DATA;
}
@Nullable private final LogMessage logMessage;
private final Kind kind;
private StreamItem(LogMessage line) {
this(Kind.DATA, checkNotNull(line));
}
private StreamItem(Kind state, @Nullable LogMessage logMessage) {
this.logMessage = logMessage;
this.kind = state;
}
/** Returns the content. This is only valid if {@link #kind()} return {@link Kind#DATA}. */
LogMessage content() {
checkState(kind == Kind.DATA, "Only data lines have content: %s", this);
return logMessage;
}
Kind kind() {
return kind;
}
@Override public String toString() {
ToStringHelper helper = MoreObjects.toStringHelper(StreamItem.class);
if (kind == Kind.DATA) {
helper.addValue(logMessage);
} else {
helper.addValue(kind);
}
return helper.toString();
}
}
/** Returns a callable that renames the the thread that the given callable runs in. */
private static <T> Callable<T> threadRenaming(final String name, final Callable<T> callable) {
checkNotNull(name);
checkNotNull(callable);
return new Callable<T>() {
@Override public T call() throws Exception {
Thread currentThread = Thread.currentThread();
String oldName = currentThread.getName();
currentThread.setName(name);
try {
return callable.call();
} finally {
currentThread.setName(oldName);
}
}
};
}
/**
* A background task that reads lines of text from a {@link Reader} and puts them onto a
* {@link BlockingQueue}.
*/
private final class StreamReader implements Callable<Void> {
final Reader reader;
final String streamName;
StreamReader(String streamName, Reader reader) {
this.streamName = streamName;
this.reader = reader;
}
@Override public Void call() throws IOException, InterruptedException, ParseException {
LineReader lineReader = new LineReader(reader);
boolean threw = true;
try {
String line;
while ((line = lineReader.readLine()) != null) {
trialOutput.log(streamName, line);
LogMessage logMessage = logMessageParser.parse(line);
if (logMessage != null) {
outputQueue.put(new StreamItem(logMessage));
}
}
threw = false;
} catch (Exception e) {
notifyFailed(e);
} finally {
closeReadStream();
Closeables.close(reader, threw);
}
return null;
}
}
/**
* A background task that reads lines of text from a {@link OpenedSocket.Reader} and puts them
* onto a {@link BlockingQueue}.
*/
private final class SocketStreamReader implements Callable<Void> {
final OpenedSocket.Reader reader;
SocketStreamReader(OpenedSocket.Reader reader) {
this.reader = reader;
}
@Override public Void call() throws IOException, InterruptedException, ParseException {
boolean threw = true;
try {
Object obj;
while ((obj = reader.read()) != null) {
if (obj instanceof String) {
log(obj.toString());
continue;
}
LogMessage message = (LogMessage) obj;
if (message instanceof StopMeasurementLogMessage) {
// TODO(lukes): how useful are these messages? They seem like leftover debugging info
for (Measurement measurement : ((StopMeasurementLogMessage) message).measurements()) {
log(String.format("I got a result! %s: %f%s%n",
measurement.description(),
measurement.value().magnitude() / measurement.weight(),
measurement.value().unit()));
}
}
outputQueue.put(new StreamItem(message));
}
threw = false;
} catch (Exception e) {
notifyFailed(e);
} finally {
closeReadStream();
Closeables.close(reader, threw);
}
return null;
}
private void log(String text) {
trialOutput.log("socket", text);
}
}
}