/*
 * Copyright (C) 2013 Google Inc.
 *
 * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
 * in compliance with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software distributed under the License
 * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
 * or implied. See the License for the specific language governing permissions and limitations under
 * the License.
 */
| 14 | |
| 15 | package com.google.caliper.runner; |
| 16 | |
| 17 | import static com.google.common.base.Preconditions.checkNotNull; |
| 18 | import static com.google.common.base.Preconditions.checkState; |
| 19 | |
| 20 | import com.google.caliper.bridge.LogMessage; |
| 21 | import com.google.caliper.bridge.OpenedSocket; |
| 22 | import com.google.caliper.bridge.StopMeasurementLogMessage; |
| 23 | import com.google.caliper.model.Measurement; |
| 24 | import com.google.caliper.runner.StreamService.StreamItem.Kind; |
| 25 | import com.google.caliper.util.Parser; |
| 26 | import com.google.common.base.MoreObjects; |
| 27 | import com.google.common.base.MoreObjects.ToStringHelper; |
| 28 | import com.google.common.collect.Queues; |
| 29 | import com.google.common.io.Closeables; |
| 30 | import com.google.common.io.LineReader; |
| 31 | import com.google.common.util.concurrent.AbstractService; |
| 32 | import com.google.common.util.concurrent.ListenableFuture; |
| 33 | import com.google.common.util.concurrent.ListeningExecutorService; |
| 34 | import com.google.common.util.concurrent.MoreExecutors; |
| 35 | import com.google.common.util.concurrent.Service; // for javadoc |
| 36 | import com.google.common.util.concurrent.Service.State; // for javadoc |
| 37 | import com.google.common.util.concurrent.ThreadFactoryBuilder; |
| 38 | import com.google.common.util.concurrent.Uninterruptibles; |
| 39 | |
| 40 | import java.io.IOException; |
| 41 | import java.io.InputStreamReader; |
| 42 | import java.io.Reader; |
| 43 | import java.io.Serializable; |
| 44 | import java.nio.charset.Charset; |
| 45 | import java.text.ParseException; |
| 46 | import java.util.concurrent.BlockingQueue; |
| 47 | import java.util.concurrent.Callable; |
| 48 | import java.util.concurrent.ExecutionException; |
| 49 | import java.util.concurrent.Executors; |
| 50 | import java.util.concurrent.TimeUnit; |
| 51 | import java.util.concurrent.atomic.AtomicInteger; |
| 52 | import java.util.logging.Logger; |
| 53 | |
| 54 | import javax.annotation.Nullable; |
| 55 | import javax.inject.Inject; |
| 56 | |
| 57 | /** |
| 58 | * A {@link Service} that establishes a connection over a socket to a process and then allows |
| 59 | * multiplexed access to the processes' line oriented output over the socket and the standard |
| 60 | * process streams (stdout and stderr) as well as allowing data to be written over the socket. |
| 61 | * |
| 62 | * <p>The {@linkplain State states} of this service are as follows: |
| 63 | * <ul> |
| 64 | * <li>{@linkplain State#NEW NEW} : Idle state, no reading or writing is allowed. |
| 65 | * <li>{@linkplain State#STARTING STARTING} : Streams are being opened |
| 66 | * <li>{@linkplain State#RUNNING RUNNING} : At least one stream is still open or the writer has |
| 67 | * not been closed yet. |
| 68 | * <li>{@linkplain State#STOPPING STOPPING} : All streams have closed but some threads may still |
| 69 | * be running. |
| 70 | * <li>{@linkplain State#TERMINATED TERMINATED} : Idle state, all streams are closed |
| 71 | * <li>{@linkplain State#FAILED FAILED} : The service will transition to failed if it encounters |
| 72 | * any errors while reading from or writing to the streams, service failure will also cause |
| 73 | * the worker process to be forcibly shutdown and {@link #readItem(long, TimeUnit)}, |
| 74 | * {@link #closeWriter()} and {@link #sendMessage(Serializable)} will start throwing |
| 75 | * IllegalStateExceptions. |
| 76 | * </ul> |
| 77 | */ |
| 78 | @TrialScoped final class StreamService extends AbstractService { |
| 79 | /** How long to wait for a process that should be exiting to actually exit. */ |
| 80 | private static final int SHUTDOWN_WAIT_MILLIS = 10; |
| 81 | |
| 82 | private static final Logger logger = Logger.getLogger(StreamService.class.getName()); |
| 83 | private static final StreamItem TIMEOUT_ITEM = new StreamItem(Kind.TIMEOUT, null); |
| 84 | |
| 85 | /** The final item that will be sent down the stream. */ |
| 86 | static final StreamItem EOF_ITEM = new StreamItem(Kind.EOF, null); |
| 87 | |
| 88 | private final ListeningExecutorService streamExecutor = MoreExecutors.listeningDecorator( |
| 89 | Executors.newCachedThreadPool(new ThreadFactoryBuilder().setDaemon(true).build())); |
| 90 | private final BlockingQueue<StreamItem> outputQueue = Queues.newLinkedBlockingQueue(); |
| 91 | private final WorkerProcess worker; |
| 92 | private volatile Process process; |
| 93 | private final Parser<LogMessage> logMessageParser; |
| 94 | private final TrialOutputLogger trialOutput; |
| 95 | |
| 96 | /** |
| 97 | * This represents the number of open streams from the users perspective. i.e. can you still |
| 98 | * write to the socket and read items. |
| 99 | * |
| 100 | * <p>This is decremented when either the socket is closed for writing or the EOF_ITEM has been |
| 101 | * read by the user. |
| 102 | */ |
| 103 | private final AtomicInteger openStreams = new AtomicInteger(); |
| 104 | |
| 105 | /** |
| 106 | * Used to track how many read streams are open so we can correctly set the EOF_ITEM onto the |
| 107 | * queue. |
| 108 | */ |
| 109 | private final AtomicInteger runningReadStreams = new AtomicInteger(); |
| 110 | private OpenedSocket.Writer socketWriter; |
| 111 | |
| 112 | @Inject StreamService(WorkerProcess worker, |
| 113 | Parser<LogMessage> logMessageParser, |
| 114 | TrialOutputLogger trialOutput) { |
| 115 | this.worker = worker; |
| 116 | this.logMessageParser = logMessageParser; |
| 117 | this.trialOutput = trialOutput; |
| 118 | } |
| 119 | |
| 120 | @Override protected void doStart() { |
| 121 | try { |
| 122 | // TODO(lukes): write the commandline to the trial output file? |
| 123 | process = worker.startWorker(); |
| 124 | } catch (IOException e) { |
| 125 | notifyFailed(e); |
| 126 | return; |
| 127 | } |
| 128 | // Failsafe kill the process and the executor service. |
| 129 | // If the process has already exited cleanly, this will be a no-op. |
| 130 | addListener(new Listener() { |
| 131 | @Override public void starting() {} |
| 132 | @Override public void running() {} |
| 133 | @Override public void stopping(State from) {} |
| 134 | @Override public void terminated(State from) { |
| 135 | cleanup(); |
| 136 | } |
| 137 | @Override public void failed(State from, Throwable failure) { |
| 138 | cleanup(); |
| 139 | } |
| 140 | |
| 141 | void cleanup() { |
| 142 | streamExecutor.shutdown(); |
| 143 | process.destroy(); |
| 144 | try { |
| 145 | streamExecutor.awaitTermination(10, TimeUnit.MILLISECONDS); |
| 146 | } catch (InterruptedException e) { |
| 147 | Thread.currentThread().interrupt(); |
| 148 | } |
| 149 | streamExecutor.shutdownNow(); |
| 150 | } |
| 151 | }, MoreExecutors.directExecutor()); |
| 152 | // You may be thinking as you read this "Yo dawg, what if IOExceptions rain from the sky?" |
| 153 | // If a stream we are reading from throws an IOException then we fail the entire Service. This |
| 154 | // will cause the worker to be killed (if its not dead already) and the various StreamReaders to |
| 155 | // be interrupted (eventually). |
| 156 | |
| 157 | // use the default charset because worker streams will use the default for output |
| 158 | Charset processCharset = Charset.defaultCharset(); |
| 159 | runningReadStreams.addAndGet(2); |
| 160 | openStreams.addAndGet(1); |
| 161 | streamExecutor.submit( |
| 162 | threadRenaming("worker-stderr", |
| 163 | new StreamReader("stderr", |
| 164 | new InputStreamReader(process.getErrorStream(), processCharset)))); |
| 165 | streamExecutor.submit( |
| 166 | threadRenaming("worker-stdout", |
| 167 | new StreamReader("stdout", |
| 168 | new InputStreamReader(process.getInputStream(), processCharset)))); |
| 169 | worker.socketFuture().addListener( |
| 170 | new Runnable() { |
| 171 | @Override public void run() { |
| 172 | try { |
| 173 | OpenedSocket openedSocket = |
| 174 | Uninterruptibles.getUninterruptibly(worker.socketFuture()); |
| 175 | logger.fine("successfully opened the pipe from the worker"); |
| 176 | socketWriter = openedSocket.writer(); |
| 177 | runningReadStreams.addAndGet(1); |
| 178 | openStreams.addAndGet(1); |
| 179 | streamExecutor.submit(threadRenaming("worker-socket", |
| 180 | new SocketStreamReader(openedSocket.reader()))); |
| 181 | } catch (ExecutionException e) { |
| 182 | notifyFailed(e.getCause()); |
| 183 | } |
| 184 | } |
| 185 | }, |
| 186 | MoreExecutors.directExecutor()); |
| 187 | notifyStarted(); |
| 188 | } |
| 189 | |
| 190 | /** |
| 191 | * Reads a {@link StreamItem} from one of the streams waiting for one to become available if |
| 192 | * necessary. |
| 193 | */ |
| 194 | StreamItem readItem(long timeout, TimeUnit unit) throws InterruptedException { |
| 195 | checkState(isRunning(), "Cannot read items from a %s StreamService", state()); |
| 196 | StreamItem line = outputQueue.poll(timeout, unit); |
| 197 | if (line == EOF_ITEM) { |
| 198 | closeStream(); |
| 199 | } |
| 200 | return (line == null) ? TIMEOUT_ITEM : line; |
| 201 | } |
| 202 | |
| 203 | /** |
| 204 | * Write a line of data to the worker process over the socket. |
| 205 | * |
| 206 | * <p>N.B. Writing data via {@link #sendMessage(Serializable)} is only valid once the underlying |
| 207 | * socket has been opened. This should be fine assuming that socket writes are only in response |
| 208 | * to socket reads (which is currently the case), so there is no way that a write could happen |
| 209 | * prior to the socket being opened. |
| 210 | */ |
| 211 | void sendMessage(Serializable message) throws IOException { |
| 212 | checkState(isRunning(), "Cannot read items from a %s StreamService", state()); |
| 213 | checkState(socketWriter != null, "Attempted to write to the socket before it was opened."); |
| 214 | try { |
| 215 | socketWriter.write(message); |
| 216 | // We need to flush since this is a back and forth lockstep protocol, buffering can cause |
| 217 | // deadlock! |
| 218 | socketWriter.flush(); |
| 219 | } catch (IOException e) { |
| 220 | Closeables.close(socketWriter, true); |
| 221 | notifyFailed(e); |
| 222 | throw e; |
| 223 | } |
| 224 | } |
| 225 | |
| 226 | /** Closes the socket writer. */ |
| 227 | void closeWriter() throws IOException { |
| 228 | checkState(isRunning(), "Cannot read items from a %s StreamService", state()); |
| 229 | checkState(socketWriter != null, "Attempted to close the socket before it was opened."); |
| 230 | try { |
| 231 | socketWriter.close(); |
| 232 | } catch (IOException e) { |
| 233 | notifyFailed(e); |
| 234 | throw e; |
| 235 | } |
| 236 | closeStream(); |
| 237 | } |
| 238 | |
| 239 | @Override protected void doStop() { |
| 240 | if (openStreams.get() > 0) { |
| 241 | // This means stop was called on us externally and we are still reading/writing, just log a |
| 242 | // warning and do nothing |
| 243 | logger.warning("Attempting to stop the stream service with streams still open"); |
| 244 | } |
| 245 | final ListenableFuture<Integer> processFuture = streamExecutor.submit(new Callable<Integer>() { |
| 246 | @Override public Integer call() throws Exception { |
| 247 | return process.waitFor(); |
| 248 | } |
| 249 | }); |
| 250 | // Experimentally, even with well behaved processes there is some time between when all streams |
| 251 | // are closed as part of process shutdown and when the process has exited. So to not fail |
| 252 | // flakily when shutting down normally we need to do a timed wait |
| 253 | streamExecutor.submit(new Callable<Void>() { |
| 254 | @Override public Void call() throws Exception { |
| 255 | boolean threw = true; |
| 256 | try { |
| 257 | if (processFuture.get(SHUTDOWN_WAIT_MILLIS, TimeUnit.MILLISECONDS) == 0) { |
| 258 | notifyStopped(); |
| 259 | } else { |
| 260 | notifyFailed( |
| 261 | new Exception("Process failed to stop cleanly. Exit code: " + process.waitFor())); |
| 262 | } |
| 263 | threw = false; |
| 264 | } finally { |
| 265 | processFuture.cancel(true); // we don't need it anymore |
| 266 | if (threw) { |
| 267 | process.destroy(); |
| 268 | notifyFailed( |
| 269 | new Exception("Process failed to stop cleanly and was forcibly killed. Exit code: " |
| 270 | + process.waitFor())); |
| 271 | } |
| 272 | } |
| 273 | return null; |
| 274 | } |
| 275 | }); |
| 276 | } |
| 277 | |
| 278 | private void closeStream() { |
| 279 | if (openStreams.decrementAndGet() == 0) { |
| 280 | stopAsync(); |
| 281 | } |
| 282 | } |
| 283 | |
| 284 | private void closeReadStream() { |
| 285 | if (runningReadStreams.decrementAndGet() == 0) { |
| 286 | outputQueue.add(EOF_ITEM); |
| 287 | } |
| 288 | } |
| 289 | |
| 290 | /** An item read from one of the streams. */ |
| 291 | static class StreamItem { |
| 292 | enum Kind { |
| 293 | /** This indicates that it is the last item. */ |
| 294 | EOF, |
| 295 | /** This indicates that reading the item timed out. */ |
| 296 | TIMEOUT, |
| 297 | /** This indicates that this item has content. */ |
| 298 | DATA; |
| 299 | } |
| 300 | |
| 301 | @Nullable private final LogMessage logMessage; |
| 302 | private final Kind kind; |
| 303 | |
| 304 | private StreamItem(LogMessage line) { |
| 305 | this(Kind.DATA, checkNotNull(line)); |
| 306 | } |
| 307 | |
| 308 | private StreamItem(Kind state, @Nullable LogMessage logMessage) { |
| 309 | this.logMessage = logMessage; |
| 310 | this.kind = state; |
| 311 | } |
| 312 | |
| 313 | /** Returns the content. This is only valid if {@link #kind()} return {@link Kind#DATA}. */ |
| 314 | LogMessage content() { |
| 315 | checkState(kind == Kind.DATA, "Only data lines have content: %s", this); |
| 316 | return logMessage; |
| 317 | } |
| 318 | |
| 319 | Kind kind() { |
| 320 | return kind; |
| 321 | } |
| 322 | |
| 323 | @Override public String toString() { |
| 324 | ToStringHelper helper = MoreObjects.toStringHelper(StreamItem.class); |
| 325 | if (kind == Kind.DATA) { |
| 326 | helper.addValue(logMessage); |
| 327 | } else { |
| 328 | helper.addValue(kind); |
| 329 | } |
| 330 | return helper.toString(); |
| 331 | } |
| 332 | } |
| 333 | |
| 334 | /** Returns a callable that renames the the thread that the given callable runs in. */ |
| 335 | private static <T> Callable<T> threadRenaming(final String name, final Callable<T> callable) { |
| 336 | checkNotNull(name); |
| 337 | checkNotNull(callable); |
| 338 | return new Callable<T>() { |
| 339 | @Override public T call() throws Exception { |
| 340 | Thread currentThread = Thread.currentThread(); |
| 341 | String oldName = currentThread.getName(); |
| 342 | currentThread.setName(name); |
| 343 | try { |
| 344 | return callable.call(); |
| 345 | } finally { |
| 346 | currentThread.setName(oldName); |
| 347 | } |
| 348 | } |
| 349 | }; |
| 350 | } |
| 351 | |
| 352 | /** |
| 353 | * A background task that reads lines of text from a {@link Reader} and puts them onto a |
| 354 | * {@link BlockingQueue}. |
| 355 | */ |
| 356 | private final class StreamReader implements Callable<Void> { |
| 357 | final Reader reader; |
| 358 | final String streamName; |
| 359 | |
| 360 | StreamReader(String streamName, Reader reader) { |
| 361 | this.streamName = streamName; |
| 362 | this.reader = reader; |
| 363 | } |
| 364 | |
| 365 | @Override public Void call() throws IOException, InterruptedException, ParseException { |
| 366 | LineReader lineReader = new LineReader(reader); |
| 367 | boolean threw = true; |
| 368 | try { |
| 369 | String line; |
| 370 | while ((line = lineReader.readLine()) != null) { |
| 371 | trialOutput.log(streamName, line); |
| 372 | LogMessage logMessage = logMessageParser.parse(line); |
| 373 | if (logMessage != null) { |
| 374 | outputQueue.put(new StreamItem(logMessage)); |
| 375 | } |
| 376 | } |
| 377 | threw = false; |
| 378 | } catch (Exception e) { |
| 379 | notifyFailed(e); |
| 380 | } finally { |
| 381 | closeReadStream(); |
| 382 | Closeables.close(reader, threw); |
| 383 | } |
| 384 | return null; |
| 385 | } |
| 386 | } |
| 387 | |
| 388 | /** |
| 389 | * A background task that reads lines of text from a {@link OpenedSocket.Reader} and puts them |
| 390 | * onto a {@link BlockingQueue}. |
| 391 | */ |
| 392 | private final class SocketStreamReader implements Callable<Void> { |
| 393 | final OpenedSocket.Reader reader; |
| 394 | |
| 395 | SocketStreamReader(OpenedSocket.Reader reader) { |
| 396 | this.reader = reader; |
| 397 | } |
| 398 | |
| 399 | @Override public Void call() throws IOException, InterruptedException, ParseException { |
| 400 | boolean threw = true; |
| 401 | try { |
| 402 | Object obj; |
| 403 | while ((obj = reader.read()) != null) { |
| 404 | if (obj instanceof String) { |
| 405 | log(obj.toString()); |
| 406 | continue; |
| 407 | } |
| 408 | LogMessage message = (LogMessage) obj; |
| 409 | if (message instanceof StopMeasurementLogMessage) { |
| 410 | // TODO(lukes): how useful are these messages? They seem like leftover debugging info |
| 411 | for (Measurement measurement : ((StopMeasurementLogMessage) message).measurements()) { |
| 412 | log(String.format("I got a result! %s: %f%s%n", |
| 413 | measurement.description(), |
| 414 | measurement.value().magnitude() / measurement.weight(), |
| 415 | measurement.value().unit())); |
| 416 | } |
| 417 | } |
| 418 | outputQueue.put(new StreamItem(message)); |
| 419 | } |
| 420 | threw = false; |
| 421 | } catch (Exception e) { |
| 422 | notifyFailed(e); |
| 423 | } finally { |
| 424 | closeReadStream(); |
| 425 | Closeables.close(reader, threw); |
| 426 | } |
| 427 | return null; |
| 428 | } |
| 429 | |
| 430 | private void log(String text) { |
| 431 | trialOutput.log("socket", text); |
| 432 | } |
| 433 | } |
| 434 | } |