[RESTRICT AUTOMERGE] [DataBroker] Implement piping large data to ScriptExecutor

Bug: 200620524
Test: atest CarServiceUnitTest:DataBrokerTest

Change-Id: I9356e0f5f3e55201efbd3c8136e2a23da6c4b994
diff --git a/service/src/com/android/car/telemetry/databroker/DataBrokerImpl.java b/service/src/com/android/car/telemetry/databroker/DataBrokerImpl.java
index f4b3e6a..695254c 100644
--- a/service/src/com/android/car/telemetry/databroker/DataBrokerImpl.java
+++ b/service/src/com/android/car/telemetry/databroker/DataBrokerImpl.java
@@ -25,6 +25,7 @@
 import android.os.IBinder;
 import android.os.Looper;
 import android.os.Message;
+import android.os.ParcelFileDescriptor;
 import android.os.PersistableBundle;
 import android.os.RemoteException;
 import android.os.UserHandle;
@@ -43,6 +44,9 @@
 import com.android.car.telemetry.scriptexecutorinterface.IScriptExecutorListener;
 import com.android.internal.annotations.VisibleForTesting;
 
+import java.io.Closeable;
+import java.io.IOException;
+import java.io.OutputStream;
 import java.lang.ref.WeakReference;
 import java.util.ArrayList;
 import java.util.List;
@@ -62,6 +66,12 @@
     /** Bind to script executor 5 times before entering disabled state. */
     private static final int MAX_BIND_SCRIPT_EXECUTOR_ATTEMPTS = 5;
 
+    /**
+     * If script input exceeds this size, it will be piped to script executor instead of directly
+     * passed via binder call.
+     */
+    private static final int LARGE_SCRIPT_INPUT_SIZE_BYTES = 20 * 1024; // 20 kb
+
     private static final String SCRIPT_EXECUTOR_PACKAGE = "com.android.car.scriptexecutor";
     private static final String SCRIPT_EXECUTOR_CLASS =
             "com.android.car.scriptexecutor.ScriptExecutor";
@@ -338,18 +348,23 @@
         if (task == null || task.getPriority() > mPriority) {
             return;
         }
+        // if script executor is null, bind service
+        if (mScriptExecutor == null) {
+            Slog.w(CarLog.TAG_TELEMETRY,
+                    "script executor is null, cannot execute task");
+            // upon successful binding, a task will be scheduled to run if there are any
+            mTelemetryHandler.sendEmptyMessage(MSG_BIND_TO_SCRIPT_EXECUTOR);
+            return;
+        }
         mTaskQueue.poll(); // remove task from queue
