Merge d87dbf9bf0c351f2549a22400185b2a11055bff8 on remote branch
Change-Id: Ie77caeeffa5474b57c8cd20eb6787cb0fd09b91d
diff --git a/java/com/android/modules/utils/SynchronousResultReceiver.java b/java/com/android/modules/utils/SynchronousResultReceiver.java
index ad270d2..c12d739 100644
--- a/java/com/android/modules/utils/SynchronousResultReceiver.java
+++ b/java/com/android/modules/utils/SynchronousResultReceiver.java
@@ -25,11 +25,14 @@
import android.os.SystemClock;
import android.util.Log;
+import com.android.internal.annotations.GuardedBy;
+
import java.io.Serializable;
import java.time.Duration;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
@@ -39,19 +42,59 @@
* Allow the server end to synchronously wait on the response from the client.
* This enables an RPC like system but with the ability to timeout and discard late results.
*
- * <p>NOTE: Can only be used for one response.
- * Subsequent responses on the same instance will throw a {@link IllegalStateException}
+ * <p>NOTE: Instances are pooled. Use the static {@link #get} method to obtain one: it reuses an
+ * available instance if the pool has one, and otherwise creates a new instance.
*/
public final class SynchronousResultReceiver<T> implements Parcelable {
private static final String TAG = "SynchronousResultReceiver";
private final boolean mLocal;
private boolean mIsCompleted;
+ private final static Object sLock = new Object();
+ private final static int QUEUE_THRESHOLD = 4;
- public SynchronousResultReceiver() {
+ @GuardedBy("sLock")
+ private CompletableFuture<Result<T>> mFuture = new CompletableFuture<>();
+
+ @GuardedBy("sLock")
+ private static final ConcurrentLinkedQueue<SynchronousResultReceiver> sAvailableReceivers
+ = new ConcurrentLinkedQueue<>();
+
+ public static <T> SynchronousResultReceiver<T> get() {
+ synchronized(sLock) {
+ if (sAvailableReceivers.isEmpty()) {
+ return new SynchronousResultReceiver();
+ }
+ SynchronousResultReceiver receiver = sAvailableReceivers.poll();
+ receiver.resetLocked();
+ return receiver;
+ }
+ }
+
+ private SynchronousResultReceiver() {
mLocal = true;
mIsCompleted = false;
}
+ @GuardedBy("sLock")
+ private void releaseLocked() {
+ mFuture = null;
+ if (sAvailableReceivers.size() < QUEUE_THRESHOLD) {
+ sAvailableReceivers.add(this);
+ }
+ }
+
+ @GuardedBy("sLock")
+ private void resetLocked() {
+ mFuture = new CompletableFuture<>();
+ mIsCompleted = false;
+ }
+
+ private CompletableFuture<Result<T>> getFuture() {
+ synchronized (sLock) {
+ return mFuture;
+ }
+ }
+
public static class Result<T> implements Parcelable {
private final @Nullable T mObject;
private final RuntimeException mException;
@@ -105,20 +148,24 @@
};
}
- private final CompletableFuture<Result<T>> mFuture = new CompletableFuture<>();
-
private void complete(Result<T> result) {
if (mIsCompleted) {
throw new IllegalStateException("Receiver has already been completed");
}
mIsCompleted = true;
if (mLocal) {
- mFuture.complete(result);
- } else if (mReceiver != null) {
- try {
- mReceiver.send(result);
- } catch (RemoteException e) {
- Log.w(TAG, "Failed to complete future");
+ getFuture().complete(result);
+ } else {
+ final ISynchronousResultReceiver rr;
+ synchronized (this) {
+ rr = mReceiver;
+ }
+ if (rr != null) {
+ try {
+ rr.send(result);
+ } catch (RemoteException e) {
+ Log.w(TAG, "Failed to complete future");
+ }
}
}
}
@@ -160,7 +207,11 @@
Duration remainingTime = timeout;
while (!remainingTime.isNegative()) {
try {
- return mFuture.get(remainingTime.toMillis(), TimeUnit.MILLISECONDS);
+ Result<T> result = getFuture().get(remainingTime.toMillis(), TimeUnit.MILLISECONDS);
+ synchronized (sLock) {
+ releaseLocked();
+ return result;
+ }
} catch (ExecutionException e) {
// This will NEVER happen.
throw new AssertionError("Error receiving response", e);
@@ -171,6 +222,9 @@
Duration.ofNanos(SystemClock.elapsedRealtimeNanos() - startWaitNanoTime));
}
}
+ synchronized (sLock) {
+ releaseLocked();
+ }
throw new TimeoutException();
}
@@ -179,7 +233,11 @@
private final class MyResultReceiver extends ISynchronousResultReceiver.Stub {
public void send(@SuppressWarnings("rawtypes") @NonNull Result result) {
@SuppressWarnings("unchecked") Result<T> res = (Result<T>) result;
- mFuture.complete(res);
+ CompletableFuture<Result<T>> future;
+ future = getFuture();
+ if (future != null) {
+ future.complete(res);
+ }
}
}
diff --git a/javatests/com/android/modules/utils/SynchronousResultReceiverTest.java b/javatests/com/android/modules/utils/SynchronousResultReceiverTest.java
index 43345a9..82aa97d 100644
--- a/javatests/com/android/modules/utils/SynchronousResultReceiverTest.java
+++ b/javatests/com/android/modules/utils/SynchronousResultReceiverTest.java
@@ -35,7 +35,7 @@
@Test
public void testSimpleData() throws Exception {
- final SynchronousResultReceiver<Boolean> recv = new SynchronousResultReceiver();
+ final SynchronousResultReceiver<Boolean> recv = SynchronousResultReceiver.get();
recv.send(true);
final boolean result = recv.awaitResultNoInterrupt(OK_TIME).getValue(false);
assertTrue(result);
@@ -43,7 +43,7 @@
@Test
public void testDoubleComplete() throws Exception {
- final SynchronousResultReceiver<Boolean> recv = new SynchronousResultReceiver();
+ final SynchronousResultReceiver<Boolean> recv = SynchronousResultReceiver.get();
recv.send(true);
Assert.assertThrows(IllegalStateException.class,
() -> recv.send(true));
@@ -51,14 +51,14 @@
@Test
public void testDefaultValue() throws Exception {
- final SynchronousResultReceiver<Boolean> recv = new SynchronousResultReceiver();
+ final SynchronousResultReceiver<Boolean> recv = SynchronousResultReceiver.get();
recv.send(null);
assertTrue(recv.awaitResultNoInterrupt(OK_TIME).getValue(true));
}
@Test
public void testPropagateException() throws Exception {
- final SynchronousResultReceiver<Boolean> recv = new SynchronousResultReceiver();
+ final SynchronousResultReceiver<Boolean> recv = SynchronousResultReceiver.get();
recv.propagateException(new RuntimeException("Placeholder exception"));
Assert.assertThrows(RuntimeException.class,
() -> recv.awaitResultNoInterrupt(OK_TIME).getValue(false));
@@ -66,14 +66,14 @@
@Test
public void testTimeout() throws Exception {
- final SynchronousResultReceiver<Boolean> recv = new SynchronousResultReceiver();
+ final SynchronousResultReceiver<Boolean> recv = SynchronousResultReceiver.get();
Assert.assertThrows(TimeoutException.class,
() -> recv.awaitResultNoInterrupt(OK_TIME));
}
@Test
public void testNegativeTime() throws Exception {
- final SynchronousResultReceiver<Boolean> recv = new SynchronousResultReceiver();
+ final SynchronousResultReceiver<Boolean> recv = SynchronousResultReceiver.get();
recv.send(false);
Assert.assertThrows(TimeoutException.class,
() -> recv.awaitResultNoInterrupt(NEG_TIME));