Unit tests for ValueMetricProducer
StatsPullerManager is refactored so that we can mock it.
It may need another refactoring pass to make it safer for longer runs.

Test: unit test
Change-Id: Ief0c99710e4d06e1454678f8b749c9599467d114
diff --git a/cmds/statsd/src/metrics/ValueMetricProducer.cpp b/cmds/statsd/src/metrics/ValueMetricProducer.cpp
index 5bd10fa..5cffec1 100644
--- a/cmds/statsd/src/metrics/ValueMetricProducer.cpp
+++ b/cmds/statsd/src/metrics/ValueMetricProducer.cpp
@@ -31,6 +31,7 @@
 using android::util::FIELD_TYPE_MESSAGE;
 using android::util::ProtoOutputStream;
 using std::list;
+using std::make_pair;
 using std::make_shared;
 using std::map;
 using std::shared_ptr;
@@ -62,13 +63,23 @@
 const int FIELD_ID_END_BUCKET_NANOS = 2;
 const int FIELD_ID_VALUE = 3;
 
+static const uint64_t kDefaultBucketSizeMillis = 60 * 60 * 1000L;
+
 // ValueMetric has a minimum bucket size of 10min so that we don't pull too frequently
 ValueMetricProducer::ValueMetricProducer(const ValueMetric& metric, const int conditionIndex,
                                          const sp<ConditionWizard>& wizard, const int pullTagId,
-                                         const uint64_t startTimeNs)
-    : MetricProducer(startTimeNs, conditionIndex, wizard), mMetric(metric), mPullTagId(pullTagId) {
+                                         const uint64_t startTimeNs,
+                                         shared_ptr<StatsPullerManager> statsPullerManager)
+    : MetricProducer(startTimeNs, conditionIndex, wizard),
+      mMetric(metric),
+      mStatsPullerManager(statsPullerManager),
+      mPullTagId(pullTagId) {
     // TODO: valuemetric for pushed events may need unlimited bucket length
-    mBucketSizeNs = mMetric.bucket().bucket_size_millis() * 1000 * 1000;
+    if (metric.has_bucket() && metric.bucket().has_bucket_size_millis()) {
+        mBucketSizeNs = mMetric.bucket().bucket_size_millis() * 1000 * 1000;
+    } else {
+        mBucketSizeNs = kDefaultBucketSizeMillis * 1000 * 1000;
+    }
 
     mDimension.insert(mDimension.begin(), metric.dimension().begin(), metric.dimension().end());
 
@@ -79,8 +90,9 @@
     }
 
     if (!metric.has_condition() && mPullTagId != -1) {
-        mStatsPullerManager.RegisterReceiver(mPullTagId, this,
-                                             metric.bucket().bucket_size_millis());
+        VLOG("Setting up periodic pulling for %d", mPullTagId);
+        mStatsPullerManager->RegisterReceiver(mPullTagId, this,
+                                              metric.bucket().bucket_size_millis());
     }
 
     startNewProtoOutputStream(mStartTimeNs);
@@ -89,8 +101,19 @@
          (long long)mBucketSizeNs, (long long)mStartTimeNs);
 }
 