+        // update current name because a script is currently running
+        mCurrentScriptName = task.getMetricsConfig().getName();
         try {
-            if (mScriptExecutor == null) {
-                Slog.w(CarLog.TAG_TELEMETRY,
-                        "script executor is null, cannot execute task");
-                mTaskQueue.add(task);
-                // upon successful binding, a task will be scheduled to run if there are any
-                mTelemetryHandler.sendEmptyMessage(MSG_BIND_TO_SCRIPT_EXECUTOR);
+            if (task.getDataSizeBytes() >= LARGE_SCRIPT_INPUT_SIZE_BYTES) {
+                Slog.d(CarLog.TAG_TELEMETRY, "invoking script executor for large input");
+                invokeScriptForLargeInput(task);
             } else {
                 Slog.d(CarLog.TAG_TELEMETRY, "invoking script executor");
-                // update current name because a script is currently running
-                mCurrentScriptName = task.getMetricsConfig().getName();
                 mScriptExecutor.invokeScript(
                         task.getMetricsConfig().getScript(),
                         task.getHandlerName(),
@@ -358,9 +373,56 @@
                         mScriptExecutorListener);
             }
         } catch (RemoteException e) {
-            Slog.d(CarLog.TAG_TELEMETRY, "remote exception occurred invoking script", e);
-            mTaskQueue.add(task); // will not trigger scheduleNextTask()
+            Slog.w(CarLog.TAG_TELEMETRY, "remote exception occurred invoking script", e);
+            unbindScriptExecutor();
+            addTaskToQueue(task); // will trigger scheduleNextTask() and re-binding scriptexecutor
+        } catch (IOException e) {
+            Slog.w(CarLog.TAG_TELEMETRY, "Either unable to create pipe or failed to pipe data"
+                    + " to ScriptExecutor. Skipping the published data", e);
             mCurrentScriptName = null;
+            scheduleNextTask(); // drop this task and schedule the next one
+        }
+    }
+
+    /**
+     * Sets up pipes, invokes ScriptExecutor#invokeScriptForLargeInput() API, and writes the
+     * script input to the pipe.
+     *
+     * @param task containing all the necessary parameters for ScriptExecutor API.
+     * @throws IOException if cannot create pipe or cannot write the bundle to pipe.
+     * @throws RemoteException if ScriptExecutor failed.
+     */
+    private void invokeScriptForLargeInput(ScriptExecutionTask task)
+            throws IOException, RemoteException {
+        ParcelFileDescriptor[] fds = ParcelFileDescriptor.createPipe();
+        ParcelFileDescriptor readFd = fds[0];
+        ParcelFileDescriptor writeFd = fds[1];
+        try {
+            mScriptExecutor.invokeScriptForLargeInput(
+                    task.getMetricsConfig().getScript(),
+                    task.getHandlerName(),
+                    readFd,
+                    mResultStore.getInterimResult(mCurrentScriptName),
+                    mScriptExecutorListener);
+        } catch (RemoteException e) {
+            closeQuietly(readFd);
+            closeQuietly(writeFd);
+            throw e;
+        }
+        closeQuietly(readFd);
+
+        Slog.d(CarLog.TAG_TELEMETRY, "writing large script data to pipe");
+        try (OutputStream outputStream = new ParcelFileDescriptor.AutoCloseOutputStream(writeFd)) {
+            task.getData().writeToStream(outputStream);
+        }
+    }
+
+    /** Quietly closes Java Closeables, ignoring IOException. */
+    private void closeQuietly(Closeable closeable) {
+        try {
+            closeable.close();
+        } catch (IOException e) {
+            // Ignore
         }
     }
 
diff --git a/service/src/com/android/car/telemetry/databroker/ScriptExecutionTask.java b/service/src/com/android/car/telemetry/databroker/ScriptExecutionTask.java
index a1fb522..648fc79 100644
--- a/service/src/com/android/car/telemetry/databroker/ScriptExecutionTask.java
+++ b/service/src/com/android/car/telemetry/databroker/ScriptExecutionTask.java
@@ -16,6 +16,7 @@
 
 package com.android.car.telemetry.databroker;
 
+import android.os.Parcel;
 import android.os.PersistableBundle;
 
 import com.android.car.telemetry.TelemetryProto;
@@ -69,6 +70,18 @@
         return mSubscriber.getMetricsConfig().equals(metricsConfig);
     }
 
