Puller refactor

1) Refactor pullers and add tests.

2) Add timeout to a puller.
mPullTimeoutNs is intrinsic to puller. A pull taking longer than this is
deemed failed and the data discarded.
A metric or StatsPullerManager requesting a pull should monitor the pull
and have deadlineNs. A successful pull may come later than desired due
to statsd processing delays.

3) Add unit tests to puller now that the base puller is more
complicated.

Bug: 118756964
Test: unit test
Change-Id: I0e5d47e2527391f7beef4b2d06bfd5c2f82f1179
diff --git a/cmds/statsd/src/metrics/GaugeMetricProducer.cpp b/cmds/statsd/src/metrics/GaugeMetricProducer.cpp
index ec60244..67a1a47 100644
--- a/cmds/statsd/src/metrics/GaugeMetricProducer.cpp
+++ b/cmds/statsd/src/metrics/GaugeMetricProducer.cpp
@@ -68,15 +68,12 @@
 const int FIELD_ID_START_BUCKET_ELAPSED_MILLIS = 7;
 const int FIELD_ID_END_BUCKET_ELAPSED_MILLIS = 8;
 
-GaugeMetricProducer::GaugeMetricProducer(const ConfigKey& key, const GaugeMetric& metric,
-                                         const int conditionIndex,
-                                         const sp<ConditionWizard>& wizard,
-                                         const int whatMatcherIndex,
-                                         const sp<EventMatcherWizard>& matcherWizard,
-                                         const int pullTagId,
-                                         const int triggerAtomId, const int atomId,
-                                         const int64_t timeBaseNs, const int64_t startTimeNs,
-                                         const sp<StatsPullerManager>& pullerManager)
+GaugeMetricProducer::GaugeMetricProducer(
+        const ConfigKey& key, const GaugeMetric& metric, const int conditionIndex,
+        const sp<ConditionWizard>& wizard, const int whatMatcherIndex,
+        const sp<EventMatcherWizard>& matcherWizard, const int pullTagId, const int triggerAtomId,
+        const int atomId, const int64_t timeBaseNs, const int64_t startTimeNs,
+        const sp<StatsPullerManager>& pullerManager)
     : MetricProducer(metric.id(), key, timeBaseNs, conditionIndex, wizard),
       mWhatMatcherIndex(whatMatcherIndex),
       mEventMatcherWizard(matcherWizard),
@@ -86,6 +83,8 @@
       mAtomId(atomId),
       mIsPulled(pullTagId != -1),
       mMinBucketSizeNs(metric.min_bucket_size_nanos()),
+      mMaxPullDelayNs(metric.max_pull_delay_sec() > 0 ? metric.max_pull_delay_sec() * NS_PER_SEC
+                                                      : StatsdStats::kPullMaxDelayNs),
       mDimensionSoftLimit(StatsdStats::kAtomDimensionKeySizeLimitMap.find(pullTagId) !=
                                           StatsdStats::kAtomDimensionKeySizeLimitMap.end()
                                   ? StatsdStats::kAtomDimensionKeySizeLimitMap.at(pullTagId).first
@@ -338,14 +337,24 @@
         return;
     }
     vector<std::shared_ptr<LogEvent>> allData;
