Adds option to drop small buckets for statsd.

We notice that some of the pulled metrics have a ton of data, and
during app upgrades, we're forming partial buckets that represent
small periods of time but require many bytes of data. We now have an
option to drop these buckets that are too short. Note that we still
have to pull the data to keep the metrics for the next bucket
correct. We include a new field in the value and gauge metric outputs
so that it's easy to tell when a bucket was dropped.

We drop the partial buckets also from anomaly detection since we
should be computing anomalies from the same data that is reported.

Test: Added unit-tests for value and gauge metrics.
Bug: 77925710
Change-Id: Ic370496377c6afd380e02278a6c1ed8b521a2731
diff --git a/cmds/statsd/src/StatsService.h b/cmds/statsd/src/StatsService.h
index 774a3e9..b5957b5 100644
--- a/cmds/statsd/src/StatsService.h
+++ b/cmds/statsd/src/StatsService.h
@@ -272,6 +272,10 @@
     FRIEND_TEST(PartialBucketE2eTest, TestCountMetricSplitOnUpgrade);
     FRIEND_TEST(PartialBucketE2eTest, TestCountMetricSplitOnRemoval);
     FRIEND_TEST(PartialBucketE2eTest, TestCountMetricWithoutSplit);
+    FRIEND_TEST(PartialBucketE2eTest, TestValueMetricWithoutMinPartialBucket);
+    FRIEND_TEST(PartialBucketE2eTest, TestValueMetricWithMinPartialBucket);
+    FRIEND_TEST(PartialBucketE2eTest, TestGaugeMetricWithoutMinPartialBucket);
+    FRIEND_TEST(PartialBucketE2eTest, TestGaugeMetricWithMinPartialBucket);
 };
 
 }  // namespace statsd
diff --git a/cmds/statsd/src/metrics/GaugeMetricProducer.cpp b/cmds/statsd/src/metrics/GaugeMetricProducer.cpp
index 6886f7c..1270856 100644
--- a/cmds/statsd/src/metrics/GaugeMetricProducer.cpp
+++ b/cmds/statsd/src/metrics/GaugeMetricProducer.cpp
@@ -47,6 +47,9 @@
 const int FIELD_ID_GAUGE_METRICS = 8;
 // for GaugeMetricDataWrapper
 const int FIELD_ID_DATA = 1;
+const int FIELD_ID_SKIPPED = 2;
+const int FIELD_ID_SKIPPED_START = 1;
+const int FIELD_ID_SKIPPED_END = 2;
 // for GaugeMetricData
 const int FIELD_ID_DIMENSION_IN_WHAT = 1;
 const int FIELD_ID_DIMENSION_IN_CONDITION = 2;
@@ -66,6 +69,7 @@
     : MetricProducer(metric.id(), key, timeBaseNs, conditionIndex, wizard),
       mStatsPullerManager(statsPullerManager),
       mPullTagId(pullTagId),