+    /**
+     * Returns the script input data size in bytes.
+     * TODO(b/201545154): Investigate how to get bundle size without making a full copy.
+     */
+    public int getDataSizeBytes() {
+        Parcel parcel = Parcel.obtain();
+        parcel.writePersistableBundle(mData);
+        int size = parcel.dataSize();
+        parcel.recycle();
+        return size;
+    }
+
     @Override
     public int compareTo(ScriptExecutionTask other) {
         if (getPriority() < other.getPriority()) {
diff --git a/tests/carservice_unit_test/src/com/android/car/telemetry/databroker/DataBrokerTest.java b/tests/carservice_unit_test/src/com/android/car/telemetry/databroker/DataBrokerTest.java
index 82f7e5e..0f37187 100644
--- a/tests/carservice_unit_test/src/com/android/car/telemetry/databroker/DataBrokerTest.java
+++ b/tests/carservice_unit_test/src/com/android/car/telemetry/databroker/DataBrokerTest.java
@@ -27,6 +27,7 @@
 import static org.mockito.Mockito.when;
 
 import android.annotation.Nullable;
+import android.car.AbstractExtendedMockitoCarServiceTestCase;
 import android.car.hardware.CarPropertyConfig;
 import android.content.Context;
 import android.content.ServiceConnection;
@@ -54,14 +55,17 @@
 import org.mockito.junit.MockitoJUnitRunner;
 import org.mockito.stubbing.Answer;
 
+import java.io.IOException;
+import java.io.InputStream;
 import java.nio.file.Files;
 import java.util.Collections;
+import java.util.Objects;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.PriorityBlockingQueue;
 import java.util.concurrent.TimeUnit;
 
 @RunWith(MockitoJUnitRunner.class)
-public class DataBrokerTest {
+public class DataBrokerTest extends AbstractExtendedMockitoCarServiceTestCase {
     private static final int PROP_ID = 100;
     private static final int PROP_AREA = 200;
     private static final int PRIORITY_HIGH = 1;
@@ -145,13 +149,18 @@
                 SystemClock.elapsedRealtime());
     }
 
+    @Override
+    protected void onSessionBuilder(CustomMockitoSessionBuilder builder) {
+        builder.spyStatic(ParcelFileDescriptor.class);
+    }
+
     @Test
     public void testSetTaskExecutionPriority_whenNoTask_shouldNotInvokeScriptExecutor()
             throws Exception {
         mDataBroker.setTaskExecutionPriority(PRIORITY_HIGH);
 
-        waitForHandlerThreadToFinish();
-        assertThat(mFakeScriptExecutor.getApiInvocationCount()).isEqualTo(0);
+        waitForTelemetryThreadToFinish();
+        assertThat(mFakeScriptExecutor.getInvokeScriptCount()).isEqualTo(0);
     }
 
     @Test
@@ -161,10 +170,10 @@
 
         mDataBroker.setTaskExecutionPriority(PRIORITY_HIGH);
 
-        waitForHandlerThreadToFinish();
+        waitForTelemetryThreadToFinish();
         // task is not polled
         assertThat(mDataBroker.getTaskQueue().peek()).isEqualTo(mLowPriorityTask);
-        assertThat(mFakeScriptExecutor.getApiInvocationCount()).isEqualTo(0);
+        assertThat(mFakeScriptExecutor.getInvokeScriptCount()).isEqualTo(0);
     }
 
     @Test
@@ -174,18 +183,18 @@
 
         mDataBroker.setTaskExecutionPriority(PRIORITY_HIGH);
 
-        waitForHandlerThreadToFinish();
+        waitForTelemetryThreadToFinish();
         // task is polled and run
         assertThat(mDataBroker.getTaskQueue().peek()).isNull();
-        assertThat(mFakeScriptExecutor.getApiInvocationCount()).isEqualTo(1);
+        assertThat(mFakeScriptExecutor.getInvokeScriptCount()).isEqualTo(1);
     }
 
     @Test
     public void testScheduleNextTask_whenNoTask_shouldNotInvokeScriptExecutor() throws Exception {
         mDataBroker.scheduleNextTask();
 
-        waitForHandlerThreadToFinish();
-        assertThat(mFakeScriptExecutor.getApiInvocationCount()).isEqualTo(0);
+        waitForTelemetryThreadToFinish();
+        assertThat(mFakeScriptExecutor.getInvokeScriptCount()).isEqualTo(0);
     }
 
     @Test
@@ -194,17 +203,17 @@
         PriorityBlockingQueue<ScriptExecutionTask> taskQueue = mDataBroker.getTaskQueue();
         taskQueue.add(mHighPriorityTask);
         mDataBroker.scheduleNextTask(); // start a task
-        waitForHandlerThreadToFinish();
+        waitForTelemetryThreadToFinish();
         assertThat(taskQueue.peek()).isNull(); // assert that task is polled and running
         taskQueue.add(mHighPriorityTask); // add another task into the queue
 
         mDataBroker.scheduleNextTask(); // schedule next task while the last task is in progress
 
