Puller refactor

1) Refactor pullers and add tests.

2) Add timeout to a puller.
mPullTimeoutNs is intrinsic to puller. A pull taking longer than this is
deemed failed and the data discarded.
A metric or StatsPullerManager requesting a pull should monitor the pull
and have deadlineNs. A successful pull may come later than desired due
to statsd processing delays.

3) Add unit tests to puller now that the base puller is more
complicated.

Bug: 118756964
Test: unit test
Change-Id: I0e5d47e2527391f7beef4b2d06bfd5c2f82f1179
diff --git a/cmds/statsd/src/metrics/ValueMetricProducer.cpp b/cmds/statsd/src/metrics/ValueMetricProducer.cpp
index cf56e2d..9a8e3bd 100644
--- a/cmds/statsd/src/metrics/ValueMetricProducer.cpp
+++ b/cmds/statsd/src/metrics/ValueMetricProducer.cpp
@@ -103,7 +103,9 @@
       mValueDirection(metric.value_direction()),
       mSkipZeroDiffOutput(metric.skip_zero_diff_output()),
       mUseZeroDefaultBase(metric.use_zero_default_base()),
-      mHasGlobalBase(false) {
+      mHasGlobalBase(false),
+      mMaxPullDelayNs(metric.max_pull_delay_sec() > 0 ? metric.max_pull_delay_sec() * NS_PER_SEC
+                                                      : StatsdStats::kPullMaxDelayNs) {
     int64_t bucketSizeMills = 0;
     if (metric.has_bucket()) {
         bucketSizeMills = TimeUnitToBucketSizeInMillisGuardrailed(key.GetUid(), metric.bucket());
@@ -340,19 +342,32 @@
 
 void ValueMetricProducer::pullAndMatchEventsLocked(const int64_t timestampNs) {
     vector<std::shared_ptr<LogEvent>> allData;
-    if (mPullerManager->Pull(mPullTagId, timestampNs, &allData)) {
-        for (const auto& data : allData) {
-            if (mEventMatcherWizard->matchLogEvent(
-                *data, mWhatMatcherIndex) == MatchingState::kMatched) {
-                onMatchedLogEventLocked(mWhatMatcherIndex, *data);
-            }
-        }
-        mHasGlobalBase = true;
-    } else {
-        // for pulled data, every pull is needed. So we reset the base if any
-        // pull fails.
+    if (!mPullerManager->Pull(mPullTagId, &allData)) {
+        ALOGE("Gauge Stats puller failed for tag: %d at %lld", mPullTagId, (long long)timestampNs);
         resetBase();
+        return;
     }
+    const int64_t pullDelayNs = getElapsedRealtimeNs() - timestampNs;
+    if (pullDelayNs > mMaxPullDelayNs) {
+        ALOGE("Pull finish too late for atom %d, longer than %lld", mPullTagId,
+              (long long)mMaxPullDelayNs);
+        StatsdStats::getInstance().notePullExceedMaxDelay(mPullTagId);
+        StatsdStats::getInstance().notePullDelay(mPullTagId, pullDelayNs);
+        resetBase();
+        return;
+    }
+    StatsdStats::getInstance().notePullDelay(mPullTagId, pullDelayNs);
+
+    for (const auto& data : allData) {
+        // make a copy before doing and changes
+        LogEvent localCopy = data->makeCopy();
+        localCopy.setElapsedTimestampNs(timestampNs);
+        if (mEventMatcherWizard->matchLogEvent(localCopy, mWhatMatcherIndex) ==
+            MatchingState::kMatched) {
+            onMatchedLogEventLocked(mWhatMatcherIndex, localCopy);
+        }
+    }
+    mHasGlobalBase = true;
 }
 
 int64_t ValueMetricProducer::calcPreviousBucketEndTime(const int64_t currentTimeNs) {
@@ -381,10 +396,11 @@
             return;
         }
         for (const auto& data : allData) {
-            if (mEventMatcherWizard->matchLogEvent(*data, mWhatMatcherIndex) ==
+            LogEvent localCopy = data->makeCopy();
+            if (mEventMatcherWizard->matchLogEvent(localCopy, mWhatMatcherIndex) ==
                 MatchingState::kMatched) {
-                data->setElapsedTimestampNs(bucketEndTime);
-                onMatchedLogEventLocked(mWhatMatcherIndex, *data);
+                localCopy.setElapsedTimestampNs(bucketEndTime);
+                onMatchedLogEventLocked(mWhatMatcherIndex, localCopy);
             }
         }
         mHasGlobalBase = true;