+      mMinBucketSizeNs(metric.min_bucket_size_nanos()),
       mDimensionSoftLimit(StatsdStats::kAtomDimensionKeySizeLimitMap.find(pullTagId) !=
                                           StatsdStats::kAtomDimensionKeySizeLimitMap.end()
                                   ? StatsdStats::kAtomDimensionKeySizeLimitMap.at(pullTagId).first
@@ -174,6 +178,15 @@
     protoOutput->write(FIELD_TYPE_INT64 | FIELD_ID_ID, (long long)mMetricId);
     uint64_t protoToken = protoOutput->start(FIELD_TYPE_MESSAGE | FIELD_ID_GAUGE_METRICS);
 
+    for (const auto& pair : mSkippedBuckets) {
+        uint64_t wrapperToken =
+                protoOutput->start(FIELD_TYPE_MESSAGE | FIELD_COUNT_REPEATED | FIELD_ID_SKIPPED);
+        protoOutput->write(FIELD_TYPE_INT64 | FIELD_ID_SKIPPED_START, (long long)pair.first);
+        protoOutput->write(FIELD_TYPE_INT64 | FIELD_ID_SKIPPED_END, (long long)pair.second);
+        protoOutput->end(wrapperToken);
+    }
+    mSkippedBuckets.clear();
+
     for (const auto& pair : mPastBuckets) {
         const MetricDimensionKey& dimensionKey = pair.first;
 
@@ -440,12 +453,16 @@
         info.mBucketEndNs = fullBucketEndTimeNs;
     }
 
-    for (const auto& slice : *mCurrentSlicedBucket) {
-        info.mGaugeAtoms = slice.second;
-        auto& bucketList = mPastBuckets[slice.first];
-        bucketList.push_back(info);
-        VLOG("Gauge gauge metric %lld, dump key value: %s", (long long)mMetricId,
-             slice.first.toString().c_str());
+    if (info.mBucketEndNs - mCurrentBucketStartTimeNs >= mMinBucketSizeNs) {
+        for (const auto& slice : *mCurrentSlicedBucket) {
+            info.mGaugeAtoms = slice.second;
+            auto& bucketList = mPastBuckets[slice.first];
+            bucketList.push_back(info);
+            VLOG("Gauge gauge metric %lld, dump key value: %s", (long long)mMetricId,
+                 slice.first.toString().c_str());
+        }
+    } else {
+        mSkippedBuckets.emplace_back(info.mBucketStartNs, info.mBucketEndNs);
     }
 
     // If we have anomaly trackers, we need to update the partial bucket values.
diff --git a/cmds/statsd/src/metrics/GaugeMetricProducer.h b/cmds/statsd/src/metrics/GaugeMetricProducer.h
index 08765c2..71d5912 100644
--- a/cmds/statsd/src/metrics/GaugeMetricProducer.h
+++ b/cmds/statsd/src/metrics/GaugeMetricProducer.h
@@ -136,6 +136,11 @@
     // this slice (ie, for partial buckets, we use the last partial bucket in this full bucket).
     std::shared_ptr<DimToValMap> mCurrentSlicedBucketForAnomaly;
 
+    // Pairs of (elapsed start, elapsed end) denoting buckets that were skipped.
+    std::list<std::pair<int64_t, int64_t>> mSkippedBuckets;
+
+    const int64_t mMinBucketSizeNs;
+
     // Translate Atom based bucket to single numeric value bucket for anomaly and updates the map
     // for each slice with the latest value.
     void updateCurrentSlicedBucketForAnomaly();
diff --git a/cmds/statsd/src/metrics/ValueMetricProducer.cpp b/cmds/statsd/src/metrics/ValueMetricProducer.cpp
index 844c728..27fd78f 100644
--- a/cmds/statsd/src/metrics/ValueMetricProducer.cpp
+++ b/cmds/statsd/src/metrics/ValueMetricProducer.cpp
@@ -50,6 +50,9 @@
 const int FIELD_ID_VALUE_METRICS = 7;
 // for ValueMetricDataWrapper
 const int FIELD_ID_DATA = 1;
+const int FIELD_ID_SKIPPED = 2;
+const int FIELD_ID_SKIPPED_START = 1;
+const int FIELD_ID_SKIPPED_END = 2;
 // for ValueMetricData
 const int FIELD_ID_DIMENSION_IN_WHAT = 1;
 const int FIELD_ID_DIMENSION_IN_CONDITION = 2;
@@ -69,6 +72,7 @@
       mValueField(metric.value_field()),
       mStatsPullerManager(statsPullerManager),
       mPullTagId(pullTagId),
+      mMinBucketSizeNs(metric.min_bucket_size_nanos()),
       mDimensionSoftLimit(StatsdStats::kAtomDimensionKeySizeLimitMap.find(pullTagId) !=
                                           StatsdStats::kAtomDimensionKeySizeLimitMap.end()
                                   ? StatsdStats::kAtomDimensionKeySizeLimitMap.at(pullTagId).first
@@ -156,12 +160,21 @@
     } else {
         flushIfNeededLocked(dumpTimeNs);
     }
-    if (mPastBuckets.empty()) {
+    if (mPastBuckets.empty() && mSkippedBuckets.empty()) {
         return;
     }
     protoOutput->write(FIELD_TYPE_INT64 | FIELD_ID_ID, (long long)mMetricId);
     uint64_t protoToken = protoOutput->start(FIELD_TYPE_MESSAGE | FIELD_ID_VALUE_METRICS);
 
+    for (const auto& pair : mSkippedBuckets) {
+        uint64_t wrapperToken =
+                protoOutput->start(FIELD_TYPE_MESSAGE | FIELD_COUNT_REPEATED | FIELD_ID_SKIPPED);
+        protoOutput->write(FIELD_TYPE_INT64 | FIELD_ID_SKIPPED_START, (long long)pair.first);
+        protoOutput->write(FIELD_TYPE_INT64 | FIELD_ID_SKIPPED_END, (long long)pair.second);
+        protoOutput->end(wrapperToken);
+    }
+    mSkippedBuckets.clear();
+
     for (const auto& pair : mPastBuckets) {
         const MetricDimensionKey& dimensionKey = pair.first;
         VLOG("  dimension key %s", dimensionKey.toString().c_str());
@@ -391,18 +404,23 @@
         info.mBucketEndNs = fullBucketEndTimeNs;
     }
 
-    int tainted = 0;
-    for (const auto& slice : mCurrentSlicedBucket) {
-        tainted += slice.second.tainted;
-        tainted += slice.second.startUpdated;
-        if (slice.second.hasValue) {
-            info.mValue = slice.second.sum;
-            // it will auto create new vector of ValuebucketInfo if the key is not found.
-            auto& bucketList = mPastBuckets[slice.first];
-            bucketList.push_back(info);
+    if (info.mBucketEndNs - mCurrentBucketStartTimeNs >= mMinBucketSizeNs) {
+        // The current bucket is large enough to keep.
+        int tainted = 0;
+        for (const auto& slice : mCurrentSlicedBucket) {
+            tainted += slice.second.tainted;
+            tainted += slice.second.startUpdated;
+            if (slice.second.hasValue) {
+                info.mValue = slice.second.sum;
+                // it will auto create new vector of ValuebucketInfo if the key is not found.
+                auto& bucketList = mPastBuckets[slice.first];
+                bucketList.push_back(info);
+            }
         }
+        VLOG("%d tainted pairs in the bucket", tainted);
+    } else {
+        mSkippedBuckets.emplace_back(info.mBucketStartNs, info.mBucketEndNs);
     }
-    VLOG("%d tainted pairs in the bucket", tainted);
 
     if (eventTimeNs > fullBucketEndTimeNs) {  // If full bucket, send to anomaly tracker.
         // Accumulate partial buckets with current value and then send to anomaly tracker.
diff --git a/cmds/statsd/src/metrics/ValueMetricProducer.h b/cmds/statsd/src/metrics/ValueMetricProducer.h
index 9c5a56c..8df30d3 100644
--- a/cmds/statsd/src/metrics/ValueMetricProducer.h
+++ b/cmds/statsd/src/metrics/ValueMetricProducer.h
@@ -148,6 +148,11 @@
     // TODO: Add a lock to mPastBuckets.
     std::unordered_map<MetricDimensionKey, std::vector<ValueBucket>> mPastBuckets;
 
+    // Pairs of (elapsed start, elapsed end) denoting buckets that were skipped.
+    std::list<std::pair<int64_t, int64_t>> mSkippedBuckets;
+
+    const int64_t mMinBucketSizeNs;
+
     // Util function to check whether the specified dimension hits the guardrail.
     bool hitGuardRailLocked(const MetricDimensionKey& newKey);
 
diff --git a/cmds/statsd/src/packages/UidMap.cpp b/cmds/statsd/src/packages/UidMap.cpp
index b3425a4..8eb5327 100644
--- a/cmds/statsd/src/packages/UidMap.cpp
+++ b/cmds/statsd/src/packages/UidMap.cpp
@@ -166,6 +166,8 @@
         } else {
             // Only notify the listeners if this is an app upgrade. If this app is being installed
             // for the first time, then we don't notify the listeners.
+            // It's also OK to split again if we're forming a partial bucket after re-installing an
+            // app after deletion.
             getListenerListCopyLocked(&broadcastList);
         }
         mChanges.emplace_back(false, timestamp, appName, uid, versionCode, prevVersion);
diff --git a/cmds/statsd/src/stats_log.proto b/cmds/statsd/src/stats_log.proto
index eaa7bf1..e1b6386 100644
--- a/cmds/statsd/src/stats_log.proto
+++ b/cmds/statsd/src/stats_log.proto
@@ -121,6 +121,11 @@
 
   // Fields 2 and 3 are reserved.
 
+  message SkippedBuckets {
+      optional int64 start_elapsed_nanos = 1;
+      optional int64 end_elapsed_nanos = 2;
+  }
+
   message EventMetricDataWrapper {
     repeated EventMetricData data = 1;
   }
@@ -132,10 +137,12 @@
   }
   message ValueMetricDataWrapper {
     repeated ValueMetricData data = 1;
+    repeated SkippedBuckets skipped = 2;
   }
 
   message GaugeMetricDataWrapper {
     repeated GaugeMetricData data = 1;
+    repeated SkippedBuckets skipped = 2;
   }
 
   oneof data {
diff --git a/cmds/statsd/src/statsd_config.proto b/cmds/statsd/src/statsd_config.proto
index 870f92e..fd36560 100644
--- a/cmds/statsd/src/statsd_config.proto
+++ b/cmds/statsd/src/statsd_config.proto
@@ -236,6 +236,8 @@
     ALL_CONDITION_CHANGES = 2;
   }
   optional SamplingType sampling_type = 9 [default = RANDOM_ONE_SAMPLE] ;
+
+  optional int64 min_bucket_size_nanos = 10;
 }
 
 message ValueMetric {
@@ -259,6 +261,8 @@
     SUM = 1;
   }
   optional AggregationType aggregation_type = 8 [default = SUM];
+
+  optional int64 min_bucket_size_nanos = 10;
 }
 
 message Alert {
diff --git a/cmds/statsd/tests/e2e/PartialBucket_e2e_test.cpp b/cmds/statsd/tests/e2e/PartialBucket_e2e_test.cpp
index af07f6b..a34aaf0 100644
--- a/cmds/statsd/tests/e2e/PartialBucket_e2e_test.cpp
+++ b/cmds/statsd/tests/e2e/PartialBucket_e2e_test.cpp
@@ -14,6 +14,7 @@
 
 #include <gtest/gtest.h>
 
+#include <binder/IPCThreadState.h>
 #include "src/StatsLogProcessor.h"
 #include "src/StatsService.h"
 #include "src/stats_log_util.h"
@@ -39,9 +40,13 @@
     service.addConfiguration(kConfigKey, configAsVec, String16(kAndroid.c_str()));
 }
 
-ConfigMetricsReport GetReports(StatsService& service) {
+ConfigMetricsReport GetReports(sp<StatsLogProcessor> processor, int64_t timestamp,
+                               bool include_current = false) {
     vector<uint8_t> output;
-    service.getData(kConfigKey, String16(kAndroid.c_str()), &output);
+    IPCThreadState* ipc = IPCThreadState::self();
+    ConfigKey configKey(ipc->getCallingUid(), kConfigKey);
+    processor->onDumpReport(configKey, timestamp, include_current /* include_current_bucket*/,
+                            &output);
     ConfigMetricsReportList reports;
     reports.ParseFromArray(output.data(), output.size());
     EXPECT_EQ(1, reports.reports_size());
@@ -61,6 +66,47 @@
     return config;
 }
 
+StatsdConfig MakeValueMetricConfig(int64_t minTime) {
+    StatsdConfig config;
+    config.add_allowed_log_source("AID_ROOT");  // LogEvent defaults to UID of root.
+
+    auto temperatureAtomMatcher = CreateTemperatureAtomMatcher();
+    *config.add_atom_matcher() = temperatureAtomMatcher;
+    *config.add_atom_matcher() = CreateScreenTurnedOnAtomMatcher();
+    *config.add_atom_matcher() = CreateScreenTurnedOffAtomMatcher();
+
+    auto valueMetric = config.add_value_metric();
+    valueMetric->set_id(123456);
+    valueMetric->set_what(temperatureAtomMatcher.id());
+    *valueMetric->mutable_value_field() =
+            CreateDimensions(android::util::TEMPERATURE, {3 /* temperature degree field */});
+    *valueMetric->mutable_dimensions_in_what() =
+            CreateDimensions(android::util::TEMPERATURE, {2 /* sensor name field */});
+    valueMetric->set_bucket(FIVE_MINUTES);
+    valueMetric->set_min_bucket_size_nanos(minTime);
+    return config;
+}
+
+StatsdConfig MakeGaugeMetricConfig(int64_t minTime) {
+    StatsdConfig config;
+    config.add_allowed_log_source("AID_ROOT");  // LogEvent defaults to UID of root.
+
+    auto temperatureAtomMatcher = CreateTemperatureAtomMatcher();
+    *config.add_atom_matcher() = temperatureAtomMatcher;
+    *config.add_atom_matcher() = CreateScreenTurnedOnAtomMatcher();
+    *config.add_atom_matcher() = CreateScreenTurnedOffAtomMatcher();
+
+    auto gaugeMetric = config.add_gauge_metric();
+    gaugeMetric->set_id(123456);
+    gaugeMetric->set_what(temperatureAtomMatcher.id());
+    gaugeMetric->mutable_gauge_fields_filter()->set_include_all(true);
+    *gaugeMetric->mutable_dimensions_in_what() =
+            CreateDimensions(android::util::TEMPERATURE, {2 /* sensor name field */});
+    gaugeMetric->set_bucket(FIVE_MINUTES);
+    gaugeMetric->set_min_bucket_size_nanos(minTime);
+    return config;
+}
+
 TEST(PartialBucketE2eTest, TestCountMetricWithoutSplit) {
     StatsService service(nullptr);
     SendConfig(service, MakeConfig());
@@ -70,7 +116,7 @@
     service.mProcessor->OnLogEvent(CreateAppCrashEvent(100, start + 1).get());
     service.mProcessor->OnLogEvent(CreateAppCrashEvent(100, start + 2).get());
 
-    ConfigMetricsReport report = GetReports(service);
+    ConfigMetricsReport report = GetReports(service.mProcessor, start + 3);
     // Expect no metrics since the bucket has not finished yet.
     EXPECT_EQ(0, report.metrics_size());
 }
@@ -89,7 +135,7 @@
     // Goes into the second bucket.
     service.mProcessor->OnLogEvent(CreateAppCrashEvent(100, start + 3).get());
 
-    ConfigMetricsReport report = GetReports(service);
+    ConfigMetricsReport report = GetReports(service.mProcessor, start + 4);
     EXPECT_EQ(0, report.metrics_size());
 }
 
@@ -106,7 +152,7 @@
     // Goes into the second bucket.
     service.mProcessor->OnLogEvent(CreateAppCrashEvent(100, start + 3).get());
 
-    ConfigMetricsReport report = GetReports(service);
+    ConfigMetricsReport report = GetReports(service.mProcessor, start + 4);
     EXPECT_EQ(1, report.metrics_size());
     EXPECT_EQ(1, report.metrics(0).count_metrics().data(0).bucket_info(0).count());
 }
@@ -124,11 +170,85 @@
     // Goes into the second bucket.
     service.mProcessor->OnLogEvent(CreateAppCrashEvent(100, start + 3).get());
 
-    ConfigMetricsReport report = GetReports(service);
+    ConfigMetricsReport report = GetReports(service.mProcessor, start + 4);
     EXPECT_EQ(1, report.metrics_size());
     EXPECT_EQ(1, report.metrics(0).count_metrics().data(0).bucket_info(0).count());
 }
 