-        waitForHandlerThreadToFinish();
+        waitForTelemetryThreadToFinish();
         // verify task is not polled
         assertThat(taskQueue.peek()).isEqualTo(mHighPriorityTask);
         // expect one invocation for the task that is running
-        assertThat(mFakeScriptExecutor.getApiInvocationCount()).isEqualTo(1);
+        assertThat(mFakeScriptExecutor.getInvokeScriptCount()).isEqualTo(1);
     }
 
     @Test
@@ -216,14 +225,14 @@
         taskQueue.add(mHighPriorityTask);
 
         mDataBroker.scheduleNextTask(); // start a task
-        waitForHandlerThreadToFinish();
+        waitForTelemetryThreadToFinish();
         // end a task, should automatically schedule the next task
         mFakeScriptExecutor.notifyScriptSuccess(mData); // posts to telemetry handler
 
-        waitForHandlerThreadToFinish();
+        waitForTelemetryThreadToFinish();
         // verify queue is empty, both tasks are polled and executed
         assertThat(taskQueue.peek()).isNull();
-        assertThat(mFakeScriptExecutor.getApiInvocationCount()).isEqualTo(2);
+        assertThat(mFakeScriptExecutor.getInvokeScriptCount()).isEqualTo(2);
     }
 
     @Test
@@ -233,11 +242,11 @@
         mDataBroker.getTaskQueue().add(mHighPriorityTask);
 
         mDataBroker.scheduleNextTask();
-        waitForHandlerThreadToFinish();
+        waitForTelemetryThreadToFinish();
         mFakeScriptExecutor.notifyScriptSuccess(mData); // posts to telemetry handler
 
-        waitForHandlerThreadToFinish();
-        assertThat(mFakeScriptExecutor.getApiInvocationCount()).isEqualTo(1);
+        waitForTelemetryThreadToFinish();
+        assertThat(mFakeScriptExecutor.getInvokeScriptCount()).isEqualTo(1);
         verify(mMockResultStore).putInterimResult(
                 eq(mHighPriorityTask.getMetricsConfig().getName()), eq(mData));
     }
@@ -254,11 +263,11 @@
                 .build();
 
         mDataBroker.scheduleNextTask();
-        waitForHandlerThreadToFinish();
+        waitForTelemetryThreadToFinish();
         mFakeScriptExecutor.notifyScriptError(errorType.getNumber(), errorMessage);
 
-        waitForHandlerThreadToFinish();
-        assertThat(mFakeScriptExecutor.getApiInvocationCount()).isEqualTo(1);
+        waitForTelemetryThreadToFinish();
+        assertThat(mFakeScriptExecutor.getInvokeScriptCount()).isEqualTo(1);
         verify(mMockResultStore).putError(eq(METRICS_CONFIG_FOO.getName()), eq(expectedError));
     }
 
@@ -270,11 +279,11 @@
         mDataBroker.getTaskQueue().add(mHighPriorityTask);
 
         mDataBroker.scheduleNextTask();
-        waitForHandlerThreadToFinish();
+        waitForTelemetryThreadToFinish();
         mFakeScriptExecutor.notifyScriptFinish(mData); // posts to telemetry handler
 
-        waitForHandlerThreadToFinish();
-        assertThat(mFakeScriptExecutor.getApiInvocationCount()).isEqualTo(1);
+        waitForTelemetryThreadToFinish();
+        assertThat(mFakeScriptExecutor.getInvokeScriptCount()).isEqualTo(1);
         verify(mMockResultStore).putFinalResult(
                 eq(mHighPriorityTask.getMetricsConfig().getName()), eq(mData));
     }
@@ -289,12 +298,45 @@
 
         mDataBroker.scheduleNextTask();
 
-        waitForHandlerThreadToFinish();
-        assertThat(mFakeScriptExecutor.getApiInvocationCount()).isEqualTo(1);
+        waitForTelemetryThreadToFinish();
+        assertThat(mFakeScriptExecutor.getInvokeScriptCount()).isEqualTo(1);
         assertThat(mFakeScriptExecutor.getSavedState()).isEqualTo(mData);
     }
 
     @Test
