Merge "Adds option to drop small buckets for statsd." into pi-dev
am: bbc057444f

Change-Id: I1ab4de808eef9e7f0ef78a4ea336db9a83420c02
diff --git a/cmds/statsd/src/StatsService.h b/cmds/statsd/src/StatsService.h
index 774a3e9..b5957b5 100644
--- a/cmds/statsd/src/StatsService.h
+++ b/cmds/statsd/src/StatsService.h
@@ -272,6 +272,10 @@
     FRIEND_TEST(PartialBucketE2eTest, TestCountMetricSplitOnUpgrade);
     FRIEND_TEST(PartialBucketE2eTest, TestCountMetricSplitOnRemoval);
     FRIEND_TEST(PartialBucketE2eTest, TestCountMetricWithoutSplit);
+    FRIEND_TEST(PartialBucketE2eTest, TestValueMetricWithoutMinPartialBucket);
+    FRIEND_TEST(PartialBucketE2eTest, TestValueMetricWithMinPartialBucket);
+    FRIEND_TEST(PartialBucketE2eTest, TestGaugeMetricWithoutMinPartialBucket);
+    FRIEND_TEST(PartialBucketE2eTest, TestGaugeMetricWithMinPartialBucket);
 };
 
 }  // namespace statsd
diff --git a/cmds/statsd/src/metrics/GaugeMetricProducer.cpp b/cmds/statsd/src/metrics/GaugeMetricProducer.cpp
index 6886f7c..1270856 100644
--- a/cmds/statsd/src/metrics/GaugeMetricProducer.cpp
+++ b/cmds/statsd/src/metrics/GaugeMetricProducer.cpp
@@ -47,6 +47,9 @@
 const int FIELD_ID_GAUGE_METRICS = 8;
 // for GaugeMetricDataWrapper
 const int FIELD_ID_DATA = 1;
+const int FIELD_ID_SKIPPED = 2;
+const int FIELD_ID_SKIPPED_START = 1;
+const int FIELD_ID_SKIPPED_END = 2;
 // for GaugeMetricData
 const int FIELD_ID_DIMENSION_IN_WHAT = 1;
 const int FIELD_ID_DIMENSION_IN_CONDITION = 2;
@@ -66,6 +69,7 @@
     : MetricProducer(metric.id(), key, timeBaseNs, conditionIndex, wizard),
       mStatsPullerManager(statsPullerManager),
       mPullTagId(pullTagId),
