Create partial buckets on app upgrade and fix duration metric's MAX_SPARSE.

Statsd creates partial buckets in all metric producers when an app is
upgraded so that metrics can be separated between different versions of
the app. By looking at the uid map changes, we can tell which app
version each bucket belongs to; for metrics that are unaffected by the
app version, the partial buckets can simply be joined back together.
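
As an illustration, the upgrade path may look like the sketch below.
notifyAppUpgrade and the exact plumbing from the uid map are
assumptions here; flushCurrentBucketLocked is the hook added in this
change.

    // Sketch only, not the verbatim statsd code. On an app upgrade the
    // producer cuts a partial bucket at the upgrade time; the full-bucket
    // boundaries derived from mStartTimeNs are left untouched.
    void DurationMetricProducer::notifyAppUpgrade(const uint64_t& upgradeTimeNs) {
        std::lock_guard<std::mutex> lock(mMutex);
        flushCurrentBucketLocked(upgradeTimeNs);   // close the partial bucket
        mCurrentBucketStartTimeNs = upgradeTimeNs; // next partial bucket starts here
        // mCurrentBucketNum is not advanced: the partial buckets still
        // belong to the same full bucket.
    }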

To simplify the logic, the ends of the full buckets are always aligned
to the time the metric producer was created. These boundaries are
computed on the fly from the current bucket number and the producer's
start time.
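
Concretely, the current full bucket's end can be derived on demand. The
body below is a sketch consistent with the getCurrentBucketEndTimeNs()
call sites in the diff, not necessarily the verbatim implementation:

    // Full buckets stay aligned to mStartTimeNs even after a partial
    // bucket has moved mCurrentBucketStartTimeNs off the grid.
    uint64_t MetricProducer::getCurrentBucketEndTimeNs() const {
        return mStartTimeNs + (mCurrentBucketNum + 1) * mBucketSizeNs;
    }

This is also why flushIfNeededLocked computes numBucketsForward from
currentBucketEndTimeNs instead of from mCurrentBucketStartTimeNs: after
a partial bucket, the bucket start is no longer a whole number of
bucket sizes away from the producer's start time.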

Anomaly trackers are still given only full buckets; the partial buckets
are buffered within each metric producer.
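
A minimal sketch of that buffering, with hypothetical names
(mPartialBucketValue, and this simplified addPastBucket signature)
standing in for the producer's real bookkeeping:

    // Illustrative only: accumulate closed partial buckets locally and
    // hand the anomaly trackers a value only when a full bucket closes.
    void onBucketFlushed(const int64_t& bucketValue, bool fullBucketReached) {
        mPartialBucketValue += bucketValue;
        if (fullBucketReached) {
            for (auto& tracker : mAnomalyTrackers) {
                tracker->addPastBucket(mPartialBucketValue, mCurrentBucketNum);
            }
            mPartialBucketValue = 0;  // start buffering the next full bucket
        }
    }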

The duration metric's MAX_SPARSE aggregation is fixed so that it is
actually implemented as a sparse max. In addition, after further
discussion, we found anomaly detection on MAX_SPARSE to be unnecessary,
so that functionality is removed.

Test: Unit-tests added and modified, passed on marlin-eng.
Change-Id: I5ff7a9c7f05c406e9faf400c6a39162970ded102
diff --git a/cmds/statsd/src/metrics/DurationMetricProducer.cpp b/cmds/statsd/src/metrics/DurationMetricProducer.cpp
index efbdae1..2400eba1 100644
--- a/cmds/statsd/src/metrics/DurationMetricProducer.cpp
+++ b/cmds/statsd/src/metrics/DurationMetricProducer.cpp
@@ -121,13 +121,13 @@
         case DurationMetric_AggregationType_SUM:
             return make_unique<OringDurationTracker>(
                     mConfigKey, mMetricId, eventKey, mWizard, mConditionTrackerIndex,
-                    mDimensionsInCondition, mNested,
-                    mCurrentBucketStartTimeNs, mBucketSizeNs, mConditionSliced, mAnomalyTrackers);
+                    mDimensionsInCondition, mNested, mCurrentBucketStartTimeNs, mCurrentBucketNum,
+                    mStartTimeNs, mBucketSizeNs, mConditionSliced, mAnomalyTrackers);
         case DurationMetric_AggregationType_MAX_SPARSE:
             return make_unique<MaxDurationTracker>(
                     mConfigKey, mMetricId, eventKey, mWizard, mConditionTrackerIndex,
-                    mDimensionsInCondition, mNested,
-                    mCurrentBucketStartTimeNs, mBucketSizeNs, mConditionSliced, mAnomalyTrackers);
+                    mDimensionsInCondition, mNested, mCurrentBucketStartTimeNs, mCurrentBucketNum,
+                    mStartTimeNs, mBucketSizeNs, mConditionSliced, mAnomalyTrackers);
     }
 }
 
@@ -252,17 +252,18 @@
     protoOutput->end(protoToken);
     protoOutput->write(FIELD_TYPE_INT64 | FIELD_ID_END_REPORT_NANOS, (long long)dumpTimeNs);
     mPastBuckets.clear();
-    mStartTimeNs = mCurrentBucketStartTimeNs;
 }
 
-void DurationMetricProducer::flushIfNeededLocked(const uint64_t& eventTime) {
-    if (mCurrentBucketStartTimeNs + mBucketSizeNs > eventTime) {
+void DurationMetricProducer::flushIfNeededLocked(const uint64_t& eventTimeNs) {
+    uint64_t currentBucketEndTimeNs = getCurrentBucketEndTimeNs();
+
+    if (currentBucketEndTimeNs > eventTimeNs) {
         return;
     }
     VLOG("flushing...........");
     for (auto it = mCurrentSlicedDurationTrackerMap.begin();
             it != mCurrentSlicedDurationTrackerMap.end();) {
-        if (it->second->flushIfNeeded(eventTime, &mPastBuckets)) {
+        if (it->second->flushIfNeeded(eventTimeNs, &mPastBuckets)) {
             VLOG("erase bucket for key %s", it->first.c_str());
             it = mCurrentSlicedDurationTrackerMap.erase(it);
         } else {
@@ -270,11 +271,23 @@
         }
     }
 
-    int numBucketsForward = (eventTime - mCurrentBucketStartTimeNs) / mBucketSizeNs;
-    mCurrentBucketStartTimeNs += numBucketsForward * mBucketSizeNs;
+    int numBucketsForward = 1 + (eventTimeNs - currentBucketEndTimeNs) / mBucketSizeNs;
+    mCurrentBucketStartTimeNs = currentBucketEndTimeNs + (numBucketsForward - 1) * mBucketSizeNs;
     mCurrentBucketNum += numBucketsForward;
 }
 
+void DurationMetricProducer::flushCurrentBucketLocked(const uint64_t& eventTimeNs) {
+    for (auto it = mCurrentSlicedDurationTrackerMap.begin();
+         it != mCurrentSlicedDurationTrackerMap.end();) {
+        if (it->second->flushCurrentBucket(eventTimeNs, &mPastBuckets)) {
+            VLOG("erase bucket for key %s", it->first.c_str());
+            it = mCurrentSlicedDurationTrackerMap.erase(it);
+        } else {
+            ++it;
+        }
+    }
+}
+
 void DurationMetricProducer::dumpStatesLocked(FILE* out, bool verbose) const {
     if (mCurrentSlicedDurationTrackerMap.size() == 0) {
         return;