+    public void testScheduleNextTask_largeInput_shouldPipeData() throws Exception {
+        mData.putBooleanArray("1 MB Array", new boolean [1024 * 1024]);
+        mDataBroker.getTaskQueue().add(mHighPriorityTask);
+
+        mDataBroker.scheduleNextTask();
+
+        waitForTelemetryThreadToFinish();
+        assertThat(mFakeScriptExecutor.getInvokeScriptForLargeInputCount()).isEqualTo(1);
+    }
+
+    @Test
+    public void testScheduleNextTask_largeInputPipeIOException_shouldIgnoreCurrentTask()
+            throws Exception {
+        mData.putBooleanArray("1 MB Array", new boolean [1024 * 1024]);
+        PriorityBlockingQueue<ScriptExecutionTask> taskQueue = mDataBroker.getTaskQueue();
+        taskQueue.add(mHighPriorityTask); // invokeScriptForLargeInput() path
+        taskQueue.add(new ScriptExecutionTask(
+                new DataSubscriber(mDataBroker, METRICS_CONFIG_FOO, SUBSCRIBER_FOO),
+                new PersistableBundle(),
+                SystemClock.elapsedRealtime())); // invokeScript() path
+        ParcelFileDescriptor[] fds = ParcelFileDescriptor.createPipe();
+        when(ParcelFileDescriptor.createPipe()).thenReturn(fds);
+        fds[1].close(); // cause IO Exception in invokeScriptForLargeInput() path
+
+        mDataBroker.scheduleNextTask();
+
+        waitForTelemetryThreadToFinish();
+        assertThat(mFakeScriptExecutor.getInvokeScriptForLargeInputCount()).isEqualTo(1);
+        assertThat(mFakeScriptExecutor.getInvokeScriptCount()).isEqualTo(1);
+        assertThat(taskQueue).isEmpty();
+    }
+
+    @Test
     public void testScheduleNextTask_bindScriptExecutorFailedOnce_shouldRebind()
             throws Exception {
         Mockito.reset(mMockContext);
@@ -320,9 +362,9 @@
         // will rebind to ScriptExecutor if it is null
         mDataBroker.scheduleNextTask();
 
-        waitForHandlerThreadToFinish();
+        waitForTelemetryThreadToFinish();
         assertThat(taskQueue.peek()).isNull();
-        assertThat(mFakeScriptExecutor.getApiInvocationCount()).isEqualTo(1);
+        assertThat(mFakeScriptExecutor.getInvokeScriptCount()).isEqualTo(1);
     }
 
     @Test
@@ -340,14 +382,14 @@
         // will rebind to ScriptExecutor if it is null
         mDataBroker.scheduleNextTask();
 
-        waitForHandlerThreadToFinish();
+        waitForTelemetryThreadToFinish();
         // broker disabled, all subscribers should have been removed
         assertThat(mDataBroker.getSubscriptionMap()).hasSize(0);
-        assertThat(mFakeScriptExecutor.getApiInvocationCount()).isEqualTo(0);
+        assertThat(mFakeScriptExecutor.getInvokeScriptCount()).isEqualTo(0);
     }
 
     @Test
-    public void testScheduleNextTask_whenScriptExecutorThrowsException_shouldRequeueTask()
+    public void testScheduleNextTask_whenScriptExecutorThrowsException_shouldResetAndTryAgain()
             throws Exception {
         PriorityBlockingQueue<ScriptExecutionTask> taskQueue = mDataBroker.getTaskQueue();
         taskQueue.add(mHighPriorityTask);
@@ -355,18 +397,18 @@
 
         mDataBroker.scheduleNextTask();
 
-        waitForHandlerThreadToFinish();
-        // expect invokeScript() to be called and failed, causing the same task to be re-queued
-        assertThat(mFakeScriptExecutor.getApiInvocationCount()).isEqualTo(1);
-        assertThat(taskQueue.peek()).isEqualTo(mHighPriorityTask);
+        waitForTelemetryThreadToFinish();
+        // invokeScript() failed, task is re-queued and re-run
+        assertThat(mFakeScriptExecutor.getInvokeScriptCount()).isEqualTo(2);
+        assertThat(taskQueue).isEmpty();
     }
 
     @Test
     public void testAddTaskToQueue_shouldInvokeScriptExecutor() throws Exception {
         mDataBroker.addTaskToQueue(mHighPriorityTask);
 
-        waitForHandlerThreadToFinish();
-        assertThat(mFakeScriptExecutor.getApiInvocationCount()).isEqualTo(1);
+        waitForTelemetryThreadToFinish();
+        assertThat(mFakeScriptExecutor.getInvokeScriptCount()).isEqualTo(1);
     }
 
     @Test