-    if (!mPullerManager->Pull(mPullTagId, timestampNs, &allData)) {
+    if (!mPullerManager->Pull(mPullTagId, &allData)) {
         ALOGE("Gauge Stats puller failed for tag: %d at %lld", mPullTagId, (long long)timestampNs);
         return;
     }
+    const int64_t pullDelayNs = getElapsedRealtimeNs() - timestampNs;
+    if (pullDelayNs > mMaxPullDelayNs) {
+        ALOGE("Pull finish too late for atom %d", mPullTagId);
+        StatsdStats::getInstance().notePullExceedMaxDelay(mPullTagId);
+        StatsdStats::getInstance().notePullDelay(mPullTagId, pullDelayNs);
+        return;
+    }
+    StatsdStats::getInstance().notePullDelay(mPullTagId, pullDelayNs);
     for (const auto& data : allData) {
-        if (mEventMatcherWizard->matchLogEvent(
-                *data, mWhatMatcherIndex) == MatchingState::kMatched) {
-            onMatchedLogEventLocked(mWhatMatcherIndex, *data);
+        LogEvent localCopy = data->makeCopy();
+        localCopy.setElapsedTimestampNs(timestampNs);
+        if (mEventMatcherWizard->matchLogEvent(localCopy, mWhatMatcherIndex) ==
+            MatchingState::kMatched) {
+            onMatchedLogEventLocked(mWhatMatcherIndex, localCopy);
         }
     }
 }
diff --git a/cmds/statsd/src/metrics/GaugeMetricProducer.h b/cmds/statsd/src/metrics/GaugeMetricProducer.h
index 6e3530b..a1a5061 100644
--- a/cmds/statsd/src/metrics/GaugeMetricProducer.h
+++ b/cmds/statsd/src/metrics/GaugeMetricProducer.h
@@ -160,6 +160,8 @@
 
     GaugeMetric::SamplingType mSamplingType;
 
+    const int64_t mMaxPullDelayNs;
+
     // apply a whitelist on the original input
     std::shared_ptr<vector<FieldValue>> getGaugeFields(const LogEvent& event);
 
diff --git a/cmds/statsd/src/metrics/ValueMetricProducer.cpp b/cmds/statsd/src/metrics/ValueMetricProducer.cpp
index cf56e2d..9a8e3bd 100644
--- a/cmds/statsd/src/metrics/ValueMetricProducer.cpp
+++ b/cmds/statsd/src/metrics/ValueMetricProducer.cpp
@@ -103,7 +103,9 @@
       mValueDirection(metric.value_direction()),
       mSkipZeroDiffOutput(metric.skip_zero_diff_output()),
       mUseZeroDefaultBase(metric.use_zero_default_base()),
-      mHasGlobalBase(false) {
+      mHasGlobalBase(false),
+      mMaxPullDelayNs(metric.max_pull_delay_sec() > 0 ? metric.max_pull_delay_sec() * NS_PER_SEC
+                                                      : StatsdStats::kPullMaxDelayNs) {
     int64_t bucketSizeMills = 0;
     if (metric.has_bucket()) {
         bucketSizeMills = TimeUnitToBucketSizeInMillisGuardrailed(key.GetUid(), metric.bucket());
@@ -340,19 +342,32 @@
 
 void ValueMetricProducer::pullAndMatchEventsLocked(const int64_t timestampNs) {
     vector<std::shared_ptr<LogEvent>> allData;
-    if (mPullerManager->Pull(mPullTagId, timestampNs, &allData)) {
-        for (const auto& data : allData) {
-            if (mEventMatcherWizard->matchLogEvent(
-                *data, mWhatMatcherIndex) == MatchingState::kMatched) {
-                onMatchedLogEventLocked(mWhatMatcherIndex, *data);
-            }
-        }
-        mHasGlobalBase = true;
-    } else {
-        // for pulled data, every pull is needed. So we reset the base if any
-        // pull fails.
+    if (!mPullerManager->Pull(mPullTagId, &allData)) {
+        ALOGE("Gauge Stats puller failed for tag: %d at %lld", mPullTagId, (long long)timestampNs);
         resetBase();
+        return;
     }
+    const int64_t pullDelayNs = getElapsedRealtimeNs() - timestampNs;
+    if (pullDelayNs > mMaxPullDelayNs) {
+        ALOGE("Pull finish too late for atom %d, longer than %lld", mPullTagId,
+              (long long)mMaxPullDelayNs);
+        StatsdStats::getInstance().notePullExceedMaxDelay(mPullTagId);
+        StatsdStats::getInstance().notePullDelay(mPullTagId, pullDelayNs);
+        resetBase();
+        return;
+    }
+    StatsdStats::getInstance().notePullDelay(mPullTagId, pullDelayNs);
+
+    for (const auto& data : allData) {
+        // make a copy before doing and changes
+        LogEvent localCopy = data->makeCopy();
+        localCopy.setElapsedTimestampNs(timestampNs);
+        if (mEventMatcherWizard->matchLogEvent(localCopy, mWhatMatcherIndex) ==
+            MatchingState::kMatched) {
+            onMatchedLogEventLocked(mWhatMatcherIndex, localCopy);
+        }
+    }
+    mHasGlobalBase = true;
 }
 
 int64_t ValueMetricProducer::calcPreviousBucketEndTime(const int64_t currentTimeNs) {
@@ -381,10 +396,11 @@
             return;
         }
         for (const auto& data : allData) {
-            if (mEventMatcherWizard->matchLogEvent(*data, mWhatMatcherIndex) ==
+            LogEvent localCopy = data->makeCopy();
+            if (mEventMatcherWizard->matchLogEvent(localCopy, mWhatMatcherIndex) ==
                 MatchingState::kMatched) {
-                data->setElapsedTimestampNs(bucketEndTime);
-                onMatchedLogEventLocked(mWhatMatcherIndex, *data);
+                localCopy.setElapsedTimestampNs(bucketEndTime);
+                onMatchedLogEventLocked(mWhatMatcherIndex, localCopy);
             }
         }
         mHasGlobalBase = true;
diff --git a/cmds/statsd/src/metrics/ValueMetricProducer.h b/cmds/statsd/src/metrics/ValueMetricProducer.h
index 4991af4..4865aee 100644
--- a/cmds/statsd/src/metrics/ValueMetricProducer.h
+++ b/cmds/statsd/src/metrics/ValueMetricProducer.h
@@ -183,6 +183,8 @@
     // diff against.
     bool mHasGlobalBase;
 
+    const int64_t mMaxPullDelayNs;
+
     FRIEND_TEST(ValueMetricProducerTest, TestPulledEventsNoCondition);
     FRIEND_TEST(ValueMetricProducerTest, TestPulledEventsWithFiltering);
     FRIEND_TEST(ValueMetricProducerTest, TestPulledEventsTakeAbsoluteValueOnReset);
@@ -207,6 +209,8 @@
     FRIEND_TEST(ValueMetricProducerTest, TestUseZeroDefaultBase);
     FRIEND_TEST(ValueMetricProducerTest, TestUseZeroDefaultBaseWithPullFailures);
     FRIEND_TEST(ValueMetricProducerTest, TestTrimUnusedDimensionKey);
+    FRIEND_TEST(ValueMetricProducerTest, TestResetBaseOnPullFail);
+    FRIEND_TEST(ValueMetricProducerTest, TestResetBaseOnPullTooLate);
 };
 
 }  // namespace statsd