+// for testing
+ValueMetricProducer::ValueMetricProducer(const ValueMetric& metric, const int conditionIndex,
+                                         const sp<ConditionWizard>& wizard, const int pullTagId,
+                                         const uint64_t startTimeNs)
+    : ValueMetricProducer(metric, conditionIndex, wizard, pullTagId, startTimeNs,
+                          make_shared<StatsPullerManager>()) {
+}
+
 ValueMetricProducer::~ValueMetricProducer() {
     VLOG("~ValueMetricProducer() called");
+    if (mPullTagId != -1) {
+        mStatsPullerManager->UnRegisterReceiver(mPullTagId, this);
+    }
 }
 
 void ValueMetricProducer::startNewProtoOutputStream(long long startTime) {
@@ -177,14 +200,14 @@
 
     if (mPullTagId != -1) {
         if (mCondition == true) {
-            mStatsPullerManager.RegisterReceiver(mPullTagId, this,
-                                                 mMetric.bucket().bucket_size_millis());
-        } else if (mCondition == ConditionState::kFalse) {
-            mStatsPullerManager.UnRegisterReceiver(mPullTagId, this);
+            mStatsPullerManager->RegisterReceiver(mPullTagId, this,
+                                                  mMetric.bucket().bucket_size_millis());
+        } else if (mCondition == false) {
+            mStatsPullerManager->UnRegisterReceiver(mPullTagId, this);
         }
 
         vector<shared_ptr<LogEvent>> allData;
-        if (mStatsPullerManager.Pull(mPullTagId, &allData)) {
+        if (mStatsPullerManager->Pull(mPullTagId, &allData)) {
             if (allData.size() == 0) {
                 return;
             }
@@ -199,11 +222,15 @@
 
 void ValueMetricProducer::onDataPulled(const std::vector<std::shared_ptr<LogEvent>>& allData) {
     AutoMutex _l(mLock);
-    if (mCondition == ConditionState::kTrue || !mMetric.has_condition()) {
+    if (mCondition == true || !mMetric.has_condition()) {
         if (allData.size() == 0) {
             return;
         }
         uint64_t eventTime = allData.at(0)->GetTimestampNs();
+        // Alarm may drift: flush first if the pull is over 1.5 buckets past bucket start.
+        if (eventTime > mCurrentBucketStartTimeNs + mBucketSizeNs * 3 / 2) {
+            flush_if_needed(eventTime);
+        }
         for (const auto& data : allData) {
             onMatchedLogEvent(0, *data, true);
         }
@@ -226,24 +253,36 @@
 
     long value = get_value(event);
 
-    if (scheduledPull) {
-        if (interval.raw.size() > 0) {
-            interval.raw.back().second = value;
-        } else {
-            interval.raw.push_back(std::make_pair(value, value));
-        }
-        mNextSlicedBucket[eventKey].raw[0].first = value;
-    } else {
-        if (mCondition == ConditionState::kTrue) {
-            interval.raw.push_back(std::make_pair(value, 0));
-        } else {
-            if (interval.raw.size() != 0) {
+    if (mPullTagId != -1) {
+        if (scheduledPull) {
+            // scheduled pull always sets end of current bucket and beginning
+            // of next bucket
+            if (interval.raw.size() > 0) {
                 interval.raw.back().second = value;
+            } else {
+                interval.raw.push_back(make_pair(value, value));
+            }
+            Interval& nextInterval = mNextSlicedBucket[eventKey];
+            if (nextInterval.raw.size() == 0) {
+                nextInterval.raw.push_back(make_pair(value, 0));
+            } else {
+                nextInterval.raw.front().first = value;
+            }
+        } else {
+            if (mCondition == true) {
+                interval.raw.push_back(make_pair(value, 0));
+            } else {
+                if (interval.raw.size() != 0) {
+                    interval.raw.back().second = value;
+                } else {
+                    interval.tainted = true;
+                    VLOG("Data on condition true missing!");
+                }
             }
         }
-    }
-    if (mPullTagId == -1) {
+    } else {
         flush_if_needed(eventTimeNs);
+        interval.raw.push_back(make_pair(value, 0));
     }
 }
 
@@ -253,7 +292,7 @@
     if (err == NO_ERROR) {
         return val;
     } else {
-        VLOG("Can't find value in message.");
+        VLOG("Can't find value in message. %s", event.ToString().c_str());
         return 0;
     }
 }
@@ -271,13 +310,21 @@
     info.mBucketStartNs = mCurrentBucketStartTimeNs;
     info.mBucketEndNs = mCurrentBucketStartTimeNs + mBucketSizeNs;
 
+    int tainted = 0;
     for (const auto& slice : mCurrentSlicedBucket) {
         long value = 0;
-        for (const auto& pair : slice.second.raw) {
-            value += pair.second - pair.first;
+        if (mPullTagId != -1) {
+            for (const auto& pair : slice.second.raw) {
+                value += (pair.second - pair.first);
+            }
+        } else {
+            for (const auto& pair : slice.second.raw) {
+                value += pair.first;
+            }
         }
+        tainted += slice.second.tainted;
         info.mValue = value;
-        VLOG(" %s, %ld", slice.first.c_str(), value);
+        VLOG(" %s, %ld, %d", slice.first.c_str(), value, tainted);
         // it will auto create new vector of ValuebucketInfo if the key is not found.
         auto& bucketList = mPastBuckets[slice.first];
         bucketList.push_back(info);