@@ -417,7 +459,7 @@
         assertThat(mDataBroker.getSubscriptionMap()).hasSize(0);
     }
 
-    private void waitForHandlerThreadToFinish() throws Exception {
+    private void waitForTelemetryThreadToFinish() throws Exception {
         assertWithMessage("handler not idle in %sms", TIMEOUT_MS)
                 .that(mIdleHandlerLatch.await(TIMEOUT_MS, TimeUnit.MILLISECONDS)).isTrue();
         mIdleHandlerLatch = new CountDownLatch(1); // reset idle handler condition
@@ -425,7 +467,8 @@
 
     private static class FakeScriptExecutor implements IScriptExecutor {
         private IScriptExecutorListener mListener;
-        private int mApiInvocationCount = 0;
+        private int mInvokeScriptCount = 0;
+        private int mInvokeScriptForLargeInputCount = 0;
         private int mFailApi = 0;
         private PersistableBundle mSavedState = null;
 
@@ -434,7 +477,7 @@
                 PersistableBundle publishedData, @Nullable PersistableBundle savedState,
                 IScriptExecutorListener listener)
                 throws RemoteException {
-            mApiInvocationCount++;
+            mInvokeScriptCount++;
             mSavedState = savedState;
             mListener = listener;
             if (mFailApi > 0) {
@@ -448,13 +491,28 @@
                 ParcelFileDescriptor publishedDataFileDescriptor,
                 @Nullable PersistableBundle savedState,
                 IScriptExecutorListener listener) throws RemoteException {
-            mApiInvocationCount++;
+            mInvokeScriptForLargeInputCount++;
             mSavedState = savedState;
             mListener = listener;
             if (mFailApi > 0) {
                 mFailApi--;
                 throw new RemoteException("Simulated failure");
             }
+            // Since DataBrokerImpl and FakeScriptExecutor are in the same process, they do not
+            // use real IPC and share the fd. When DataBroker closes the fd, it affects
+            // FakeScriptExecutor. Therefore FakeScriptExecutor must dup the fd before it is
+            // closed by DataBroker
+            ParcelFileDescriptor dup = null;
+            try {
+                dup = publishedDataFileDescriptor.dup();
+            } catch (IOException e) { }
+            final ParcelFileDescriptor fd = Objects.requireNonNull(dup);
+            // to prevent deadlock, read and write must happen on separate threads
+            Handler.getMain().post(() -> {
+                try (InputStream input = new ParcelFileDescriptor.AutoCloseInputStream(fd)) {
+                    PersistableBundle.readFromStream(input);
+                } catch (IOException e) { }
+            });
         }
 
         @Override
@@ -494,9 +552,14 @@
             mFailApi = n;
         }
 
-        /** Returns number of times the ScriptExecutor API was invoked. */
-        public int getApiInvocationCount() {
-            return mApiInvocationCount;
+        /** Returns number of times invokeScript() was called. */
+        public int getInvokeScriptCount() {
+            return mInvokeScriptCount;
+        }
+
+        /** Returns number of times invokeScriptForLargeInput() was called. */
+        public int getInvokeScriptForLargeInputCount() {
+            return mInvokeScriptForLargeInputCount;
         }
 
         /** Returns the interim data passed in invokeScript(). */