blob: ed93dd5fdcb80615b0f855193d1cf55c5ad0ae56 [file] [log] [blame]
Paul Duffine2363012015-11-30 16:20:41 +00001/*
2 * Copyright (C) 2013 Google Inc.
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
5 * in compliance with the License. You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software distributed under the License
10 * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
11 * or implied. See the License for the specific language governing permissions and limitations under
12 * the License.
13 */
14
15package com.google.caliper.runner;
16
17import static com.google.common.base.Preconditions.checkState;
18
19import com.google.caliper.bridge.OpenedSocket;
20import com.google.caliper.bridge.StartupAnnounceMessage;
21import com.google.common.base.Supplier;
22import com.google.common.collect.Maps;
23import com.google.common.collect.Multimaps;
24import com.google.common.collect.SetMultimap;
25import com.google.common.collect.Sets;
26import com.google.common.util.concurrent.AbstractExecutionThreadService;
27import com.google.common.util.concurrent.ListenableFuture;
28import com.google.common.util.concurrent.Service;
29import com.google.common.util.concurrent.SettableFuture;
30
31import java.io.IOException;
32import java.net.ServerSocket;
33import java.net.Socket;
34import java.net.SocketException;
35import java.util.Collection;
36import java.util.Map;
37import java.util.Set;
38import java.util.UUID;
39import java.util.concurrent.locks.Lock;
40import java.util.concurrent.locks.ReentrantLock;
41
42import javax.annotation.concurrent.GuardedBy;
43import javax.inject.Inject;
44import javax.inject.Singleton;
45
46/**
47 * A {@link Service} that manages a {@link ServerSocket}.
48 *
49 * <p> This service provides two pieces of functionality:
50 * <ol>
51 * <li>It adapts {@link ServerSocket#accept()} to a {@link ListenableFuture} of an opened socket.
52 * <li>It demultiplexes incoming connections based on a {@link StartupAnnounceMessage} that is
53 * sent over the socket.
54 * </ol>
55 *
56 * <p>The {@linkplain State states} of this service are as follows:
57 * <ul>
58 * <li>{@linkplain State#NEW NEW} : Idle state, the {@link ServerSocket} is not open yet.
59 * <li>{@linkplain State#STARTING STARTING} : {@link ServerSocket} is opened
60 * <li>{@linkplain State#RUNNING RUNNING} : We are continuously accepting and parsing connections
61 * from the socket.
62 * <li>{@linkplain State#STOPPING STOPPING} : The server socket is closing and all pending
63 * connection requests are terminated, connection requests will fail immediately.
64 * <li>{@linkplain State#TERMINATED TERMINATED} : Idle state, the socket is closed.
65 * <li>{@linkplain State#FAILED FAILED} : The service will transition to failed if it encounters
66 * any errors while accepting connections or reading from connections.
67 * </ul>
68 *
69 * <p>Note to future self. There have been a few attempts to make it so that it is no longer
70 * necessary to dedicate a thread to this service (basically turn it into an AbstractIdleService).
71 * The general idea has been to make callers to getConnection invoke accept, here is why it didn't
72 * work.
73 * <ul>
74 * <li>If you make getConnection a blocking method that calls accept until it finds the
75 * connection with its id, then there is no way to deal with connections that never arrive.
76 * For example, if the worker crashes before connecting then the thread calling accept will
77 * block forever waiting for it. The only way to unblock a thread stuck on accept() is to
78 * close the socket (this holds for ServerSocketChannels and normal ServerSockets), but we
79 * cannot do that in this case because the socket is a shared resource.
80 * <li>If you make getConnection a non-blocking, polling based method then you expose yourself
81 * to potential deadlocks (due to missed signals) depending on what thread you poll from.
82 * If the polling thread is any of the threads that are involved with processing messages
83 * from the worker I believe there to be a deadlock risk. Basically, if the worker sends
84 * messages over its output streams and then calls Socket.connect, and no printing to stdout
85 * or stderr occurs while connecting. Then if the runner polls, but misses the connection
86 * and then tries to read again, it will deadlock.
87 * </ul>
88 */
89@Singleton
90final class ServerSocketService extends AbstractExecutionThreadService {
91 private enum Source { REQUEST, ACCEPT}
92
93 private final Lock lock = new ReentrantLock();
94
95 /**
96 * Contains futures that have either only been accepted or requested. Once both occur they are
97 * removed from this map.
98 */
99 @GuardedBy("lock")
100 private final Map<UUID, SettableFuture<OpenedSocket>> halfFinishedConnections = Maps.newHashMap();
101
102 /**
103 * Contains the history of connections so we can ensure that each id is only accepted once and
104 * requested once.
105 */
106 @GuardedBy("lock")
107 private final SetMultimap<Source, UUID> connectionState = Multimaps.newSetMultimap(
108 Maps.<Source, Collection<UUID>>newEnumMap(Source.class),
109 new Supplier<Set<UUID>>(){
110 @Override public Set<UUID> get() {
111 return Sets.newHashSet();
112 }
113 });
114
115 private ServerSocket serverSocket;
116
117 @Inject ServerSocketService() {}
118
119 int getPort() {
120 awaitRunning();
121 checkState(serverSocket != null, "Socket has not been opened yet");
122 return serverSocket.getLocalPort();
123 }
124
125 /**
126 * Returns a {@link ListenableFuture} for an open connection corresponding to the given id.
127 *
128 * <p>N.B. calling this method 'consumes' the connection and as such calling it twice with the
129 * same id will not work, the second future returned will never complete. Similarly calling it
130 * with an id that does not correspond to a worker trying to connect will also fail.
131 */
132 public ListenableFuture<OpenedSocket> getConnection(UUID id) {
133 checkState(isRunning(), "You can only get connections from a running service: %s", this);
134 return getConnectionImpl(id, Source.REQUEST);
135 }
136
137 @Override protected void startUp() throws Exception {
138 serverSocket = new ServerSocket(0 /* bind to any available port */);
139 }
140
141 @Override protected void run() throws Exception {
142 while (isRunning()) {
143 Socket socket;
144 try {
145 socket = serverSocket.accept();
146 } catch (SocketException e) {
147 // we were closed
148 return;
149 }
150 OpenedSocket openedSocket = OpenedSocket.fromSocket(socket);
151
152 UUID id = ((StartupAnnounceMessage) openedSocket.reader().read()).trialId();
153 // N.B. you should not call set with the lock held, to prevent same thread executors from
154 // running with the lock.
155 getConnectionImpl(id, Source.ACCEPT).set(openedSocket);
156 }
157 }
158
159 /**
160 * Returns a {@link SettableFuture} from the map of connections.
161 *
162 * <p>This method has the following properties:
163 * <ul>
164 * <li>If the id is present in {@link #connectionState}, this will throw an
165 * {@link IllegalStateException}.
166 * <li>The id and source are recorded in {@link #connectionState}
167 * <li>If the future is already in {@link #halfFinishedConnections}, it is removed and
168 * returned.
169 * <li>If the future is not in {@link #halfFinishedConnections}, a new {@link SettableFuture}
170 * is added and then returned.
171 *
172 * <p>These features together ensure that each connection can only be accepted once, only
173 * requested once and once both have happened it will be removed from
174 * {@link #halfFinishedConnections}.
175 */
176 private SettableFuture<OpenedSocket> getConnectionImpl(UUID id, Source source) {
177 lock.lock();
178 try {
179 checkState(connectionState.put(source, id), "Connection for %s has already been %s",
180 id, source);
181 SettableFuture<OpenedSocket> future = halfFinishedConnections.get(id);
182 if (future == null) {
183 future = SettableFuture.create();
184 halfFinishedConnections.put(id, future);
185 } else {
186 halfFinishedConnections.remove(id);
187 }
188 return future;
189 } finally {
190 lock.unlock();
191 }
192 }
193
194 @Override protected void triggerShutdown() {
195 try {
196 serverSocket.close();
197 } catch (IOException e) {
198 // best effort...
199 }
200 }
201
202 @Override protected void shutDown() throws Exception {
203 serverSocket.close();
204 // Now we have either been asked to stop or have failed with some kind of exception, we want to
205 // notify all pending requests, so if there are any references outside of this class they will
206 // notice.
207 lock.lock();
208 try {
209 for (SettableFuture<OpenedSocket> future : halfFinishedConnections.values()) {
210 future.setException(new Exception("The socket has been closed"));
211 }
212 halfFinishedConnections.clear();
213 connectionState.clear();
214 } finally {
215 lock.unlock();
216 }
217 }
218}