+      mMinBucketSizeNs(metric.min_bucket_size_nanos()),
       mDimensionSoftLimit(StatsdStats::kAtomDimensionKeySizeLimitMap.find(pullTagId) !=
                                           StatsdStats::kAtomDimensionKeySizeLimitMap.end()
                                   ? StatsdStats::kAtomDimensionKeySizeLimitMap.at(pullTagId).first
@@ -174,6 +178,15 @@
     protoOutput->write(FIELD_TYPE_INT64 | FIELD_ID_ID, (long long)mMetricId);
     uint64_t protoToken = protoOutput->start(FIELD_TYPE_MESSAGE | FIELD_ID_GAUGE_METRICS);
 
+    for (const auto& pair : mSkippedBuckets) {
+        uint64_t wrapperToken =
+                protoOutput->start(FIELD_TYPE_MESSAGE | FIELD_COUNT_REPEATED | FIELD_ID_SKIPPED);
+        protoOutput->write(FIELD_TYPE_INT64 | FIELD_ID_SKIPPED_START, (long long)pair.first);
+        protoOutput->write(FIELD_TYPE_INT64 | FIELD_ID_SKIPPED_END, (long long)pair.second);
+        protoOutput->end(wrapperToken);
+    }
+    mSkippedBuckets.clear();
+
     for (const auto& pair : mPastBuckets) {
         const MetricDimensionKey& dimensionKey = pair.first;
 
@@ -440,12 +453,16 @@
         info.mBucketEndNs = fullBucketEndTimeNs;
     }
 
-    for (const auto& slice : *mCurrentSlicedBucket) {
-        info.mGaugeAtoms = slice.second;
-        auto& bucketList = mPastBuckets[slice.first];
-        bucketList.push_back(info);
-        VLOG("Gauge gauge metric %lld, dump key value: %s", (long long)mMetricId,
-             slice.first.toString().c_str());
+    if (info.mBucketEndNs - mCurrentBucketStartTimeNs >= mMinBucketSizeNs) {
+        for (const auto& slice : *mCurrentSlicedBucket) {
+            info.mGaugeAtoms = slice.second;
+            auto& bucketList = mPastBuckets[slice.first];
+            bucketList.push_back(info);
+            VLOG("Gauge gauge metric %lld, dump key value: %s", (long long)mMetricId,
+                 slice.first.toString().c_str());
+        }
+    } else {
+        mSkippedBuckets.emplace_back(info.mBucketStartNs, info.mBucketEndNs);
     }
 
     // If we have anomaly trackers, we need to update the partial bucket values.
diff --git a/cmds/statsd/src/metrics/GaugeMetricProducer.h b/cmds/statsd/src/metrics/GaugeMetricProducer.h
index 08765c2..71d5912 100644
--- a/cmds/statsd/src/metrics/GaugeMetricProducer.h
+++ b/cmds/statsd/src/metrics/GaugeMetricProducer.h
@@ -136,6 +136,11 @@
     // this slice (ie, for partial buckets, we use the last partial bucket in this full bucket).
     std::shared_ptr<DimToValMap> mCurrentSlicedBucketForAnomaly;
 
+    // Pairs of (elapsed start, elapsed end) denoting buckets that were skipped.
+    std::list<std::pair<int64_t, int64_t>> mSkippedBuckets;
+
+    const int64_t mMinBucketSizeNs;
+
     // Translate Atom based bucket to single numeric value bucket for anomaly and updates the map
     // for each slice with the latest value.
     void updateCurrentSlicedBucketForAnomaly();
diff --git a/cmds/statsd/src/metrics/ValueMetricProducer.cpp b/cmds/statsd/src/metrics/ValueMetricProducer.cpp
index 844c728..27fd78f 100644
--- a/cmds/statsd/src/metrics/ValueMetricProducer.cpp
+++ b/cmds/statsd/src/metrics/ValueMetricProducer.cpp
@@ -50,6 +50,9 @@
 const int FIELD_ID_VALUE_METRICS = 7;
 // for ValueMetricDataWrapper
 const int FIELD_ID_DATA = 1;
+const int FIELD_ID_SKIPPED = 2;
+const int FIELD_ID_SKIPPED_START = 1;
+const int FIELD_ID_SKIPPED_END = 2;
 // for ValueMetricData
 const int FIELD_ID_DIMENSION_IN_WHAT = 1;
 const int FIELD_ID_DIMENSION_IN_CONDITION = 2;
@@ -69,6 +72,7 @@
       mValueField(metric.value_field()),
       mStatsPullerManager(statsPullerManager),
       mPullTagId(pullTagId),
+      mMinBucketSizeNs(metric.min_bucket_size_nanos()),
       mDimensionSoftLimit(StatsdStats::kAtomDimensionKeySizeLimitMap.find(pullTagId) !=
                                           StatsdStats::kAtomDimensionKeySizeLimitMap.end()
                                   ? StatsdStats::kAtomDimensionKeySizeLimitMap.at(pullTagId).first
@@ -156,12 +160,21 @@
     } else {
         flushIfNeededLocked(dumpTimeNs);
     }
-    if (mPastBuckets.empty()) {
+    if (mPastBuckets.empty() && mSkippedBuckets.empty()) {
         return;
     }
     protoOutput->write(FIELD_TYPE_INT64 | FIELD_ID_ID, (long long)mMetricId);
     uint64_t protoToken = protoOutput->start(FIELD_TYPE_MESSAGE | FIELD_ID_VALUE_METRICS);
 
+    for (const auto& pair : mSkippedBuckets) {
+        uint64_t wrapperToken =
+                protoOutput->start(FIELD_TYPE_MESSAGE | FIELD_COUNT_REPEATED | FIELD_ID_SKIPPED);
+        protoOutput->write(FIELD_TYPE_INT64 | FIELD_ID_SKIPPED_START, (long long)pair.first);
+        protoOutput->write(FIELD_TYPE_INT64 | FIELD_ID_SKIPPED_END, (long long)pair.second);
+        protoOutput->end(wrapperToken);
+    }
+    mSkippedBuckets.clear();
+
     for (const auto& pair : mPastBuckets) {
         const MetricDimensionKey& dimensionKey = pair.first;
         VLOG("  dimension key %s", dimensionKey.toString().c_str());
@@ -391,18 +404,23 @@
         info.mBucketEndNs = fullBucketEndTimeNs;
     }
 
-    int tainted = 0;
-    for (const auto& slice : mCurrentSlicedBucket) {
-        tainted += slice.second.tainted;
-        tainted += slice.second.startUpdated;
-        if (slice.second.hasValue) {
-            info.mValue = slice.second.sum;
-            // it will auto create new vector of ValuebucketInfo if the key is not found.
-            auto& bucketList = mPastBuckets[slice.first];
-            bucketList.push_back(info);
+    if (info.mBucketEndNs - mCurrentBucketStartTimeNs >= mMinBucketSizeNs) {
+        // The current bucket is large enough to keep.
+        int tainted = 0;
+        for (const auto& slice : mCurrentSlicedBucket) {
+            tainted += slice.second.tainted;
+            tainted += slice.second.startUpdated;
+            if (slice.second.hasValue) {
+                info.mValue = slice.second.sum;
+                // It will auto-create a new vector of ValueBucket if the key is not found.
+                auto& bucketList = mPastBuckets[slice.first];
+                bucketList.push_back(info);
+            }
         }
+        VLOG("%d tainted pairs in the bucket", tainted);
+    } else {
+        mSkippedBuckets.emplace_back(info.mBucketStartNs, info.mBucketEndNs);
     }
-    VLOG("%d tainted pairs in the bucket", tainted);
 
     if (eventTimeNs > fullBucketEndTimeNs) {  // If full bucket, send to anomaly tracker.
         // Accumulate partial buckets with current value and then send to anomaly tracker.
diff --git a/cmds/statsd/src/metrics/ValueMetricProducer.h b/cmds/statsd/src/metrics/ValueMetricProducer.h
index 9c5a56c..8df30d3 100644
--- a/cmds/statsd/src/metrics/ValueMetricProducer.h
+++ b/cmds/statsd/src/metrics/ValueMetricProducer.h
@@ -148,6 +148,11 @@
     // TODO: Add a lock to mPastBuckets.
     std::unordered_map<MetricDimensionKey, std::vector<ValueBucket>> mPastBuckets;
 
+    // Pairs of (elapsed start, elapsed end) denoting buckets that were skipped.
+    std::list<std::pair<int64_t, int64_t>> mSkippedBuckets;
+
+    const int64_t mMinBucketSizeNs;
+
     // Util function to check whether the specified dimension hits the guardrail.
     bool hitGuardRailLocked(const MetricDimensionKey& newKey);
 
diff --git a/cmds/statsd/src/packages/UidMap.cpp b/cmds/statsd/src/packages/UidMap.cpp
index b3425a4..8eb5327 100644
--- a/cmds/statsd/src/packages/UidMap.cpp
+++ b/cmds/statsd/src/packages/UidMap.cpp
@@ -166,6 +166,8 @@
         } else {
             // Only notify the listeners if this is an app upgrade. If this app is being installed
             // for the first time, then we don't notify the listeners.
+            // It's also OK to split again if we're forming a partial bucket after re-installing an
+            // app after deletion.
             getListenerListCopyLocked(&broadcastList);
         }
         mChanges.emplace_back(false, timestamp, appName, uid, versionCode, prevVersion);
diff --git a/cmds/statsd/src/stats_log.proto b/cmds/statsd/src/stats_log.proto
index eaa7bf1..e1b6386 100644
--- a/cmds/statsd/src/stats_log.proto
+++ b/cmds/statsd/src/stats_log.proto
@@ -121,6 +121,11 @@
 
   // Fields 2 and 3 are reserved.
 
+  message SkippedBuckets {
+      optional int64 start_elapsed_nanos = 1;
+      optional int64 end_elapsed_nanos = 2;
+  }
+
   message EventMetricDataWrapper {
     repeated EventMetricData data = 1;
   }
@@ -132,10 +137,12 @@
   }
   message ValueMetricDataWrapper {
     repeated ValueMetricData data = 1;
+    repeated SkippedBuckets skipped = 2;
   }
 
   message GaugeMetricDataWrapper {
     repeated GaugeMetricData data = 1;
+    repeated SkippedBuckets skipped = 2;
   }
 
   oneof data {
diff --git a/cmds/statsd/src/statsd_config.proto b/cmds/statsd/src/statsd_config.proto
index 870f92e..fd36560 100644
--- a/cmds/statsd/src/statsd_config.proto
+++ b/cmds/statsd/src/statsd_config.proto
@@ -236,6 +236,8 @@
     ALL_CONDITION_CHANGES = 2;
   }
   optional SamplingType sampling_type = 9 [default = RANDOM_ONE_SAMPLE] ;
+
+  optional int64 min_bucket_size_nanos = 10;
 }
 
 message ValueMetric {
@@ -259,6 +261,8 @@
     SUM = 1;
   }
   optional AggregationType aggregation_type = 8 [default = SUM];
+
+  optional int64 min_bucket_size_nanos = 10;
 }
 
 message Alert {
diff --git a/cmds/statsd/tests/e2e/PartialBucket_e2e_test.cpp b/cmds/statsd/tests/e2e/PartialBucket_e2e_test.cpp
index af07f6b..a34aaf0 100644
--- a/cmds/statsd/tests/e2e/PartialBucket_e2e_test.cpp
+++ b/cmds/statsd/tests/e2e/PartialBucket_e2e_test.cpp
@@ -14,6 +14,7 @@
 
 #include <gtest/gtest.h>
 
+#include <binder/IPCThreadState.h>
 #include "src/StatsLogProcessor.h"
 #include "src/StatsService.h"
 #include "src/stats_log_util.h"
@@ -39,9 +40,13 @@
     service.addConfiguration(kConfigKey, configAsVec, String16(kAndroid.c_str()));
 }
 
-ConfigMetricsReport GetReports(StatsService& service) {
+ConfigMetricsReport GetReports(sp<StatsLogProcessor> processor, int64_t timestamp,
+                               bool include_current = false) {
     vector<uint8_t> output;
-    service.getData(kConfigKey, String16(kAndroid.c_str()), &output);
+    IPCThreadState* ipc = IPCThreadState::self();
+    ConfigKey configKey(ipc->getCallingUid(), kConfigKey);
+    processor->onDumpReport(configKey, timestamp, include_current /* include_current_bucket */,
+                            &output);
     ConfigMetricsReportList reports;
     reports.ParseFromArray(output.data(), output.size());
     EXPECT_EQ(1, reports.reports_size());
@@ -61,6 +66,47 @@
     return config;
 }
 
+StatsdConfig MakeValueMetricConfig(int64_t minTime) {
+    StatsdConfig config;
+    config.add_allowed_log_source("AID_ROOT");  // LogEvent defaults to UID of root.
+
+    auto temperatureAtomMatcher = CreateTemperatureAtomMatcher();
+    *config.add_atom_matcher() = temperatureAtomMatcher;
+    *config.add_atom_matcher() = CreateScreenTurnedOnAtomMatcher();
+    *config.add_atom_matcher() = CreateScreenTurnedOffAtomMatcher();
+
+    auto valueMetric = config.add_value_metric();
+    valueMetric->set_id(123456);
+    valueMetric->set_what(temperatureAtomMatcher.id());
+    *valueMetric->mutable_value_field() =
+            CreateDimensions(android::util::TEMPERATURE, {3 /* temperature degree field */});
+    *valueMetric->mutable_dimensions_in_what() =
+            CreateDimensions(android::util::TEMPERATURE, {2 /* sensor name field */});
+    valueMetric->set_bucket(FIVE_MINUTES);
+    valueMetric->set_min_bucket_size_nanos(minTime);
+    return config;
+}
+
+StatsdConfig MakeGaugeMetricConfig(int64_t minTime) {
+    StatsdConfig config;
+    config.add_allowed_log_source("AID_ROOT");  // LogEvent defaults to UID of root.
+
+    auto temperatureAtomMatcher = CreateTemperatureAtomMatcher();
+    *config.add_atom_matcher() = temperatureAtomMatcher;
+    *config.add_atom_matcher() = CreateScreenTurnedOnAtomMatcher();
+    *config.add_atom_matcher() = CreateScreenTurnedOffAtomMatcher();
+
+    auto gaugeMetric = config.add_gauge_metric();
+    gaugeMetric->set_id(123456);
+    gaugeMetric->set_what(temperatureAtomMatcher.id());
+    gaugeMetric->mutable_gauge_fields_filter()->set_include_all(true);
+    *gaugeMetric->mutable_dimensions_in_what() =
+            CreateDimensions(android::util::TEMPERATURE, {2 /* sensor name field */});
+    gaugeMetric->set_bucket(FIVE_MINUTES);
+    gaugeMetric->set_min_bucket_size_nanos(minTime);
+    return config;
+}
+
 TEST(PartialBucketE2eTest, TestCountMetricWithoutSplit) {
     StatsService service(nullptr);
     SendConfig(service, MakeConfig());
@@ -70,7 +116,7 @@
     service.mProcessor->OnLogEvent(CreateAppCrashEvent(100, start + 1).get());
     service.mProcessor->OnLogEvent(CreateAppCrashEvent(100, start + 2).get());
 
-    ConfigMetricsReport report = GetReports(service);
+    ConfigMetricsReport report = GetReports(service.mProcessor, start + 3);
     // Expect no metrics since the bucket has not finished yet.
     EXPECT_EQ(0, report.metrics_size());
 }
@@ -89,7 +135,7 @@
     // Goes into the second bucket.
     service.mProcessor->OnLogEvent(CreateAppCrashEvent(100, start + 3).get());
 
-    ConfigMetricsReport report = GetReports(service);
+    ConfigMetricsReport report = GetReports(service.mProcessor, start + 4);
     EXPECT_EQ(0, report.metrics_size());
 }
 
@@ -106,7 +152,7 @@
     // Goes into the second bucket.
     service.mProcessor->OnLogEvent(CreateAppCrashEvent(100, start + 3).get());
 
-    ConfigMetricsReport report = GetReports(service);
+    ConfigMetricsReport report = GetReports(service.mProcessor, start + 4);
     EXPECT_EQ(1, report.metrics_size());
     EXPECT_EQ(1, report.metrics(0).count_metrics().data(0).bucket_info(0).count());
 }
@@ -124,11 +170,85 @@
     // Goes into the second bucket.
     service.mProcessor->OnLogEvent(CreateAppCrashEvent(100, start + 3).get());
 
-    ConfigMetricsReport report = GetReports(service);
+    ConfigMetricsReport report = GetReports(service.mProcessor, start + 4);
     EXPECT_EQ(1, report.metrics_size());
     EXPECT_EQ(1, report.metrics(0).count_metrics().data(0).bucket_info(0).count());
 }
 
