Use a running sum for ValueMetricProducer buckets
Simplify ValueMetricProducer logic for pulled data

Test: unit test
Change-Id: Ic0a21a543166cc5c34c1fa505dba08d1fc2f510a
diff --git a/cmds/statsd/src/metrics/ValueMetricProducer.cpp b/cmds/statsd/src/metrics/ValueMetricProducer.cpp
index c20c302..aabe5af 100644
--- a/cmds/statsd/src/metrics/ValueMetricProducer.cpp
+++ b/cmds/statsd/src/metrics/ValueMetricProducer.cpp
@@ -185,9 +185,13 @@
     mCondition = condition;
 
     if (eventTime < mCurrentBucketStartTimeNs) {
+        VLOG("Skip event due to late arrival: %lld vs %lld", (long long)eventTime,
+             (long long)mCurrentBucketStartTimeNs);
         return;
     }
 
+    flushIfNeededLocked(eventTime);
+
     if (mPullTagId != -1) {
         if (mCondition == true) {
             mStatsPullerManager->RegisterReceiver(mPullTagId, this,
@@ -202,9 +206,8 @@
                 return;
             }
             for (const auto& data : allData) {
-                onMatchedLogEventLocked(0, *data, false);
+                onMatchedLogEventLocked(0, *data);
             }
-            flushIfNeededLocked(eventTime);
         }
         return;
     }
@@ -217,15 +220,22 @@
         if (allData.size() == 0) {
             return;
         }
-        uint64_t eventTime = allData.at(0)->GetTimestampNs();
-        // alarm is not accurate and might drift.
-        if (eventTime > mCurrentBucketStartTimeNs + mBucketSizeNs * 3 / 2) {
-            flushIfNeededLocked(eventTime);
-        }
+        // For scheduled pulled data, the effective event time is snapped down to the
+        // most recent bucket boundary so that the bucket gets finalized.
+        uint64_t realEventTime = allData.at(0)->GetTimestampNs();
+        uint64_t eventTime = mStartTimeNs + ((realEventTime - mStartTimeNs)/mBucketSizeNs) * mBucketSizeNs;
+
+        mCondition = false;
         for (const auto& data : allData) {
-            onMatchedLogEventLocked(0, *data, true);
+            data->setTimestampNs(eventTime-1);
+            onMatchedLogEventLocked(0, *data);
         }
-        flushIfNeededLocked(eventTime);
+
+        mCondition = true;
+        for (const auto& data : allData) {
+            data->setTimestampNs(eventTime);
+            onMatchedLogEventLocked(0, *data);
+        }
     }
 }
 
@@ -253,7 +263,7 @@
 void ValueMetricProducer::onMatchedLogEventInternalLocked(
         const size_t matcherIndex, const HashableDimensionKey& eventKey,
         const map<string, HashableDimensionKey>& conditionKey, bool condition,
-        const LogEvent& event, bool scheduledPull) {
+        const LogEvent& event) {
     uint64_t eventTimeNs = event.GetTimestampNs();
     if (eventTimeNs < mCurrentBucketStartTimeNs) {
         VLOG("Skip event due to late arrival: %lld vs %lld", (long long)eventTimeNs,
@@ -261,6 +271,8 @@
         return;
     }
 
+    flushIfNeededLocked(eventTimeNs);
+
     if (hitGuardRailLocked(eventKey)) {
         return;
     }
@@ -268,36 +280,21 @@
 
     long value = get_value(event);
 
-    if (mPullTagId != -1) {
-        if (scheduledPull) {
-            // scheduled pull always sets beginning of current bucket and end
-            // of next bucket
-            if (interval.raw.size() > 0) {
-                interval.raw.back().second = value;
-            } else {
-                interval.raw.push_back(make_pair(value, value));
-            }
-            Interval& nextInterval = mNextSlicedBucket[eventKey];
-            if (nextInterval.raw.size() == 0) {
-                nextInterval.raw.push_back(make_pair(value, 0));
-            } else {
-                nextInterval.raw.front().first = value;
-            }
+    if (mPullTagId != -1) { // for pulled events
+        if (mCondition == true) {
+            interval.start = value;
+            interval.startUpdated = true;
         } else {
-            if (mCondition == true) {
-                interval.raw.push_back(make_pair(value, 0));
+            if (interval.startUpdated) {
+                interval.sum += (value - interval.start);
+                interval.startUpdated = false;
             } else {
-                if (interval.raw.size() != 0) {
-                    interval.raw.back().second = value;
-                } else {
-                    interval.tainted = true;
-                    VLOG("Data on condition true missing!");
-                }
+                VLOG("No start for matching end %ld", value);
+                interval.tainted += 1;
             }
         }
-    } else {
-        flushIfNeededLocked(eventTimeNs);
-        interval.raw.push_back(make_pair(value, 0));
+    } else {    // for pushed events
+        interval.sum += value;
     }
 }
 
@@ -327,27 +324,16 @@
 
     int tainted = 0;
     for (const auto& slice : mCurrentSlicedBucket) {
-        long value = 0;
-        if (mPullTagId != -1) {
-            for (const auto& pair : slice.second.raw) {
-                value += (pair.second - pair.first);
-            }
-        } else {
-            for (const auto& pair : slice.second.raw) {
-                value += pair.first;
-            }
-        }
         tainted += slice.second.tainted;
-        info.mValue = value;
-        VLOG(" %s, %ld, %d", slice.first.c_str(), value, tainted);
+        info.mValue = slice.second.sum;
         // it will auto create new vector of ValuebucketInfo if the key is not found.
         auto& bucketList = mPastBuckets[slice.first];
         bucketList.push_back(info);
     }
+    VLOG("%d tainted pairs in the bucket", tainted);
 
     // Reset counters
-    mCurrentSlicedBucket.swap(mNextSlicedBucket);
-    mNextSlicedBucket.clear();
+    mCurrentSlicedBucket.clear();
 
     int64_t numBucketsForward = (eventTimeNs - mCurrentBucketStartTimeNs) / mBucketSizeNs;
     mCurrentBucketStartTimeNs = mCurrentBucketStartTimeNs + numBucketsForward * mBucketSizeNs;