+TEST(PartialBucketE2eTest, TestValueMetricWithoutMinPartialBucket) {
+    StatsService service(nullptr);
+    // Partial buckets don't occur when app is first installed.
+    service.mUidMap->updateApp(1, String16(kApp1.c_str()), 1, 1);
+    SendConfig(service, MakeValueMetricConfig(0));
+    const long start = getElapsedRealtimeNs();  // This is the start-time the metrics producers are
+                                                // initialized with.
+
+    service.mProcessor->informPullAlarmFired(5 * 60 * NS_PER_SEC + start);
+    service.mUidMap->updateApp(5 * 60 * NS_PER_SEC + start + 2, String16(kApp1.c_str()), 1, 2);
+
+    ConfigMetricsReport report =
+            GetReports(service.mProcessor, 5 * 60 * NS_PER_SEC + start + 100, true);
+    EXPECT_EQ(1, report.metrics_size());
+    EXPECT_EQ(0, report.metrics(0).value_metrics().skipped_size());
+}
+
+TEST(PartialBucketE2eTest, TestValueMetricWithMinPartialBucket) {
+    StatsService service(nullptr);
+    // Partial buckets don't occur when app is first installed.
+    service.mUidMap->updateApp(1, String16(kApp1.c_str()), 1, 1);
+    SendConfig(service, MakeValueMetricConfig(60 * NS_PER_SEC /* One minute */));
+    const long start = getElapsedRealtimeNs();  // This is the start-time the metrics producers are
+                                                // initialized with.
+
+    const int64_t endSkipped = 5 * 60 * NS_PER_SEC + start + 2;
+    service.mProcessor->informPullAlarmFired(5 * 60 * NS_PER_SEC + start);
+    service.mUidMap->updateApp(endSkipped, String16(kApp1.c_str()), 1, 2);
+
+    ConfigMetricsReport report =
+            GetReports(service.mProcessor, 5 * 60 * NS_PER_SEC + start + 100 * NS_PER_SEC, true);
+    EXPECT_EQ(1, report.metrics_size());
+    EXPECT_EQ(1, report.metrics(0).value_metrics().skipped_size());
+    // Can't test the start time since it will be based on the actual time when the pulling occurs.
+    EXPECT_EQ(endSkipped, report.metrics(0).value_metrics().skipped(0).end_elapsed_nanos());
+}
+
+TEST(PartialBucketE2eTest, TestGaugeMetricWithoutMinPartialBucket) {
+    StatsService service(nullptr);
+    // Partial buckets don't occur when app is first installed.
+    service.mUidMap->updateApp(1, String16(kApp1.c_str()), 1, 1);
+    SendConfig(service, MakeGaugeMetricConfig(0));
+    const long start = getElapsedRealtimeNs();  // This is the start-time the metrics producers are
+                                                // initialized with.
+
+    service.mProcessor->informPullAlarmFired(5 * 60 * NS_PER_SEC + start);
+    service.mUidMap->updateApp(5 * 60 * NS_PER_SEC + start + 2, String16(kApp1.c_str()), 1, 2);
+
+    ConfigMetricsReport report =
+            GetReports(service.mProcessor, 5 * 60 * NS_PER_SEC + start + 100, true);
+    EXPECT_EQ(1, report.metrics_size());
+    EXPECT_EQ(0, report.metrics(0).gauge_metrics().skipped_size());
+}
+
+TEST(PartialBucketE2eTest, TestGaugeMetricWithMinPartialBucket) {
+    StatsService service(nullptr);
+    // Partial buckets don't occur when app is first installed.
+    service.mUidMap->updateApp(1, String16(kApp1.c_str()), 1, 1);
+    SendConfig(service, MakeGaugeMetricConfig(60 * NS_PER_SEC /* One minute */));
+    const long start = getElapsedRealtimeNs();  // This is the start-time the metrics producers are
+                                                // initialized with.
+
+    const int64_t endSkipped = 5 * 60 * NS_PER_SEC + start + 2;
+    service.mProcessor->informPullAlarmFired(5 * 60 * NS_PER_SEC + start);
+    service.mUidMap->updateApp(endSkipped, String16(kApp1.c_str()), 1, 2);
+
+    ConfigMetricsReport report =
+            GetReports(service.mProcessor, 5 * 60 * NS_PER_SEC + start + 100 * NS_PER_SEC, true);
+    EXPECT_EQ(1, report.metrics_size());
+    EXPECT_EQ(1, report.metrics(0).gauge_metrics().skipped_size());
+    // Can't test the start time since it will be based on the actual time when the pulling occurs.
+    EXPECT_EQ(endSkipped, report.metrics(0).gauge_metrics().skipped(0).end_elapsed_nanos());
+}
+
 #else
 GTEST_LOG_(INFO) << "This test does nothing.\n";
 #endif