Partial buckets on app upgrade and fix duration.

Statsd will create partial buckets in all metrics producers when an
app is upgraded so that we can separate metrics between different
versions of an app. By looking at the uid map changes, we can tell
which app versions belong to a bucket; for metrics that are not
affected by an app version, we can instead join the buckets together.

To simplify the logic, the ends of the full buckets are always
aligned to when the metric producers were created. These boundaries
are computed on the fly by using the bucket number and the metric
producers' start times.

We keep the anomaly trackers to only be given full buckets; we buffer
the partial buckets within each metric producer.

Duration metric's MAX_SPARSE is fixed to be implemented as such. In
addition, after further discussion, we find anomaly detection on
MAX_SPARSE to be unnecessary, so this functionality is removed.

Test: Unit-tests added and modified, passed on marlin-eng.
Change-Id: I5ff7a9c7f05c406e9faf400c6a39162970ded102
diff --git a/cmds/statsd/src/metrics/CountMetricProducer.cpp b/cmds/statsd/src/metrics/CountMetricProducer.cpp
index ae4df3e..5b5b57b 100644
--- a/cmds/statsd/src/metrics/CountMetricProducer.cpp
+++ b/cmds/statsd/src/metrics/CountMetricProducer.cpp
@@ -170,7 +170,6 @@
     protoOutput->write(FIELD_TYPE_INT64 | FIELD_ID_END_REPORT_NANOS, (long long)dumpTimeNs);
 
     mPastBuckets.clear();
-    mStartTimeNs = mCurrentBucketStartTimeNs;
 
     // TODO: Clear mDimensionKeyMap once the report is dumped.
 }
@@ -214,13 +213,11 @@
     }
 
     auto it = mCurrentSlicedCounter->find(eventKey);
-
     if (it == mCurrentSlicedCounter->end()) {
         // ===========GuardRail==============
         if (hitGuardRailLocked(eventKey)) {
             return;
         }
-
         // create a counter for the new key
         (*mCurrentSlicedCounter)[eventKey] = 1;
     } else {
@@ -228,10 +225,14 @@
         auto& count = it->second;
         count++;
     }
-
     for (auto& tracker : mAnomalyTrackers) {
+        int64_t countWholeBucket = mCurrentSlicedCounter->find(eventKey)->second;
+        auto prev = mCurrentFullCounters->find(eventKey);
+        if (prev != mCurrentFullCounters->end()) {
+            countWholeBucket += prev->second;
+        }
         tracker->detectAndDeclareAnomaly(eventTimeNs, mCurrentBucketNum, eventKey,
-                                         mCurrentSlicedCounter->find(eventKey)->second);
+                                         countWholeBucket);
     }
 
     VLOG("metric %lld %s->%lld", (long long)mMetricId, eventKey.c_str(),
@@ -241,33 +242,65 @@
 // When a new matched event comes in, we check if event falls into the current
 // bucket. If not, flush the old counter to past buckets and initialize the new bucket.
 void CountMetricProducer::flushIfNeededLocked(const uint64_t& eventTimeNs) {
-    if (eventTimeNs < mCurrentBucketStartTimeNs + mBucketSizeNs) {
+    uint64_t currentBucketEndTimeNs = getCurrentBucketEndTimeNs();
+    if (eventTimeNs < currentBucketEndTimeNs) {
         return;
     }
 
+    flushCurrentBucketLocked(eventTimeNs);
+    // Setup the bucket start time and number.
+    uint64_t numBucketsForward = 1 + (eventTimeNs - currentBucketEndTimeNs) / mBucketSizeNs;
+    mCurrentBucketStartTimeNs = currentBucketEndTimeNs + (numBucketsForward - 1) * mBucketSizeNs;
+    mCurrentBucketNum += numBucketsForward;
+    VLOG("metric %lld: new bucket start time: %lld", (long long)mMetricId,
+         (long long)mCurrentBucketStartTimeNs);
+}
+
+void CountMetricProducer::flushCurrentBucketLocked(const uint64_t& eventTimeNs) {
+    uint64_t fullBucketEndTimeNs = getCurrentBucketEndTimeNs();
     CountBucket info;
     info.mBucketStartNs = mCurrentBucketStartTimeNs;
-    info.mBucketEndNs = mCurrentBucketStartTimeNs + mBucketSizeNs;
+    if (eventTimeNs < fullBucketEndTimeNs) {
+        info.mBucketEndNs = eventTimeNs;
+    } else {
+        info.mBucketEndNs = fullBucketEndTimeNs;
+    }
     info.mBucketNum = mCurrentBucketNum;
     for (const auto& counter : *mCurrentSlicedCounter) {
         info.mCount = counter.second;
         auto& bucketList = mPastBuckets[counter.first];
         bucketList.push_back(info);
-        VLOG("metric %lld, dump key value: %s -> %lld",
-            (long long)mMetricId, counter.first.c_str(), (long long)counter.second);
+        VLOG("metric %lld, dump key value: %s -> %lld", (long long)mMetricId, counter.first.c_str(),
+             (long long)counter.second);
     }
 
-    for (auto& tracker : mAnomalyTrackers) {
-        tracker->addPastBucket(mCurrentSlicedCounter, mCurrentBucketNum);
+    // If we have finished a full bucket, then send this to anomaly tracker.
+    if (eventTimeNs > fullBucketEndTimeNs) {
+        // Accumulate partial buckets with current value and then send to anomaly tracker.
+        if (mCurrentFullCounters->size() > 0) {
+            for (const auto& keyValuePair : *mCurrentSlicedCounter) {
+                (*mCurrentFullCounters)[keyValuePair.first] += keyValuePair.second;
+            }
+            for (auto& tracker : mAnomalyTrackers) {
+                tracker->addPastBucket(mCurrentFullCounters, mCurrentBucketNum);
+            }
+            mCurrentFullCounters = std::make_shared<DimToValMap>();
+        } else {
+            // Skip aggregating the partial buckets since there's no previous partial bucket.
+            for (auto& tracker : mAnomalyTrackers) {
+                tracker->addPastBucket(mCurrentSlicedCounter, mCurrentBucketNum);
+            }
+        }
+    } else {
+        // Accumulate partial bucket.
+        for (const auto& keyValuePair : *mCurrentSlicedCounter) {
+            (*mCurrentFullCounters)[keyValuePair.first] += keyValuePair.second;
+        }
     }
 
-    // Reset counters (do not clear, since the old one is still referenced in mAnomalyTrackers).
+    // Only resets the counters, but doesn't setup the times nor numbers.
+    // (Do not clear since the old one is still referenced in mAnomalyTrackers).
     mCurrentSlicedCounter = std::make_shared<DimToValMap>();
-    uint64_t numBucketsForward = (eventTimeNs - mCurrentBucketStartTimeNs) / mBucketSizeNs;
-    mCurrentBucketStartTimeNs = mCurrentBucketStartTimeNs + numBucketsForward * mBucketSizeNs;
-    mCurrentBucketNum += numBucketsForward;
-    VLOG("metric %lld: new bucket start time: %lld", (long long)mMetricId,
-         (long long)mCurrentBucketStartTimeNs);
 }
 
 // Rough estimate of CountMetricProducer buffer stored. This number will be