Merge "[RESTRICT AUTOMERGE] [DataBroker] Implement piping large data to ScriptExecutor" into sc-v2-dev
diff --git a/service/src/com/android/car/telemetry/databroker/DataBrokerImpl.java b/service/src/com/android/car/telemetry/databroker/DataBrokerImpl.java
index f4b3e6a..695254c 100644
--- a/service/src/com/android/car/telemetry/databroker/DataBrokerImpl.java
+++ b/service/src/com/android/car/telemetry/databroker/DataBrokerImpl.java
@@ -25,6 +25,7 @@
import android.os.IBinder;
import android.os.Looper;
import android.os.Message;
+import android.os.ParcelFileDescriptor;
import android.os.PersistableBundle;
import android.os.RemoteException;
import android.os.UserHandle;
@@ -43,6 +44,9 @@
import com.android.car.telemetry.scriptexecutorinterface.IScriptExecutorListener;
import com.android.internal.annotations.VisibleForTesting;
+import java.io.Closeable;
+import java.io.IOException;
+import java.io.OutputStream;
import java.lang.ref.WeakReference;
import java.util.ArrayList;
import java.util.List;
@@ -62,6 +66,12 @@
/** Bind to script executor 5 times before entering disabled state. */
private static final int MAX_BIND_SCRIPT_EXECUTOR_ATTEMPTS = 5;
+ /**
+ * If script input is at least this size, it will be piped to script executor instead of
+ * being passed directly via binder call.
+ */
+ private static final int LARGE_SCRIPT_INPUT_SIZE_BYTES = 20 * 1024; // 20 kb
+
private static final String SCRIPT_EXECUTOR_PACKAGE = "com.android.car.scriptexecutor";
private static final String SCRIPT_EXECUTOR_CLASS =
"com.android.car.scriptexecutor.ScriptExecutor";
@@ -338,18 +348,23 @@
if (task == null || task.getPriority() > mPriority) {
return;
}
+ // if script executor is null, bind service
+ if (mScriptExecutor == null) {
+ Slog.w(CarLog.TAG_TELEMETRY,
+ "script executor is null, cannot execute task");
+ // upon successful binding, a task will be scheduled to run if there are any
+ mTelemetryHandler.sendEmptyMessage(MSG_BIND_TO_SCRIPT_EXECUTOR);
+ return;
+ }
mTaskQueue.poll(); // remove task from queue
+ // update current name because a script is currently running
+ mCurrentScriptName = task.getMetricsConfig().getName();
try {
- if (mScriptExecutor == null) {
- Slog.w(CarLog.TAG_TELEMETRY,
- "script executor is null, cannot execute task");
- mTaskQueue.add(task);
- // upon successful binding, a task will be scheduled to run if there are any
- mTelemetryHandler.sendEmptyMessage(MSG_BIND_TO_SCRIPT_EXECUTOR);
+ if (task.getDataSizeBytes() >= LARGE_SCRIPT_INPUT_SIZE_BYTES) {
+ Slog.d(CarLog.TAG_TELEMETRY, "invoking script executor for large input");
+ invokeScriptForLargeInput(task);
} else {
Slog.d(CarLog.TAG_TELEMETRY, "invoking script executor");
- // update current name because a script is currently running
- mCurrentScriptName = task.getMetricsConfig().getName();
mScriptExecutor.invokeScript(
task.getMetricsConfig().getScript(),
task.getHandlerName(),
@@ -358,9 +373,56 @@
mScriptExecutorListener);
}
} catch (RemoteException e) {
- Slog.d(CarLog.TAG_TELEMETRY, "remote exception occurred invoking script", e);
- mTaskQueue.add(task); // will not trigger scheduleNextTask()
+ Slog.w(CarLog.TAG_TELEMETRY, "remote exception occurred invoking script", e);
+ unbindScriptExecutor();
+ addTaskToQueue(task); // will trigger scheduleNextTask() and re-binding scriptexecutor
+ } catch (IOException e) {
+ Slog.w(CarLog.TAG_TELEMETRY, "Either unable to create pipe or failed to pipe data"
+ + " to ScriptExecutor. Skipping the published data", e);
mCurrentScriptName = null;
+ scheduleNextTask(); // drop this task and schedule the next one
+ }
+ }
+
+ /**
+  * Sets up a pipe, invokes ScriptExecutor#invokeScriptForLargeInput() API with the read end,
+  * and writes the script input to the write end.
+  *
+  * <p>DataBroker's handles to both ends of the pipe are closed by the time this method returns
+  * or throws: the read end as soon as the binder call completes (successfully or not), and the
+  * write end either after the data has been written or immediately if the binder call fails.
+  *
+  * @param task containing all the necessary parameters for ScriptExecutor API.
+  * @throws IOException if cannot create pipe or cannot write the bundle to pipe.
+  * @throws RemoteException if ScriptExecutor failed.
+  */
+ private void invokeScriptForLargeInput(ScriptExecutionTask task)
+         throws IOException, RemoteException {
+     ParcelFileDescriptor[] fds = ParcelFileDescriptor.createPipe();
+     ParcelFileDescriptor readFd = fds[0];
+     ParcelFileDescriptor writeFd = fds[1];
+     try {
+         mScriptExecutor.invokeScriptForLargeInput(
+                 task.getMetricsConfig().getScript(),
+                 task.getHandlerName(),
+                 readFd,
+                 mResultStore.getInterimResult(mCurrentScriptName),
+                 mScriptExecutorListener);
+     } catch (RemoteException | RuntimeException e) {
+         // Nothing will be written after a failed call, so release the write end as well.
+         // Unchecked exceptions are handled too so that they cannot leak the pipe.
+         closeQuietly(writeFd);
+         throw e;
+     } finally {
+         // This side's handle to the read end is not needed once the call has completed.
+         closeQuietly(readFd);
+     }
+
+     Slog.d(CarLog.TAG_TELEMETRY, "writing large script data to pipe");
+     // try-with-resources closes the write end whether or not the write succeeds
+     try (OutputStream outputStream = new ParcelFileDescriptor.AutoCloseOutputStream(writeFd)) {
+         task.getData().writeToStream(outputStream);
+     }
+ }
+
+ /**
+ * Closes the given Closeable and suppresses any IOException thrown by close().
+ *
+ * <p>The argument is dereferenced without a null check, so callers must pass a non-null value.
+ */
+ private void closeQuietly(Closeable closeable) {
+ try {
+ closeable.close();
+ } catch (IOException e) {
+ // Ignore: this is a best-effort cleanup and the failure is deliberately not reported
}
}
diff --git a/service/src/com/android/car/telemetry/databroker/ScriptExecutionTask.java b/service/src/com/android/car/telemetry/databroker/ScriptExecutionTask.java
index a1fb522..648fc79 100644
--- a/service/src/com/android/car/telemetry/databroker/ScriptExecutionTask.java
+++ b/service/src/com/android/car/telemetry/databroker/ScriptExecutionTask.java
@@ -16,6 +16,7 @@
package com.android.car.telemetry.databroker;
+import android.os.Parcel;
import android.os.PersistableBundle;
import com.android.car.telemetry.TelemetryProto;
@@ -69,6 +70,18 @@
return mSubscriber.getMetricsConfig().equals(metricsConfig);
}
+ /**
+  * Returns the script input data size in bytes.
+  *
+  * <p>The size is measured by writing the bundle into a scratch {@link Parcel} and reading
+  * {@link Parcel#dataSize()}. The parcel is recycled before returning, even if writing fails.
+  * TODO(b/201545154): Investigate how to get bundle size without making a full copy.
+  */
+ public int getDataSizeBytes() {
+     Parcel parcel = Parcel.obtain();
+     try {
+         parcel.writePersistableBundle(mData);
+         return parcel.dataSize();
+     } finally {
+         // Always recycle the parcel, including when serialization throws.
+         parcel.recycle();
+     }
+ }
+
@Override
public int compareTo(ScriptExecutionTask other) {
if (getPriority() < other.getPriority()) {
diff --git a/tests/carservice_unit_test/src/com/android/car/telemetry/databroker/DataBrokerTest.java b/tests/carservice_unit_test/src/com/android/car/telemetry/databroker/DataBrokerTest.java
index 82f7e5e..0f37187 100644
--- a/tests/carservice_unit_test/src/com/android/car/telemetry/databroker/DataBrokerTest.java
+++ b/tests/carservice_unit_test/src/com/android/car/telemetry/databroker/DataBrokerTest.java
@@ -27,6 +27,7 @@
import static org.mockito.Mockito.when;
import android.annotation.Nullable;
+import android.car.AbstractExtendedMockitoCarServiceTestCase;
import android.car.hardware.CarPropertyConfig;
import android.content.Context;
import android.content.ServiceConnection;
@@ -54,14 +55,17 @@
import org.mockito.junit.MockitoJUnitRunner;
import org.mockito.stubbing.Answer;
+import java.io.IOException;
+import java.io.InputStream;
import java.nio.file.Files;
import java.util.Collections;
+import java.util.Objects;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.TimeUnit;
@RunWith(MockitoJUnitRunner.class)
-public class DataBrokerTest {
+public class DataBrokerTest extends AbstractExtendedMockitoCarServiceTestCase {
private static final int PROP_ID = 100;
private static final int PROP_AREA = 200;
private static final int PRIORITY_HIGH = 1;
@@ -145,13 +149,18 @@
SystemClock.elapsedRealtime());
}
+ /** Spies on ParcelFileDescriptor statics so that tests can stub createPipe(). */
+ @Override
+ protected void onSessionBuilder(CustomMockitoSessionBuilder builder) {
+ builder.spyStatic(ParcelFileDescriptor.class);
+ }
+
@Test
public void testSetTaskExecutionPriority_whenNoTask_shouldNotInvokeScriptExecutor()
throws Exception {
mDataBroker.setTaskExecutionPriority(PRIORITY_HIGH);
- waitForHandlerThreadToFinish();
- assertThat(mFakeScriptExecutor.getApiInvocationCount()).isEqualTo(0);
+ waitForTelemetryThreadToFinish();
+ assertThat(mFakeScriptExecutor.getInvokeScriptCount()).isEqualTo(0);
}
@Test
@@ -161,10 +170,10 @@
mDataBroker.setTaskExecutionPriority(PRIORITY_HIGH);
- waitForHandlerThreadToFinish();
+ waitForTelemetryThreadToFinish();
// task is not polled
assertThat(mDataBroker.getTaskQueue().peek()).isEqualTo(mLowPriorityTask);
- assertThat(mFakeScriptExecutor.getApiInvocationCount()).isEqualTo(0);
+ assertThat(mFakeScriptExecutor.getInvokeScriptCount()).isEqualTo(0);
}
@Test
@@ -174,18 +183,18 @@
mDataBroker.setTaskExecutionPriority(PRIORITY_HIGH);
- waitForHandlerThreadToFinish();
+ waitForTelemetryThreadToFinish();
// task is polled and run
assertThat(mDataBroker.getTaskQueue().peek()).isNull();
- assertThat(mFakeScriptExecutor.getApiInvocationCount()).isEqualTo(1);
+ assertThat(mFakeScriptExecutor.getInvokeScriptCount()).isEqualTo(1);
}
@Test
public void testScheduleNextTask_whenNoTask_shouldNotInvokeScriptExecutor() throws Exception {
mDataBroker.scheduleNextTask();
- waitForHandlerThreadToFinish();
- assertThat(mFakeScriptExecutor.getApiInvocationCount()).isEqualTo(0);
+ waitForTelemetryThreadToFinish();
+ assertThat(mFakeScriptExecutor.getInvokeScriptCount()).isEqualTo(0);
}
@Test
@@ -194,17 +203,17 @@
PriorityBlockingQueue<ScriptExecutionTask> taskQueue = mDataBroker.getTaskQueue();
taskQueue.add(mHighPriorityTask);
mDataBroker.scheduleNextTask(); // start a task
- waitForHandlerThreadToFinish();
+ waitForTelemetryThreadToFinish();
assertThat(taskQueue.peek()).isNull(); // assert that task is polled and running
taskQueue.add(mHighPriorityTask); // add another task into the queue
mDataBroker.scheduleNextTask(); // schedule next task while the last task is in progress
- waitForHandlerThreadToFinish();
+ waitForTelemetryThreadToFinish();
// verify task is not polled
assertThat(taskQueue.peek()).isEqualTo(mHighPriorityTask);
// expect one invocation for the task that is running
- assertThat(mFakeScriptExecutor.getApiInvocationCount()).isEqualTo(1);
+ assertThat(mFakeScriptExecutor.getInvokeScriptCount()).isEqualTo(1);
}
@Test
@@ -216,14 +225,14 @@
taskQueue.add(mHighPriorityTask);
mDataBroker.scheduleNextTask(); // start a task
- waitForHandlerThreadToFinish();
+ waitForTelemetryThreadToFinish();
// end a task, should automatically schedule the next task
mFakeScriptExecutor.notifyScriptSuccess(mData); // posts to telemetry handler
- waitForHandlerThreadToFinish();
+ waitForTelemetryThreadToFinish();
// verify queue is empty, both tasks are polled and executed
assertThat(taskQueue.peek()).isNull();
- assertThat(mFakeScriptExecutor.getApiInvocationCount()).isEqualTo(2);
+ assertThat(mFakeScriptExecutor.getInvokeScriptCount()).isEqualTo(2);
}
@Test
@@ -233,11 +242,11 @@
mDataBroker.getTaskQueue().add(mHighPriorityTask);
mDataBroker.scheduleNextTask();
- waitForHandlerThreadToFinish();
+ waitForTelemetryThreadToFinish();
mFakeScriptExecutor.notifyScriptSuccess(mData); // posts to telemetry handler
- waitForHandlerThreadToFinish();
- assertThat(mFakeScriptExecutor.getApiInvocationCount()).isEqualTo(1);
+ waitForTelemetryThreadToFinish();
+ assertThat(mFakeScriptExecutor.getInvokeScriptCount()).isEqualTo(1);
verify(mMockResultStore).putInterimResult(
eq(mHighPriorityTask.getMetricsConfig().getName()), eq(mData));
}
@@ -254,11 +263,11 @@
.build();
mDataBroker.scheduleNextTask();
- waitForHandlerThreadToFinish();
+ waitForTelemetryThreadToFinish();
mFakeScriptExecutor.notifyScriptError(errorType.getNumber(), errorMessage);
- waitForHandlerThreadToFinish();
- assertThat(mFakeScriptExecutor.getApiInvocationCount()).isEqualTo(1);
+ waitForTelemetryThreadToFinish();
+ assertThat(mFakeScriptExecutor.getInvokeScriptCount()).isEqualTo(1);
verify(mMockResultStore).putError(eq(METRICS_CONFIG_FOO.getName()), eq(expectedError));
}
@@ -270,11 +279,11 @@
mDataBroker.getTaskQueue().add(mHighPriorityTask);
mDataBroker.scheduleNextTask();
- waitForHandlerThreadToFinish();
+ waitForTelemetryThreadToFinish();
mFakeScriptExecutor.notifyScriptFinish(mData); // posts to telemetry handler
- waitForHandlerThreadToFinish();
- assertThat(mFakeScriptExecutor.getApiInvocationCount()).isEqualTo(1);
+ waitForTelemetryThreadToFinish();
+ assertThat(mFakeScriptExecutor.getInvokeScriptCount()).isEqualTo(1);
verify(mMockResultStore).putFinalResult(
eq(mHighPriorityTask.getMetricsConfig().getName()), eq(mData));
}
@@ -289,12 +298,45 @@
mDataBroker.scheduleNextTask();
- waitForHandlerThreadToFinish();
- assertThat(mFakeScriptExecutor.getApiInvocationCount()).isEqualTo(1);
+ waitForTelemetryThreadToFinish();
+ assertThat(mFakeScriptExecutor.getInvokeScriptCount()).isEqualTo(1);
assertThat(mFakeScriptExecutor.getSavedState()).isEqualTo(mData);
}
@Test
+ public void testScheduleNextTask_largeInput_shouldPipeData() throws Exception {
+ // Add a large array to the bundle so the task takes the large-input path.
+ // NOTE(review): presumably mData is the bundle held by mHighPriorityTask - confirm in setup.
+ mData.putBooleanArray("1 MB Array", new boolean [1024 * 1024]);
+ mDataBroker.getTaskQueue().add(mHighPriorityTask);
+
+ mDataBroker.scheduleNextTask();
+
+ waitForTelemetryThreadToFinish();
+ // the task must have been delivered through invokeScriptForLargeInput()
+ assertThat(mFakeScriptExecutor.getInvokeScriptForLargeInputCount()).isEqualTo(1);
+ }
+
+ // Verifies that when piping the large input fails with an IOException, the failing task is
+ // dropped and the next queued task is still executed.
+ @Test
+ public void testScheduleNextTask_largeInputPipeIOException_shouldIgnoreCurrentTask()
+ throws Exception {
+ // NOTE(review): presumably mData is the bundle held by mHighPriorityTask - confirm in setup.
+ mData.putBooleanArray("1 MB Array", new boolean [1024 * 1024]);
+ PriorityBlockingQueue<ScriptExecutionTask> taskQueue = mDataBroker.getTaskQueue();
+ taskQueue.add(mHighPriorityTask); // invokeScriptForLargeInput() path
+ taskQueue.add(new ScriptExecutionTask(
+ new DataSubscriber(mDataBroker, METRICS_CONFIG_FOO, SUBSCRIBER_FOO),
+ new PersistableBundle(),
+ SystemClock.elapsedRealtime())); // invokeScript() path
+ // Build the pipe up front and stub the spied static createPipe() to return it, so the
+ // test controls the file descriptors DataBroker will use.
+ ParcelFileDescriptor[] fds = ParcelFileDescriptor.createPipe();
+ when(ParcelFileDescriptor.createPipe()).thenReturn(fds);
+ fds[1].close(); // cause IO Exception in invokeScriptForLargeInput() path
+
+ mDataBroker.scheduleNextTask();
+
+ waitForTelemetryThreadToFinish();
+ // each API is invoked once: the large task (whose pipe write fails) and the small task
+ assertThat(mFakeScriptExecutor.getInvokeScriptForLargeInputCount()).isEqualTo(1);
+ assertThat(mFakeScriptExecutor.getInvokeScriptCount()).isEqualTo(1);
+ // neither task is left in (or put back into) the queue
+ assertThat(taskQueue).isEmpty();
+ }
+
+ @Test
public void testScheduleNextTask_bindScriptExecutorFailedOnce_shouldRebind()
throws Exception {
Mockito.reset(mMockContext);
@@ -320,9 +362,9 @@
// will rebind to ScriptExecutor if it is null
mDataBroker.scheduleNextTask();
- waitForHandlerThreadToFinish();
+ waitForTelemetryThreadToFinish();
assertThat(taskQueue.peek()).isNull();
- assertThat(mFakeScriptExecutor.getApiInvocationCount()).isEqualTo(1);
+ assertThat(mFakeScriptExecutor.getInvokeScriptCount()).isEqualTo(1);
}
@Test
@@ -340,14 +382,14 @@
// will rebind to ScriptExecutor if it is null
mDataBroker.scheduleNextTask();
- waitForHandlerThreadToFinish();
+ waitForTelemetryThreadToFinish();
// broker disabled, all subscribers should have been removed
assertThat(mDataBroker.getSubscriptionMap()).hasSize(0);
- assertThat(mFakeScriptExecutor.getApiInvocationCount()).isEqualTo(0);
+ assertThat(mFakeScriptExecutor.getInvokeScriptCount()).isEqualTo(0);
}
@Test
- public void testScheduleNextTask_whenScriptExecutorThrowsException_shouldRequeueTask()
+ public void testScheduleNextTask_whenScriptExecutorThrowsException_shouldResetAndTryAgain()
throws Exception {
PriorityBlockingQueue<ScriptExecutionTask> taskQueue = mDataBroker.getTaskQueue();
taskQueue.add(mHighPriorityTask);
@@ -355,18 +397,18 @@
mDataBroker.scheduleNextTask();
- waitForHandlerThreadToFinish();
- // expect invokeScript() to be called and failed, causing the same task to be re-queued
- assertThat(mFakeScriptExecutor.getApiInvocationCount()).isEqualTo(1);
- assertThat(taskQueue.peek()).isEqualTo(mHighPriorityTask);
+ waitForTelemetryThreadToFinish();
+ // invokeScript() failed, task is re-queued and re-run
+ assertThat(mFakeScriptExecutor.getInvokeScriptCount()).isEqualTo(2);
+ assertThat(taskQueue).isEmpty();
}
@Test
public void testAddTaskToQueue_shouldInvokeScriptExecutor() throws Exception {
mDataBroker.addTaskToQueue(mHighPriorityTask);
- waitForHandlerThreadToFinish();
- assertThat(mFakeScriptExecutor.getApiInvocationCount()).isEqualTo(1);
+ waitForTelemetryThreadToFinish();
+ assertThat(mFakeScriptExecutor.getInvokeScriptCount()).isEqualTo(1);
}
@Test
@@ -417,7 +459,7 @@
assertThat(mDataBroker.getSubscriptionMap()).hasSize(0);
}
- private void waitForHandlerThreadToFinish() throws Exception {
+ private void waitForTelemetryThreadToFinish() throws Exception {
assertWithMessage("handler not idle in %sms", TIMEOUT_MS)
.that(mIdleHandlerLatch.await(TIMEOUT_MS, TimeUnit.MILLISECONDS)).isTrue();
mIdleHandlerLatch = new CountDownLatch(1); // reset idle handler condition
@@ -425,7 +467,8 @@
private static class FakeScriptExecutor implements IScriptExecutor {
private IScriptExecutorListener mListener;
- private int mApiInvocationCount = 0;
+ private int mInvokeScriptCount = 0;
+ private int mInvokeScriptForLargeInputCount = 0;
private int mFailApi = 0;
private PersistableBundle mSavedState = null;
@@ -434,7 +477,7 @@
PersistableBundle publishedData, @Nullable PersistableBundle savedState,
IScriptExecutorListener listener)
throws RemoteException {
- mApiInvocationCount++;
+ mInvokeScriptCount++;
mSavedState = savedState;
mListener = listener;
if (mFailApi > 0) {
@@ -448,13 +491,28 @@
ParcelFileDescriptor publishedDataFileDescriptor,
@Nullable PersistableBundle savedState,
IScriptExecutorListener listener) throws RemoteException {
- mApiInvocationCount++;
+ mInvokeScriptForLargeInputCount++;
mSavedState = savedState;
mListener = listener;
if (mFailApi > 0) {
mFailApi--;
throw new RemoteException("Simulated failure");
}
+ // Since DataBrokerImpl and FakeScriptExecutor are in the same process, they do not
+ // use real IPC and share the fd. When DataBroker closes the fd, it affects
+ // FakeScriptExecutor. Therefore FakeScriptExecutor must dup the fd before it is
+ // closed by DataBroker
+ ParcelFileDescriptor dup = null;
+ try {
+ dup = publishedDataFileDescriptor.dup();
+ } catch (IOException e) { }
+ final ParcelFileDescriptor fd = Objects.requireNonNull(dup);
+ // to prevent deadlock, read and write must happen on separate threads
+ Handler.getMain().post(() -> {
+ try (InputStream input = new ParcelFileDescriptor.AutoCloseInputStream(fd)) {
+ PersistableBundle.readFromStream(input);
+ } catch (IOException e) { }
+ });
}
@Override
@@ -494,9 +552,14 @@
mFailApi = n;
}
- /** Returns number of times the ScriptExecutor API was invoked. */
- public int getApiInvocationCount() {
- return mApiInvocationCount;
+ /** Returns number of times invokeScript() was called. */
+ public int getInvokeScriptCount() {
+ return mInvokeScriptCount;
+ }
+
+ /** Returns number of times invokeScriptForLargeInput() was called. */
+ public int getInvokeScriptForLargeInputCount() {
+ return mInvokeScriptForLargeInputCount;
}
/** Returns the interim data passed in invokeScript(). */