+TEST(PartialBucketE2eTest, TestValueMetricWithoutMinPartialBucket) {
+    StatsService service(nullptr);
+    // Partial buckets don't occur when app is first installed.
+    service.mUidMap->updateApp(1, String16(kApp1.c_str()), 1, 1);
+    SendConfig(service, MakeValueMetricConfig(0));
+    const long start = getElapsedRealtimeNs();  // This is the start-time the metrics producers are
+                                                // initialized with.
+
+    service.mProcessor->informPullAlarmFired(5 * 60 * NS_PER_SEC + start);
+    service.mUidMap->updateApp(5 * 60 * NS_PER_SEC + start + 2, String16(kApp1.c_str()), 1, 2);
+
+    ConfigMetricsReport report =
+            GetReports(service.mProcessor, 5 * 60 * NS_PER_SEC + start + 100, true);
+    EXPECT_EQ(1, report.metrics_size());
+    EXPECT_EQ(0, report.metrics(0).value_metrics().skipped_size());
+}
+
+TEST(PartialBucketE2eTest, TestValueMetricWithMinPartialBucket) {
+    StatsService service(nullptr);
+    // Partial buckets don't occur when app is first installed.
+    service.mUidMap->updateApp(1, String16(kApp1.c_str()), 1, 1);
+    SendConfig(service, MakeValueMetricConfig(60 * NS_PER_SEC /* One minute */));
+    const long start = getElapsedRealtimeNs();  // This is the start-time the metrics producers are
+                                                // initialized with.
+
+    const int64_t endSkipped = 5 * 60 * NS_PER_SEC + start + 2;
+    service.mProcessor->informPullAlarmFired(5 * 60 * NS_PER_SEC + start);
+    service.mUidMap->updateApp(endSkipped, String16(kApp1.c_str()), 1, 2);
+
+    ConfigMetricsReport report =
+            GetReports(service.mProcessor, 5 * 60 * NS_PER_SEC + start + 100 * NS_PER_SEC, true);
+    EXPECT_EQ(1, report.metrics_size());
+    EXPECT_EQ(1, report.metrics(0).value_metrics().skipped_size());
+    // Can't test the start time since it will be based on the actual time when the pulling occurs.
+    EXPECT_EQ(endSkipped, report.metrics(0).value_metrics().skipped(0).end_elapsed_nanos());
+}
+
+TEST(PartialBucketE2eTest, TestGaugeMetricWithoutMinPartialBucket) {
+    StatsService service(nullptr);
+    // Partial buckets don't occur when app is first installed.
+    service.mUidMap->updateApp(1, String16(kApp1.c_str()), 1, 1);
+    SendConfig(service, MakeGaugeMetricConfig(0));
+    const long start = getElapsedRealtimeNs();  // This is the start-time the metrics producers are
+                                                // initialized with.
+
+    service.mProcessor->informPullAlarmFired(5 * 60 * NS_PER_SEC + start);
+    service.mUidMap->updateApp(5 * 60 * NS_PER_SEC + start + 2, String16(kApp1.c_str()), 1, 2);
+
+    ConfigMetricsReport report =
+            GetReports(service.mProcessor, 5 * 60 * NS_PER_SEC + start + 100, true);
+    EXPECT_EQ(1, report.metrics_size());
+    EXPECT_EQ(0, report.metrics(0).gauge_metrics().skipped_size());
+}
+
+TEST(PartialBucketE2eTest, TestGaugeMetricWithMinPartialBucket) {
+    StatsService service(nullptr);
+    // Partial buckets don't occur when app is first installed.
+    service.mUidMap->updateApp(1, String16(kApp1.c_str()), 1, 1);
+    SendConfig(service, MakeGaugeMetricConfig(60 * NS_PER_SEC /* One minute */));
+    const long start = getElapsedRealtimeNs();  // This is the start-time the metrics producers are
+                                                // initialized with.
+
+    const int64_t endSkipped = 5 * 60 * NS_PER_SEC + start + 2;
+    service.mProcessor->informPullAlarmFired(5 * 60 * NS_PER_SEC + start);
+    service.mUidMap->updateApp(endSkipped, String16(kApp1.c_str()), 1, 2);
+
+    ConfigMetricsReport report =
+            GetReports(service.mProcessor, 5 * 60 * NS_PER_SEC + start + 100 * NS_PER_SEC, true);
+    EXPECT_EQ(1, report.metrics_size());
+    EXPECT_EQ(1, report.metrics(0).gauge_metrics().skipped_size());
+    // Can't test the start time since it will be based on the actual time when the pulling occurs.
+    EXPECT_EQ(endSkipped, report.metrics(0).gauge_metrics().skipped(0).end_elapsed_nanos());
+}
+
 #else
 GTEST_LOG_(INFO) << "This test does nothing.\n";
 #endif