Merge 322ec94ce05c4271cea08e20e5129f2ea86812ca on remote branch

Change-Id: Ic98e6d87ded26c37457248f814e214d9067a286f
diff --git a/apex_manifest.json b/apex_manifest.json
index 48ab7ca..9fb5d80 100644
--- a/apex_manifest.json
+++ b/apex_manifest.json
@@ -1,4 +1,4 @@
 {
   "name": "com.android.appsearch",
-  "version": 339990000
+  "version": 330400000
 }
diff --git a/service/java/com/android/server/appsearch/contactsindexer/ContactsIndexerUserInstance.java b/service/java/com/android/server/appsearch/contactsindexer/ContactsIndexerUserInstance.java
index 9beea1f..8928343 100644
--- a/service/java/com/android/server/appsearch/contactsindexer/ContactsIndexerUserInstance.java
+++ b/service/java/com/android/server/appsearch/contactsindexer/ContactsIndexerUserInstance.java
@@ -28,6 +28,7 @@
 import android.provider.ContactsContract;
 import android.util.Log;
 
+import com.android.internal.annotations.GuardedBy;
 import com.android.internal.annotations.VisibleForTesting;
 import com.android.server.appsearch.stats.AppSearchStatsLog;
 
@@ -42,7 +43,6 @@
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
 
 /**
  * Contacts Indexer for a single user.
@@ -62,9 +62,17 @@
     private final File mDataDir;
     private final ContactsIndexerSettings mSettings;
     private final ContactsObserver mContactsObserver;
-    // Used for batching/throttling the contact change notification so we won't schedule too many
-    // delta updates.
-    private final AtomicBoolean mDeltaUpdatePending = new AtomicBoolean(/*initialValue=*/ false);
+
+    // The lock and the two booleans below are used for batching/throttling contact change
+    // notifications so that we don't schedule too many delta updates.
+    private final Object mDeltaUpdateLock = new Object();
+    // Whether a delta update has been scheduled or is running. Only one delta update is allowed
+    // to be scheduled or running at a time.
+    @GuardedBy("mDeltaUpdateLock")
+    private boolean mDeltaUpdateScheduled = false;
+    // Whether a CP2 change notification is waiting to be handled by a delta update.
+    @GuardedBy("mDeltaUpdateLock")
+    private boolean mCp2ChangePending = false;
 
     private final AppSearchHelper mAppSearchHelper;
     private final ContactsIndexerImpl mContactsIndexerImpl;
@@ -309,14 +317,14 @@
     /**
      * Does the delta/instant update to sync the contacts from CP2 to AppSearch.
      *
-     * <p>{@link #mDeltaUpdatePending} is being used to avoid scheduling any update BEFORE an active
-     * update is being processed.
+     * <p>{@link #mDeltaUpdateScheduled} is being used to avoid scheduling any update BEFORE an
+     * active update finishes.
      *
      * <p>{@link #mSingleThreadedExecutor} is being used to make sure there is one and only one
-     * running update, and at most one pending update is queued while the current active update is
-     * running.
+     * delta update that can be scheduled and run at a time.
      */
