Merge 322ec94ce05c4271cea08e20e5129f2ea86812ca on remote branch
Change-Id: Ic98e6d87ded26c37457248f814e214d9067a286f
diff --git a/apex_manifest.json b/apex_manifest.json
index 48ab7ca..9fb5d80 100644
--- a/apex_manifest.json
+++ b/apex_manifest.json
@@ -1,4 +1,4 @@
{
"name": "com.android.appsearch",
- "version": 339990000
+ "version": 330400000
}
diff --git a/service/java/com/android/server/appsearch/contactsindexer/ContactsIndexerUserInstance.java b/service/java/com/android/server/appsearch/contactsindexer/ContactsIndexerUserInstance.java
index 9beea1f..8928343 100644
--- a/service/java/com/android/server/appsearch/contactsindexer/ContactsIndexerUserInstance.java
+++ b/service/java/com/android/server/appsearch/contactsindexer/ContactsIndexerUserInstance.java
@@ -28,6 +28,7 @@
import android.provider.ContactsContract;
import android.util.Log;
+import com.android.internal.annotations.GuardedBy;
import com.android.internal.annotations.VisibleForTesting;
import com.android.server.appsearch.stats.AppSearchStatsLog;
@@ -42,7 +43,6 @@
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
/**
* Contacts Indexer for a single user.
@@ -62,9 +62,17 @@
private final File mDataDir;
private final ContactsIndexerSettings mSettings;
private final ContactsObserver mContactsObserver;
- // Used for batching/throttling the contact change notification so we won't schedule too many
- // delta updates.
- private final AtomicBoolean mDeltaUpdatePending = new AtomicBoolean(/*initialValue=*/ false);
+
+ // Those two booleans below are used for batching/throttling the contact change
+ // notification so we won't schedule too many delta updates.
+ private final Object mDeltaUpdateLock = new Object();
+ // Whether a delta update has been scheduled or run. Now we only allow one delta update being
+ // run at a time.
+ @GuardedBy("mDeltaUpdateLock")
+ private boolean mDeltaUpdateScheduled = false;
+ // Whether we are receiving notifications from CP2.
+ @GuardedBy("mDeltaUpdateLock")
+ private boolean mCp2ChangePending = false;
private final AppSearchHelper mAppSearchHelper;
private final ContactsIndexerImpl mContactsIndexerImpl;
@@ -309,14 +317,14 @@
/**
* Does the delta/instant update to sync the contacts from CP2 to AppSearch.
*
- * <p>{@link #mDeltaUpdatePending} is being used to avoid scheduling any update BEFORE an active
- * update is being processed.
+ * <p>{@link #mDeltaUpdateScheduled} is being used to avoid scheduling any update BEFORE an
+ * active update finishes.
*
* <p>{@link #mSingleThreadedExecutor} is being used to make sure there is one and only one
- * running update, and at most one pending update is queued while the current active update is
- * running.
+ * delta update can be scheduled and run.
*/
- private void handleDeltaUpdate() {
+ @VisibleForTesting
+ /*package*/ void handleDeltaUpdate() {
if (!ContentResolver.getCurrentSyncs().isEmpty()) {
// TODO(b/221905367): make sure that the delta update is scheduled as soon
// as the current sync is completed.
@@ -326,44 +334,55 @@
return;
}
- // We want to batch (trigger only one update) on all Contact Updates for the associated
- // user within the time window(delaySec). And we hope the query to CP2 "Give me all the
- // contacts from timestamp T" would catch all the unhandled contact change notifications.
- if (!mDeltaUpdatePending.getAndSet(true)) {
- executeOnSingleThreadedExecutor(() -> {
- ContactsUpdateStats updateStats = new ContactsUpdateStats();
- // TODO(b/226489369): apply instant indexing limit on CP2 changes also?
- // TODO(b/222126568): refactor doDeltaUpdateAsync() to return a future value of
- // ContactsUpdateStats so that it can be checked and logged here, instead of the
- // placeholder exceptionally() block that only logs to the console.
- doDeltaUpdateAsync(mContactsIndexerConfig.getContactsDeltaUpdateLimit(),
- updateStats).exceptionally(t -> {
- if (LogUtil.DEBUG) {
- Log.d(TAG, "Failed to index CP2 change", t);
- }
- return null;
- });
- });
+ synchronized (mDeltaUpdateLock) {
+ // Record that a CP2 change notification has been received, and will be handled
+ // by the next delta update task.
+ mCp2ChangePending = true;
+ scheduleDeltaUpdateLocked();
}
}
/**
- * Does the delta update. It also resets {@link ContactsIndexerUserInstance#mDeltaUpdatePending}
- * to false.
+ * Schedule a delta update. No new delta update can be scheduled if there is one delta update
+ * already scheduled or currently being run.
+ *
+ * <p>ATTENTION!!! This function needs to be light weight since it is being called by CP2 with a
+ * lock.
+ */
+ @GuardedBy("mDeltaUpdateLock")
+ private void scheduleDeltaUpdateLocked() {
+ if (mDeltaUpdateScheduled) {
+ return;
+ }
+ mDeltaUpdateScheduled = true;
+ executeOnSingleThreadedExecutor(() -> {
+ ContactsUpdateStats updateStats = new ContactsUpdateStats();
+ // TODO(b/226489369): apply instant indexing limit on CP2 changes also?
+ // TODO(b/222126568): refactor doDeltaUpdateAsync() to return a future value of
+ // ContactsUpdateStats so that it can be checked and logged here, instead of the
+ // placeholder exceptionally() block that only logs to the console.
+ doDeltaUpdateAsync(mContactsIndexerConfig.getContactsDeltaUpdateLimit(),
+ updateStats).exceptionally(t -> {
+ if (LogUtil.DEBUG) {
+ Log.d(TAG, "Failed to index CP2 change", t);
+ }
+ return null;
+ });
+ });
+ }
+
+ /**
+ * Does the delta update. It also resets
+ * {@link ContactsIndexerUserInstance#mDeltaUpdateScheduled} to false.
*/
@VisibleForTesting
/*package*/ CompletableFuture<Void> doDeltaUpdateAsync(
int indexingLimit, @NonNull ContactsUpdateStats updateStats) {
- // Reset the delta update pending flag at the top of this method. This allows the next
- // ContentObserver.onChange() notification to schedule another delta-update task on the
- // executor. Note that additional change notifications will not schedule more
- // delta-update
- // tasks.
- // Resetting the delta update pending flag after calling ContentResolver.query() to get
- // the updated contact IDs and deleted contact IDs runs the risk of a race condition
- // where a change notification is sent and handled after the query() ends but before the
- // flag is reset.
- mDeltaUpdatePending.set(false);
+ synchronized (mDeltaUpdateLock) {
+ // Record that the CP2 change notification is being handled by this delta update task.
+ mCp2ChangePending = false;
+ }
+
updateStats.mUpdateType = ContactsUpdateStats.DELTA_UPDATE;
updateStats.mUpdateAndDeleteStartTimeMillis = System.currentTimeMillis();
long lastDeltaUpdateTimestampMillis = mSettings.getLastDeltaUpdateTimestampMillis();
@@ -392,46 +411,61 @@
// little complicated.
return mContactsIndexerImpl.updatePersonCorpusAsync(wantedIds, unWantedIds, updateStats)
.handle((x, t) -> {
- if (t != null) {
- Log.w(TAG, "Failed to perform delta update", t);
- // Just clear all the remaining contacts in case of error.
- mContactsIndexerImpl.cancelUpdatePersonCorpus();
- if (updateStats.mUpdateStatuses.isEmpty()
- && updateStats.mDeleteStatuses.isEmpty()) {
- // Somehow this error is not reflected in the stats, and
- // unfortunately we don't know which part is wrong. Just add an error
- // code for the update.
- updateStats.mUpdateStatuses.add(
- AppSearchResult.RESULT_UNKNOWN_ERROR);
+ try {
+ if (t != null) {
+ Log.w(TAG, "Failed to perform delta update", t);
+ // Just clear all the remaining contacts in case of error.
+ mContactsIndexerImpl.cancelUpdatePersonCorpus();
+ if (updateStats.mUpdateStatuses.isEmpty()
+ && updateStats.mDeleteStatuses.isEmpty()) {
+ // Somehow this error is not reflected in the stats, and
+ // unfortunately we don't know which part is wrong. Just add an
+ // error code for the update.
+ updateStats.mUpdateStatuses.add(
+ AppSearchResult.RESULT_UNKNOWN_ERROR);
+ }
+ }
+ // Persisting timestamping and logging, no matter if update succeeds or not.
+ if (LogUtil.DEBUG) {
+ Log.d(TAG, "updated timestamps --"
+ + " lastDeltaUpdateTimestampMillis: "
+ + mostRecentContactLastUpdateTimestampMillis
+ + " lastDeltaDeleteTimestampMillis: "
+ + mostRecentContactDeletedTimestampMillis);
+ }
+ mSettings.setLastDeltaUpdateTimestampMillis(
+ mostRecentContactLastUpdateTimestampMillis);
+ mSettings.setLastDeltaDeleteTimestampMillis(
+ mostRecentContactDeletedTimestampMillis);
+ persistSettings();
+ logStats(updateStats);
+ if (updateStats.mUpdateStatuses.contains(
+ AppSearchResult.RESULT_OUT_OF_SPACE)) {
+ // Some indexing failed due to OUT_OF_SPACE from AppSearch. We can
+ // simply schedule a full update so we can trim the Person corpus in
+ // AppSearch to make some room for delta update. We need to monitor
+ // the failure count and reasons for indexing during full update to
+ // see if that limit (10,000) is too big right now, considering we
+ // are sharing this limit with any AppSearch clients, e.g.
+ // ShortcutManager, in the system server.
+ ContactsIndexerMaintenanceService.scheduleFullUpdateJob(mContext,
+ mContext.getUser().getIdentifier(), /*periodic=*/ false,
+ /*intervalMillis=*/ -1);
+ }
+
+ return null;
+ } finally {
+ synchronized (mDeltaUpdateLock) {
+ // The current delta update is done. Reset the flag so new delta
+ // update can be scheduled and run.
+ mDeltaUpdateScheduled = false;
+ // If another CP2 change notifications were received while this delta
+ // update task was running, schedule it again.
+ if (mCp2ChangePending) {
+ scheduleDeltaUpdateLocked();
+ }
}
}
- // Persisting timestamping and logging, no matter if update succeeds or not.
- if (LogUtil.DEBUG) {
- Log.d(TAG, "updated timestamps --"
- + " lastDeltaUpdateTimestampMillis: "
- + mostRecentContactLastUpdateTimestampMillis
- + " lastDeltaDeleteTimestampMillis: "
- + mostRecentContactDeletedTimestampMillis);
- }
- mSettings.setLastDeltaUpdateTimestampMillis(
- mostRecentContactLastUpdateTimestampMillis);
- mSettings.setLastDeltaDeleteTimestampMillis(
- mostRecentContactDeletedTimestampMillis);
- persistSettings();
- logStats(updateStats);
- if (updateStats.mUpdateStatuses.contains(AppSearchResult.RESULT_OUT_OF_SPACE)) {
- // Some indexing failed due to OUT_OF_SPACE from AppSearch. We can simply
- // schedule a full update so we can trim the Person corpus in AppSearch
- // to make some room for delta update. We need to monitor the failure
- // count and reasons for indexing during full update to see if that limit
- // (10,000) is too big right now, considering we are sharing this limit
- // with any AppSearch clients, e.g. ShortcutManager, in the system server.
- ContactsIndexerMaintenanceService.scheduleFullUpdateJob(mContext,
- mContext.getUser().getIdentifier(), /*periodic=*/ false,
- /*intervalMillis=*/ -1);
- }
-
- return null;
});
}
diff --git a/testing/contactsindexertests/src/com/android/server/appsearch/contactsindexer/ContactsIndexerUserInstanceTest.java b/testing/contactsindexertests/src/com/android/server/appsearch/contactsindexer/ContactsIndexerUserInstanceTest.java
index 72d10b6..5df0f0f 100644
--- a/testing/contactsindexertests/src/com/android/server/appsearch/contactsindexer/ContactsIndexerUserInstanceTest.java
+++ b/testing/contactsindexertests/src/com/android/server/appsearch/contactsindexer/ContactsIndexerUserInstanceTest.java
@@ -69,11 +69,14 @@
import java.io.File;
import java.util.Arrays;
import java.util.List;
+import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
@@ -137,6 +140,101 @@
}
@Test
+ public void testHandleMultipleNotifications_onlyOneDeltaUpdateCanBeScheduledAndRun()
+ throws Exception {
+ try {
+ long dataQueryDelayMs = 5000;
+ getProvider().setDataQueryDelayMs(dataQueryDelayMs);
+ BlockingQueue<Runnable> queue = new LinkedBlockingQueue<>();
+ ThreadPoolExecutor singleThreadedExecutor =
+ new ThreadPoolExecutor(/*corePoolSize=*/1, /*maximumPoolSize=*/
+ 1, /*KeepAliveTime=*/ 0L, TimeUnit.MILLISECONDS, queue);
+ ContactsIndexerUserInstance instance = ContactsIndexerUserInstance.createInstance(
+ mContext, mContactsDir,
+ mConfigForTest, singleThreadedExecutor);
+
+ int numOfNotifications = 20;
+ for (int i = 0; i < numOfNotifications / 2; ++i) {
+ int docCount = 2;
+ // Insert contacts to trigger delta update.
+ ContentResolver resolver = mContext.getContentResolver();
+ ContentValues dummyValues = new ContentValues();
+ for (int j = 0; j < docCount; j++) {
+ resolver.insert(ContactsContract.Contacts.CONTENT_URI, dummyValues);
+ }
+ instance.handleDeltaUpdate();
+ }
+ // Sleep here so the active delta update can be run for some time. While
+ // notifications come before and afterwards.
+ Thread.sleep(1500);
+ long totalTaskAfterFirstDeltaUpdate = singleThreadedExecutor.getTaskCount();
+ for (int i = 0; i < numOfNotifications / 2; ++i) {
+ int docCount = 2;
+ // Insert contacts to trigger delta update.
+ ContentResolver resolver = mContext.getContentResolver();
+ ContentValues dummyValues = new ContentValues();
+ for (int j = 0; j < docCount; j++) {
+ resolver.insert(ContactsContract.Contacts.CONTENT_URI, dummyValues);
+ }
+ instance.handleDeltaUpdate();
+ }
+
+ // The total task count will be increased if there is another delta update scheduled.
+ assertThat(singleThreadedExecutor.getTaskCount()).isEqualTo(
+ totalTaskAfterFirstDeltaUpdate);
+ } finally {
+ getProvider().setDataQueryDelayMs(0);
+ }
+ }
+
+ @Test
+ public void testHandleNotificationDuringUpdate_oneAdditionalUpdateWillBeRun()
+ throws Exception {
+ try {
+ long dataQueryDelayMs = 5000;
+ getProvider().setDataQueryDelayMs(dataQueryDelayMs);
+ BlockingQueue<Runnable> queue = new LinkedBlockingQueue<>();
+ ThreadPoolExecutor singleThreadedExecutor =
+ new ThreadPoolExecutor(/*corePoolSize=*/1, /*maximumPoolSize=*/
+ 1, /*KeepAliveTime=*/ 0L, TimeUnit.MILLISECONDS, queue);
+ ContactsIndexerUserInstance instance = ContactsIndexerUserInstance.createInstance(
+ mContext, mContactsDir,
+ mConfigForTest, singleThreadedExecutor);
+ int docCount = 10;
+ // Insert contacts to trigger delta update.
+ ContentResolver resolver = mContext.getContentResolver();
+ ContentValues dummyValues = new ContentValues();
+
+ for (int j = 0; j < docCount; j++) {
+ resolver.insert(ContactsContract.Contacts.CONTENT_URI, dummyValues);
+ }
+ instance.handleDeltaUpdate();
+ // Sleep here so the active delta update can be run for some time to make sure
+ // notification come during an update.
+ Thread.sleep(1500);
+ long totalTaskAfterFirstDeltaUpdate = singleThreadedExecutor.getTaskCount();
+ // Insert contacts to trigger delta update.
+ for (int j = 0; j < docCount; j++) {
+ resolver.insert(ContactsContract.Contacts.CONTENT_URI, dummyValues);
+ }
+ instance.handleDeltaUpdate();
+
+ //The 2nd update won't be scheduled right away.
+ assertThat(singleThreadedExecutor.getTaskCount()).isEqualTo(
+ totalTaskAfterFirstDeltaUpdate);
+
+ // To make sure the 1st update has been finished.
+ Thread.sleep(dataQueryDelayMs);
+
+ // This means additional task has been run by the 1st delta update to handle
+ // the change notification.
+ assertThat(singleThreadedExecutor.getActiveCount()).isEqualTo(1);
+ } finally {
+ getProvider().setDataQueryDelayMs(0);
+ }
+ }
+
+ @Test
public void testCreateInstance_dataDirectoryCreatedAsynchronously() throws Exception {
File dataDir = new File(mTemporaryFolder.newFolder(), "contacts");
boolean isDataDirectoryCreatedSynchronously = mSingleThreadedExecutor.submit(() -> {
@@ -279,7 +377,7 @@
}
executeAndWaitForCompletion(mInstance.doDeltaUpdateAsync(/*indexingLimit=*/ -1,
- mUpdateStats),
+ mUpdateStats),
mSingleThreadedExecutor);
AppSearchHelper searchHelper = AppSearchHelper.createAppSearchHelper(mContext,
@@ -317,7 +415,7 @@
}
executeAndWaitForCompletion(mInstance.doDeltaUpdateAsync(/*indexingLimit=*/ 100,
- mUpdateStats),
+ mUpdateStats),
mSingleThreadedExecutor);
AppSearchHelper searchHelper = AppSearchHelper.createAppSearchHelper(mContext,
@@ -337,7 +435,7 @@
}
executeAndWaitForCompletion(mInstance.doDeltaUpdateAsync(/*indexingLimit=*/ -1,
- mUpdateStats),
+ mUpdateStats),
mSingleThreadedExecutor);
// Delete a few contacts to trigger delta update.
@@ -351,7 +449,7 @@
/*extras=*/ null);
executeAndWaitForCompletion(mInstance.doDeltaUpdateAsync(/*indexingLimit=*/ -1,
- mUpdateStats),
+ mUpdateStats),
mSingleThreadedExecutor);
AppSearchHelper searchHelper = AppSearchHelper.createAppSearchHelper(mContext,
@@ -401,7 +499,7 @@
mUpdateStats.clear();
executeAndWaitForCompletion(mInstance.doDeltaUpdateAsync(/*indexingLimit=*/ -1,
- mUpdateStats),
+ mUpdateStats),
mSingleThreadedExecutor);
AppSearchHelper searchHelper = AppSearchHelper.createAppSearchHelper(mContext,
@@ -462,7 +560,7 @@
mUpdateStats.clear();
executeAndWaitForCompletion(mInstance.doDeltaUpdateAsync(/*indexingLimit=*/ -1,
- mUpdateStats),
+ mUpdateStats),
mSingleThreadedExecutor);
AppSearchHelper searchHelper = AppSearchHelper.createAppSearchHelper(mContext,
diff --git a/testing/contactsindexertests/src/com/android/server/appsearch/contactsindexer/FakeContactsProvider.java b/testing/contactsindexertests/src/com/android/server/appsearch/contactsindexer/FakeContactsProvider.java
index 9498cde..6e2f4c8 100644
--- a/testing/contactsindexertests/src/com/android/server/appsearch/contactsindexer/FakeContactsProvider.java
+++ b/testing/contactsindexertests/src/com/android/server/appsearch/contactsindexer/FakeContactsProvider.java
@@ -134,6 +134,9 @@
private long mMostRecentContactLastUpdatedTimestampMillis;
private long mMostRecentDeletedContactTimestampMillis;
+ // Data Query delay in millis added for testing
+ private long mDataQueryDelayMs = 0;
+
// Only odd contactIds should have additional data.
private static boolean shouldhaveAdditionalData(long contactId) {
return (contactId & 1) > 0;
@@ -280,6 +283,10 @@
this(ApplicationProvider.getApplicationContext().getResources());
}
+ public void setDataQueryDelayMs(long dataQueryDelayMs) {
+ mDataQueryDelayMs = dataQueryDelayMs;
+ }
+
FakeContactsProvider(Resources resources) {
mResources = resources;
}
@@ -320,6 +327,14 @@
// together. We only process CONTACTS_DATA_ORDER_BY.
protected Cursor manageDataQuery(
String[] projection, String selection, String[] selectionArgs, String orderBy) {
+ if (mDataQueryDelayMs > 0) {
+ try {
+ Thread.sleep(mDataQueryDelayMs);
+ } catch (InterruptedException e) {
+ Log.d(TAG, "Got exception while applying data query delay.", e);
+ }
+ }
+
MatrixCursor cursor = null;
// Details in id list.
if (CONTACTS_DATA_ORDER_BY.equals(orderBy) && (projection != null)) {