optional default base for ValueMetric

For pulled atoms, if use_zero_default_base is set to true, and if use_diff, the first data
piece will assume 0 value as base.
This requires a successful previous pull to set mHasGlobalBase.
mHasGlobalBase is reset when condition changes to false or we skip more
than 1 bucket.

Bug: 120476027
Bug: 120129928
Test: unit test
Change-Id: Id01a7bf8796394777f02ba2c9bfdc860f528b98f
diff --git a/cmds/statsd/src/metrics/ValueMetricProducer.cpp b/cmds/statsd/src/metrics/ValueMetricProducer.cpp
index a34df8aa..bff2334 100644
--- a/cmds/statsd/src/metrics/ValueMetricProducer.cpp
+++ b/cmds/statsd/src/metrics/ValueMetricProducer.cpp
@@ -72,17 +72,15 @@
 const int FIELD_ID_START_BUCKET_ELAPSED_MILLIS = 5;
 const int FIELD_ID_END_BUCKET_ELAPSED_MILLIS = 6;
 
+const Value ZERO_LONG((int64_t)0);
+const Value ZERO_DOUBLE((int64_t)0);
+
 // ValueMetric has a minimum bucket size of 10min so that we don't pull too frequently
-ValueMetricProducer::ValueMetricProducer(const ConfigKey& key,
-                                         const ValueMetric& metric,
-                                         const int conditionIndex,
-                                         const sp<ConditionWizard>& conditionWizard,
-                                         const int whatMatcherIndex,
-                                         const sp<EventMatcherWizard>& matcherWizard,
-                                         const int pullTagId,
-                                         const int64_t timeBaseNs,
-                                         const int64_t startTimeNs,
-                                         const sp<StatsPullerManager>& pullerManager)
+ValueMetricProducer::ValueMetricProducer(
+        const ConfigKey& key, const ValueMetric& metric, const int conditionIndex,
+        const sp<ConditionWizard>& conditionWizard, const int whatMatcherIndex,
+        const sp<EventMatcherWizard>& matcherWizard, const int pullTagId, const int64_t timeBaseNs,
+        const int64_t startTimeNs, const sp<StatsPullerManager>& pullerManager)
     : MetricProducer(metric.id(), key, timeBaseNs, conditionIndex, conditionWizard),
       mWhatMatcherIndex(whatMatcherIndex),
       mEventMatcherWizard(matcherWizard),
@@ -102,7 +100,9 @@
       mAggregationType(metric.aggregation_type()),
       mUseDiff(metric.has_use_diff() ? metric.use_diff() : (mIsPulled ? true : false)),
       mValueDirection(metric.value_direction()),
-      mSkipZeroDiffOutput(metric.skip_zero_diff_output()) {
+      mSkipZeroDiffOutput(metric.skip_zero_diff_output()),
+      mUseZeroDefaultBase(metric.use_zero_default_base()),
+      mHasGlobalBase(false) {
     int64_t bucketSizeMills = 0;
     if (metric.has_bucket()) {
         bucketSizeMills = TimeUnitToBucketSizeInMillisGuardrailed(key.GetUid(), metric.bucket());
@@ -302,6 +302,15 @@
     }
 }
 
+void ValueMetricProducer::resetBase() {
+    for (auto& slice : mCurrentSlicedBucket) {
+        for (auto& interval : slice.second) {
+            interval.hasBase = false;
+        }
+    }
+    mHasGlobalBase = false;
+}
+
 void ValueMetricProducer::onConditionChangedLocked(const bool condition,
                                                    const int64_t eventTimeNs) {
     if (eventTimeNs < mCurrentBucketStartTimeNs) {
@@ -317,13 +326,10 @@
         pullAndMatchEventsLocked(eventTimeNs);
     }
 
-    // when condition change from true to false, clear diff base
+    // when condition change from true to false, clear diff base but don't
+    // reset other counters as we may accumulate more value in the bucket.
     if (mUseDiff && mCondition && !condition) {
-        for (auto& slice : mCurrentSlicedBucket) {
-            for (auto& interval : slice.second) {
-                interval.hasBase = false;
-            }
-        }
+        resetBase();
     }
 
     mCondition = condition;
@@ -332,15 +338,17 @@
 void ValueMetricProducer::pullAndMatchEventsLocked(const int64_t timestampNs) {
     vector<std::shared_ptr<LogEvent>> allData;
     if (mPullerManager->Pull(mPullTagId, timestampNs, &allData)) {
-        if (allData.size() == 0) {
-            return;
-        }
         for (const auto& data : allData) {
             if (mEventMatcherWizard->matchLogEvent(
                 *data, mWhatMatcherIndex) == MatchingState::kMatched) {
                 onMatchedLogEventLocked(mWhatMatcherIndex, *data);
             }
         }
+        mHasGlobalBase = true;
+    } else {
+        // for pulled data, every pull is needed. So we reset the base if any
+        // pull fails.
+        resetBase();
     }
 }
 
@@ -376,6 +384,7 @@
                 onMatchedLogEventLocked(mWhatMatcherIndex, *data);
             }
         }
+        mHasGlobalBase = true;
     } else {
         VLOG("No need to commit data on condition false.");
     }
@@ -486,11 +495,18 @@
         }
 
         if (mUseDiff) {
-            // no base. just update base and return.
             if (!interval.hasBase) {
-                interval.base = value;
-                interval.hasBase = true;
-                return;
+                if (mHasGlobalBase && mUseZeroDefaultBase) {
+                    // The bucket has global base. This key does not.
+                    // Optionally use zero as base.
+                    interval.base = (value.type == LONG ? ZERO_LONG : ZERO_DOUBLE);
+                    interval.hasBase = true;
+                } else {
+                    // no base. just update base and return.
+                    interval.base = value;
+                    interval.hasBase = true;
+                    return;
+                }
             }
             Value diff;
             switch (mValueDirection) {
@@ -580,11 +596,7 @@
     if (numBucketsForward > 1) {
         VLOG("Skipping forward %lld buckets", (long long)numBucketsForward);
         // take base again in future good bucket.
-        for (auto& slice : mCurrentSlicedBucket) {
-            for (auto& interval : slice.second) {
-                interval.hasBase = false;
-            }
-        }
+        resetBase();
     }
     VLOG("metric %lld: new bucket start time: %lld", (long long)mMetricId,
          (long long)mCurrentBucketStartTimeNs);