-    private void handleDeltaUpdate() {
+    @VisibleForTesting
+    /*package*/ void handleDeltaUpdate() {
         if (!ContentResolver.getCurrentSyncs().isEmpty()) {
             // TODO(b/221905367): make sure that the delta update is scheduled as soon
             //  as the current sync is completed.
@@ -326,44 +334,55 @@
             return;
         }
 
-        // We want to batch (trigger only one update) on all Contact Updates for the associated
-        // user within the time window(delaySec). And we hope the query to CP2 "Give me all the
-        // contacts from timestamp T" would catch all the unhandled contact change notifications.
-        if (!mDeltaUpdatePending.getAndSet(true)) {
-            executeOnSingleThreadedExecutor(() -> {
-                ContactsUpdateStats updateStats = new ContactsUpdateStats();
-                // TODO(b/226489369): apply instant indexing limit on CP2 changes also?
-                // TODO(b/222126568): refactor doDeltaUpdateAsync() to return a future value of
-                //  ContactsUpdateStats so that it can be checked and logged here, instead of the
-                //  placeholder exceptionally() block that only logs to the console.
-                doDeltaUpdateAsync(mContactsIndexerConfig.getContactsDeltaUpdateLimit(),
-                        updateStats).exceptionally(t -> {
-                    if (LogUtil.DEBUG) {
-                        Log.d(TAG, "Failed to index CP2 change", t);
-                    }
-                    return null;
-                });
-            });
+        synchronized (mDeltaUpdateLock) {
+            // Record that a CP2 change notification has been received, and will be handled
+            // by the next delta update task.
+            mCp2ChangePending = true;
+            scheduleDeltaUpdateLocked();
         }
     }
 
     /**
-     * Does the delta update. It also resets {@link ContactsIndexerUserInstance#mDeltaUpdatePending}
-     * to false.
+     * Schedule a delta update. No new delta update can be scheduled if there is one delta update
+     * already scheduled or currently being run.
+     *
+     * <p>ATTENTION!!! This function needs to be light weight since it is being called by CP2 with a
+     * lock.
+     */
+    @GuardedBy("mDeltaUpdateLock")
+    private void scheduleDeltaUpdateLocked() {
+        if (mDeltaUpdateScheduled) {
+            return;
+        }
+        mDeltaUpdateScheduled = true;
+        executeOnSingleThreadedExecutor(() -> {
+            ContactsUpdateStats updateStats = new ContactsUpdateStats();
+            // TODO(b/226489369): apply instant indexing limit on CP2 changes also?
+            // TODO(b/222126568): refactor doDeltaUpdateAsync() to return a future value of
+            //  ContactsUpdateStats so that it can be checked and logged here, instead of the
+            //  placeholder exceptionally() block that only logs to the console.
+            doDeltaUpdateAsync(mContactsIndexerConfig.getContactsDeltaUpdateLimit(),
+                    updateStats).exceptionally(t -> {
+                if (LogUtil.DEBUG) {
+                    Log.d(TAG, "Failed to index CP2 change", t);
+                }
+                return null;
+            });
+        });
+    }
+
+    /**
+     * Does the delta update. It also resets
+     * {@link ContactsIndexerUserInstance#mDeltaUpdateScheduled} to false.
      */
     @VisibleForTesting
     /*package*/ CompletableFuture<Void> doDeltaUpdateAsync(
             int indexingLimit, @NonNull ContactsUpdateStats updateStats) {
-        // Reset the delta update pending flag at the top of this method. This allows the next
-        // ContentObserver.onChange() notification to schedule another delta-update task on the
-        // executor. Note that additional change notifications will not schedule more
-        // delta-update
-        // tasks.
-        // Resetting the delta update pending flag after calling ContentResolver.query() to get
-        // the updated contact IDs and deleted contact IDs runs the risk of a race condition
-        // where a change notification is sent and handled after the query() ends but before the
-        // flag is reset.
-        mDeltaUpdatePending.set(false);
+        synchronized (mDeltaUpdateLock) {
+            // Record that the CP2 change notification is being handled by this delta update task.
+            mCp2ChangePending = false;
+        }
+
         updateStats.mUpdateType = ContactsUpdateStats.DELTA_UPDATE;
         updateStats.mUpdateAndDeleteStartTimeMillis = System.currentTimeMillis();
         long lastDeltaUpdateTimestampMillis = mSettings.getLastDeltaUpdateTimestampMillis();
@@ -392,46 +411,61 @@
         //  little complicated.
         return mContactsIndexerImpl.updatePersonCorpusAsync(wantedIds, unWantedIds, updateStats)
                 .handle((x, t) -> {
-                    if (t != null) {
-                        Log.w(TAG, "Failed to perform delta update", t);
-                        // Just clear all the remaining contacts in case of error.
-                        mContactsIndexerImpl.cancelUpdatePersonCorpus();
-                        if (updateStats.mUpdateStatuses.isEmpty()
-                                && updateStats.mDeleteStatuses.isEmpty()) {
-                            // Somehow this error is not reflected in the stats, and
-                            // unfortunately we don't know which part is wrong. Just add an error
-                            // code for the update.
-                            updateStats.mUpdateStatuses.add(
-                                    AppSearchResult.RESULT_UNKNOWN_ERROR);
+                    try {
+                        if (t != null) {
+                            Log.w(TAG, "Failed to perform delta update", t);
+                            // Just clear all the remaining contacts in case of error.
+                            mContactsIndexerImpl.cancelUpdatePersonCorpus();
+                            if (updateStats.mUpdateStatuses.isEmpty()
+                                    && updateStats.mDeleteStatuses.isEmpty()) {
+                                // Somehow this error is not reflected in the stats, and
+                                // unfortunately we don't know which part is wrong. Just add an
+                                // error code for the update.
+                                updateStats.mUpdateStatuses.add(
+                                        AppSearchResult.RESULT_UNKNOWN_ERROR);
+                            }
+                        }
+                        // Persist timestamps and log stats, whether or not the update succeeds.
+                        if (LogUtil.DEBUG) {
+                            Log.d(TAG, "updated timestamps --"
+                                    + " lastDeltaUpdateTimestampMillis: "
+                                    + mostRecentContactLastUpdateTimestampMillis
+                                    + " lastDeltaDeleteTimestampMillis: "
+                                    + mostRecentContactDeletedTimestampMillis);
+                        }
+                        mSettings.setLastDeltaUpdateTimestampMillis(
+                                mostRecentContactLastUpdateTimestampMillis);
+                        mSettings.setLastDeltaDeleteTimestampMillis(
+                                mostRecentContactDeletedTimestampMillis);
+                        persistSettings();
+                        logStats(updateStats);
+                        if (updateStats.mUpdateStatuses.contains(
+                                AppSearchResult.RESULT_OUT_OF_SPACE)) {
+                            // Some indexing failed due to OUT_OF_SPACE from AppSearch. We can
+                            // simply schedule a full update so we can trim the Person corpus in
+                            // AppSearch to make some room for delta update. We need to monitor
+                            // the failure count and reasons for indexing during full update to
+                            // see if that limit (10,000) is too big right now, considering we
+                            // are sharing this limit with any AppSearch clients, e.g.
+                            // ShortcutManager, in the system server.
+                            ContactsIndexerMaintenanceService.scheduleFullUpdateJob(mContext,
+                                    mContext.getUser().getIdentifier(), /*periodic=*/ false,
+                                    /*intervalMillis=*/ -1);
+                        }
+
+                        return null;
+                    } finally {
+                        synchronized (mDeltaUpdateLock) {
+                            // The current delta update is done. Reset the flag so new delta
+                            // update can be scheduled and run.
+                            mDeltaUpdateScheduled = false;
+                            // If more CP2 change notifications were received while this delta
+                            // update task was running, schedule another delta update.
+                            if (mCp2ChangePending) {
+                                scheduleDeltaUpdateLocked();
+                            }
                         }
                     }
-                    // Persisting timestamping and logging, no matter if update succeeds or not.
-                    if (LogUtil.DEBUG) {
-                        Log.d(TAG, "updated timestamps --"
-                                + " lastDeltaUpdateTimestampMillis: "
-                                + mostRecentContactLastUpdateTimestampMillis
-                                + " lastDeltaDeleteTimestampMillis: "
-                                + mostRecentContactDeletedTimestampMillis);
-                    }
-                    mSettings.setLastDeltaUpdateTimestampMillis(
-                            mostRecentContactLastUpdateTimestampMillis);
-                    mSettings.setLastDeltaDeleteTimestampMillis(
-                            mostRecentContactDeletedTimestampMillis);
-                    persistSettings();
-                    logStats(updateStats);
-                    if (updateStats.mUpdateStatuses.contains(AppSearchResult.RESULT_OUT_OF_SPACE)) {
-                        // Some indexing failed due to OUT_OF_SPACE from AppSearch. We can simply
-                        // schedule a full update so we can trim the Person corpus in AppSearch
-                        // to make some room for delta update. We need to monitor the failure
-                        // count and reasons for indexing during full update to see if that limit
-                        // (10,000) is too big right now, considering we are sharing this limit
-                        // with any AppSearch clients, e.g. ShortcutManager, in the system server.
-                        ContactsIndexerMaintenanceService.scheduleFullUpdateJob(mContext,
-                                mContext.getUser().getIdentifier(), /*periodic=*/ false,
-                                /*intervalMillis=*/ -1);
-                    }
-
-                    return null;
                 });
     }
 
diff --git a/testing/contactsindexertests/src/com/android/server/appsearch/contactsindexer/ContactsIndexerUserInstanceTest.java b/testing/contactsindexertests/src/com/android/server/appsearch/contactsindexer/ContactsIndexerUserInstanceTest.java
index 72d10b6..5df0f0f 100644
--- a/testing/contactsindexertests/src/com/android/server/appsearch/contactsindexer/ContactsIndexerUserInstanceTest.java
+++ b/testing/contactsindexertests/src/com/android/server/appsearch/contactsindexer/ContactsIndexerUserInstanceTest.java
@@ -69,11 +69,14 @@
 import java.io.File;
 import java.util.Arrays;
 import java.util.List;
+import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CompletionStage;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicReference;
 
@@ -137,6 +140,101 @@
     }
 
     @Test
+    public void testHandleMultipleNotifications_onlyOneDeltaUpdateCanBeScheduledAndRun()
+            throws Exception {
+        try {
+            long dataQueryDelayMs = 5000;
+            getProvider().setDataQueryDelayMs(dataQueryDelayMs);
+            BlockingQueue<Runnable> queue = new LinkedBlockingQueue<>();
+            ThreadPoolExecutor singleThreadedExecutor =
+                    new ThreadPoolExecutor(/*corePoolSize=*/1, /*maximumPoolSize=*/
+                            1, /*KeepAliveTime=*/ 0L, TimeUnit.MILLISECONDS, queue);
+            ContactsIndexerUserInstance instance = ContactsIndexerUserInstance.createInstance(
+                    mContext, mContactsDir,
+                    mConfigForTest, singleThreadedExecutor);
+
+            int numOfNotifications = 20;
+            for (int i = 0; i < numOfNotifications / 2; ++i) {
+                int docCount = 2;
+                // Insert contacts to trigger delta update.
+                ContentResolver resolver = mContext.getContentResolver();
+                ContentValues dummyValues = new ContentValues();
+                for (int j = 0; j < docCount; j++) {
+                    resolver.insert(ContactsContract.Contacts.CONTENT_URI, dummyValues);
+                }
+                instance.handleDeltaUpdate();
+            }
+            // Sleep here so the active delta update can run for some time, with
+            // notifications arriving both before and after the sleep.
+            Thread.sleep(1500);
+            long totalTaskAfterFirstDeltaUpdate = singleThreadedExecutor.getTaskCount();
+            for (int i = 0; i < numOfNotifications / 2; ++i) {
+                int docCount = 2;
+                // Insert contacts to trigger delta update.
+                ContentResolver resolver = mContext.getContentResolver();
+                ContentValues dummyValues = new ContentValues();
+                for (int j = 0; j < docCount; j++) {
+                    resolver.insert(ContactsContract.Contacts.CONTENT_URI, dummyValues);
+                }
+                instance.handleDeltaUpdate();
+            }
+
+            // The total task count will be increased if there is another delta update scheduled.
+            assertThat(singleThreadedExecutor.getTaskCount()).isEqualTo(
+                    totalTaskAfterFirstDeltaUpdate);
+        } finally {
+            getProvider().setDataQueryDelayMs(0);
+        }
+    }
+
+    @Test
+    public void testHandleNotificationDuringUpdate_oneAdditionalUpdateWillBeRun()
+            throws Exception {
+        try {
+            long dataQueryDelayMs = 5000;
+            getProvider().setDataQueryDelayMs(dataQueryDelayMs);
+            BlockingQueue<Runnable> queue = new LinkedBlockingQueue<>();
+            ThreadPoolExecutor singleThreadedExecutor =
+                    new ThreadPoolExecutor(/*corePoolSize=*/1, /*maximumPoolSize=*/
+                            1, /*KeepAliveTime=*/ 0L, TimeUnit.MILLISECONDS, queue);
+            ContactsIndexerUserInstance instance = ContactsIndexerUserInstance.createInstance(
+                    mContext, mContactsDir,
+                    mConfigForTest, singleThreadedExecutor);
+            int docCount = 10;
+            // Insert contacts to trigger delta update.
+            ContentResolver resolver = mContext.getContentResolver();
+            ContentValues dummyValues = new ContentValues();
+
+            for (int j = 0; j < docCount; j++) {
+                resolver.insert(ContactsContract.Contacts.CONTENT_URI, dummyValues);
+            }
+            instance.handleDeltaUpdate();
+            // Sleep here so the active delta update can run for some time, to make sure
+            // the next notification arrives during an update.
+            Thread.sleep(1500);
+            long totalTaskAfterFirstDeltaUpdate = singleThreadedExecutor.getTaskCount();
+            // Insert contacts to trigger delta update.
+            for (int j = 0; j < docCount; j++) {
+                resolver.insert(ContactsContract.Contacts.CONTENT_URI, dummyValues);
+            }
+            instance.handleDeltaUpdate();
+
+            // The 2nd update won't be scheduled right away.
+            assertThat(singleThreadedExecutor.getTaskCount()).isEqualTo(
+                    totalTaskAfterFirstDeltaUpdate);
+
+            // To make sure the 1st update has been finished.
+            Thread.sleep(dataQueryDelayMs);
+
+            // This means an additional task was scheduled by the 1st delta update and is
+            // now running to handle the change notification.
+            assertThat(singleThreadedExecutor.getActiveCount()).isEqualTo(1);
+        } finally {
+            getProvider().setDataQueryDelayMs(0);
+        }
+    }
+
+    @Test
     public void testCreateInstance_dataDirectoryCreatedAsynchronously() throws Exception {
         File dataDir = new File(mTemporaryFolder.newFolder(), "contacts");
         boolean isDataDirectoryCreatedSynchronously = mSingleThreadedExecutor.submit(() -> {
@@ -279,7 +377,7 @@
         }
 
         executeAndWaitForCompletion(mInstance.doDeltaUpdateAsync(/*indexingLimit=*/ -1,
-                        mUpdateStats),
+                mUpdateStats),
                 mSingleThreadedExecutor);
 
         AppSearchHelper searchHelper = AppSearchHelper.createAppSearchHelper(mContext,
@@ -317,7 +415,7 @@
         }
 
         executeAndWaitForCompletion(mInstance.doDeltaUpdateAsync(/*indexingLimit=*/ 100,
-                        mUpdateStats),
+                mUpdateStats),
                 mSingleThreadedExecutor);
 
         AppSearchHelper searchHelper = AppSearchHelper.createAppSearchHelper(mContext,
@@ -337,7 +435,7 @@
         }
 
         executeAndWaitForCompletion(mInstance.doDeltaUpdateAsync(/*indexingLimit=*/ -1,
-                        mUpdateStats),
+                mUpdateStats),
                 mSingleThreadedExecutor);
 
         // Delete a few contacts to trigger delta update.
@@ -351,7 +449,7 @@
                 /*extras=*/ null);
 
         executeAndWaitForCompletion(mInstance.doDeltaUpdateAsync(/*indexingLimit=*/ -1,
-                        mUpdateStats),
+                mUpdateStats),
                 mSingleThreadedExecutor);
 
         AppSearchHelper searchHelper = AppSearchHelper.createAppSearchHelper(mContext,
@@ -401,7 +499,7 @@
 
         mUpdateStats.clear();
         executeAndWaitForCompletion(mInstance.doDeltaUpdateAsync(/*indexingLimit=*/ -1,
-                        mUpdateStats),
+                mUpdateStats),
                 mSingleThreadedExecutor);
 
         AppSearchHelper searchHelper = AppSearchHelper.createAppSearchHelper(mContext,
@@ -462,7 +560,7 @@
 
         mUpdateStats.clear();
         executeAndWaitForCompletion(mInstance.doDeltaUpdateAsync(/*indexingLimit=*/ -1,
-                        mUpdateStats),
+                mUpdateStats),
                 mSingleThreadedExecutor);
 
         AppSearchHelper searchHelper = AppSearchHelper.createAppSearchHelper(mContext,
diff --git a/testing/contactsindexertests/src/com/android/server/appsearch/contactsindexer/FakeContactsProvider.java b/testing/contactsindexertests/src/com/android/server/appsearch/contactsindexer/FakeContactsProvider.java
index 9498cde..6e2f4c8 100644
--- a/testing/contactsindexertests/src/com/android/server/appsearch/contactsindexer/FakeContactsProvider.java
+++ b/testing/contactsindexertests/src/com/android/server/appsearch/contactsindexer/FakeContactsProvider.java
@@ -134,6 +134,9 @@
     private long mMostRecentContactLastUpdatedTimestampMillis;
     private long mMostRecentDeletedContactTimestampMillis;
 
+    // Data Query delay in millis added for testing
+    private long mDataQueryDelayMs = 0;
+
     // Only odd contactIds should have additional data.
     private static boolean shouldhaveAdditionalData(long contactId) {
         return (contactId & 1) > 0;
@@ -280,6 +283,10 @@
         this(ApplicationProvider.getApplicationContext().getResources());
     }
 
+    public void setDataQueryDelayMs(long dataQueryDelayMs) {
+        mDataQueryDelayMs = dataQueryDelayMs;
+    }
+
     FakeContactsProvider(Resources resources) {
         mResources = resources;
     }
@@ -320,6 +327,14 @@
     // together. We only process CONTACTS_DATA_ORDER_BY.
     protected Cursor manageDataQuery(
             String[] projection, String selection, String[] selectionArgs, String orderBy) {
+        if (mDataQueryDelayMs > 0) {
+             try {
+                Thread.sleep(mDataQueryDelayMs);
+            } catch (InterruptedException e) {
+                Log.d(TAG, "Got exception while applying data query delay.", e);
+            }
+        }
+
         MatrixCursor cursor = null;
         // Details in id list.
         if (CONTACTS_DATA_ORDER_BY.equals(orderBy) && (projection != null)) {