Adds option to drop small buckets for statsd.

We noticed that some of the pulled metrics carry a lot of data, and
during app upgrades we form partial buckets that cover only short
periods of time yet still cost many bytes. This change adds an
option to drop buckets that are shorter than a configured minimum.
Note that we still have to pull the data so that the metrics for the
next bucket stay correct. We also include a new field in the value
and gauge metric outputs so that it is easy to tell when a bucket
was dropped.
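
As a rough, hedged sketch of the intended behavior (illustrative
only: BucketDropper and shouldReport are hypothetical names, not
part of this change, while min_bucket_size_nanos and the skipped
start/end fields come from the diff below):

    #include <cstdint>
    #include <utility>
    #include <vector>

    // Hypothetical sketch: a bucket shorter than the configured
    // minimum (min_bucket_size_nanos) is not reported; its start/end
    // range is recorded instead, so report readers can tell that a
    // bucket was dropped.
    struct BucketDropper {
        int64_t minBucketSizeNs = 0;
        std::vector<std::pair<int64_t, int64_t>> skippedBuckets;

        // Returns true if the bucket is long enough to report.
        bool shouldReport(int64_t startNs, int64_t endNs) {
            if (endNs - startNs >= minBucketSizeNs) {
                return true;
            }
            skippedBuckets.emplace_back(startNs, endNs);
            return false;
        }
    };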

We also drop these partial buckets from anomaly detection, since
anomalies should be computed from the same data that is reported.
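
To illustrate the intent (a sketch reusing the hypothetical
BucketDropper above; report and addToAnomalyTracker are made-up
stand-ins, not the statsd API), the same size check gates both
paths:

    #include <cstdio>

    // Hypothetical stand-ins for the real report and anomaly paths.
    static void report(int64_t startNs, int64_t endNs, int64_t value) {
        std::printf("bucket [%lld, %lld] = %lld\n", (long long)startNs,
                    (long long)endNs, (long long)value);
    }
    static void addToAnomalyTracker(int64_t /*startNs*/, int64_t /*value*/) {}

    // Both consumers see the same filtered stream: a bucket that is
    // too short is visible to neither the report nor anomaly
    // detection, keeping the two views consistent.
    void flushBucket(BucketDropper& dropper, int64_t startNs,
                     int64_t endNs, int64_t value) {
        if (dropper.shouldReport(startNs, endNs)) {
            report(startNs, endNs, value);
            addToAnomalyTracker(startNs, value);
        }
        // Otherwise only the bucket's time range is recorded in
        // dropper.skippedBuckets.
    }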

Test: Added unit tests for value and gauge metrics.
Bug: 77925710
Change-Id: Ic370496377c6afd380e02278a6c1ed8b521a2731
diff --git a/cmds/statsd/src/metrics/ValueMetricProducer.cpp b/cmds/statsd/src/metrics/ValueMetricProducer.cpp
index 844c728..27fd78f 100644
--- a/cmds/statsd/src/metrics/ValueMetricProducer.cpp
+++ b/cmds/statsd/src/metrics/ValueMetricProducer.cpp
@@ -50,6 +50,9 @@
 const int FIELD_ID_VALUE_METRICS = 7;
 // for ValueMetricDataWrapper
 const int FIELD_ID_DATA = 1;
+const int FIELD_ID_SKIPPED = 2;
+const int FIELD_ID_SKIPPED_START = 1;
+const int FIELD_ID_SKIPPED_END = 2;
 // for ValueMetricData
 const int FIELD_ID_DIMENSION_IN_WHAT = 1;
 const int FIELD_ID_DIMENSION_IN_CONDITION = 2;
@@ -69,6 +72,7 @@
       mValueField(metric.value_field()),
       mStatsPullerManager(statsPullerManager),
       mPullTagId(pullTagId),
+      mMinBucketSizeNs(metric.min_bucket_size_nanos()),
       mDimensionSoftLimit(StatsdStats::kAtomDimensionKeySizeLimitMap.find(pullTagId) !=
                                           StatsdStats::kAtomDimensionKeySizeLimitMap.end()
                                   ? StatsdStats::kAtomDimensionKeySizeLimitMap.at(pullTagId).first
@@ -156,12 +160,21 @@
     } else {
         flushIfNeededLocked(dumpTimeNs);
     }
-    if (mPastBuckets.empty()) {
+    if (mPastBuckets.empty() && mSkippedBuckets.empty()) {
         return;
     }
     protoOutput->write(FIELD_TYPE_INT64 | FIELD_ID_ID, (long long)mMetricId);
     uint64_t protoToken = protoOutput->start(FIELD_TYPE_MESSAGE | FIELD_ID_VALUE_METRICS);
 
+    for (const auto& pair : mSkippedBuckets) {
+        uint64_t wrapperToken =
+                protoOutput->start(FIELD_TYPE_MESSAGE | FIELD_COUNT_REPEATED | FIELD_ID_SKIPPED);
+        protoOutput->write(FIELD_TYPE_INT64 | FIELD_ID_SKIPPED_START, (long long)pair.first);
+        protoOutput->write(FIELD_TYPE_INT64 | FIELD_ID_SKIPPED_END, (long long)pair.second);
+        protoOutput->end(wrapperToken);
+    }
+    mSkippedBuckets.clear();
+
     for (const auto& pair : mPastBuckets) {
         const MetricDimensionKey& dimensionKey = pair.first;
         VLOG("  dimension key %s", dimensionKey.toString().c_str());
@@ -391,18 +404,23 @@
         info.mBucketEndNs = fullBucketEndTimeNs;
     }
 
-    int tainted = 0;
-    for (const auto& slice : mCurrentSlicedBucket) {
-        tainted += slice.second.tainted;
-        tainted += slice.second.startUpdated;
-        if (slice.second.hasValue) {
-            info.mValue = slice.second.sum;
-            // it will auto create new vector of ValuebucketInfo if the key is not found.
-            auto& bucketList = mPastBuckets[slice.first];
-            bucketList.push_back(info);
+    if (info.mBucketEndNs - mCurrentBucketStartTimeNs >= mMinBucketSizeNs) {
+        // The current bucket is large enough to keep.
+        int tainted = 0;
+        for (const auto& slice : mCurrentSlicedBucket) {
+            tainted += slice.second.tainted;
+            tainted += slice.second.startUpdated;
+            if (slice.second.hasValue) {
+                info.mValue = slice.second.sum;
+                // It will auto-create a new vector of ValueBucketInfo if the key is not found.
+                auto& bucketList = mPastBuckets[slice.first];
+                bucketList.push_back(info);
+            }
         }
+        VLOG("%d tainted pairs in the bucket", tainted);
+    } else {
+        mSkippedBuckets.emplace_back(info.mBucketStartNs, info.mBucketEndNs);
     }
-    VLOG("%d tainted pairs in the bucket", tainted);
 
     if (eventTimeNs > fullBucketEndTimeNs) {  // If full bucket, send to anomaly tracker.
         // Accumulate partial buckets with current value and then send to anomaly tracker.