ValueMetric supports multiple aggregation types

1. Add support for MIN, MAX, AVG
2. ValueMetric also allow floats now, in addition to long data type.
AnomalyDetection still takes long only. I am not sure if it makes
sense to do anomaly on AVG. I will leave that for later.
3. ValueMetric supports sliced condition change for pushed events.
I don't think it makes sense for pulled events to have sliced condition
changes so leave it for now.

Test: unit test
Change-Id: I8bc510d98ea9b8a6eb16d04ff99dce6b574249cd
diff --git a/cmds/statsd/tests/metrics/ValueMetricProducer_test.cpp b/cmds/statsd/tests/metrics/ValueMetricProducer_test.cpp
index 5195f01..3559a7c 100644
--- a/cmds/statsd/tests/metrics/ValueMetricProducer_test.cpp
+++ b/cmds/statsd/tests/metrics/ValueMetricProducer_test.cpp
@@ -80,13 +80,11 @@
     // has one slice
     EXPECT_EQ(1UL, valueProducer.mCurrentSlicedBucket.size());
     ValueMetricProducer::Interval curInterval = valueProducer.mCurrentSlicedBucket.begin()->second;
-    valueProducer.setBucketSize(60 * NS_PER_SEC);
 
-    // startUpdated:true tainted:0 sum:0 start:11
+    // startUpdated:true sum:0 start:11
     EXPECT_EQ(true, curInterval.startUpdated);
-    EXPECT_EQ(0, curInterval.tainted);
-    EXPECT_EQ(0, curInterval.sum);
-    EXPECT_EQ(11, curInterval.start);
+    EXPECT_EQ(false, curInterval.hasValue);
+    EXPECT_EQ(11, curInterval.start.long_value);
     EXPECT_EQ(0UL, valueProducer.mPastBuckets.size());
 
     allData.clear();
@@ -99,13 +97,12 @@
     // has one slice
     EXPECT_EQ(1UL, valueProducer.mCurrentSlicedBucket.size());
     curInterval = valueProducer.mCurrentSlicedBucket.begin()->second;
-    // tartUpdated:false tainted:0 sum:12
+    // tartUpdated:false sum:12
     EXPECT_EQ(true, curInterval.startUpdated);
-    EXPECT_EQ(0, curInterval.tainted);
-    EXPECT_EQ(0, curInterval.sum);
+    EXPECT_EQ(0, curInterval.value.long_value);
     EXPECT_EQ(1UL, valueProducer.mPastBuckets.size());
     EXPECT_EQ(1UL, valueProducer.mPastBuckets.begin()->second.size());
-    EXPECT_EQ(12, valueProducer.mPastBuckets.begin()->second.back().mValue);
+    EXPECT_EQ(12, valueProducer.mPastBuckets.begin()->second.back().mValueLong);
 
     allData.clear();
     event = make_shared<LogEvent>(tagId, bucket4StartTimeNs + 1);
@@ -116,13 +113,12 @@
     valueProducer.onDataPulled(allData);
     EXPECT_EQ(1UL, valueProducer.mCurrentSlicedBucket.size());
     curInterval = valueProducer.mCurrentSlicedBucket.begin()->second;
-    // startUpdated:false tainted:0 sum:12
+    // startUpdated:false sum:12
     EXPECT_EQ(true, curInterval.startUpdated);
-    EXPECT_EQ(0, curInterval.tainted);
-    EXPECT_EQ(0, curInterval.sum);
+    EXPECT_EQ(0, curInterval.value.long_value);
     EXPECT_EQ(1UL, valueProducer.mPastBuckets.size());
     EXPECT_EQ(2UL, valueProducer.mPastBuckets.begin()->second.size());
-    EXPECT_EQ(13, valueProducer.mPastBuckets.begin()->second.back().mValue);
+    EXPECT_EQ(13, valueProducer.mPastBuckets.begin()->second.back().mValueLong);
 }
 
 /*
@@ -157,12 +153,10 @@
     // has one slice
     EXPECT_EQ(1UL, valueProducer.mCurrentSlicedBucket.size());
     ValueMetricProducer::Interval curInterval = valueProducer.mCurrentSlicedBucket.begin()->second;
-    valueProducer.setBucketSize(60 * NS_PER_SEC);
 
     EXPECT_EQ(true, curInterval.startUpdated);
-    EXPECT_EQ(0, curInterval.tainted);
-    EXPECT_EQ(0, curInterval.sum);
-    EXPECT_EQ(11, curInterval.start);
+    EXPECT_EQ(false, curInterval.hasValue);
+    EXPECT_EQ(11, curInterval.start.long_value);
     EXPECT_EQ(0UL, valueProducer.mPastBuckets.size());
 
     allData.clear();
@@ -176,11 +170,10 @@
     EXPECT_EQ(1UL, valueProducer.mCurrentSlicedBucket.size());
     curInterval = valueProducer.mCurrentSlicedBucket.begin()->second;
     EXPECT_EQ(true, curInterval.startUpdated);
-    EXPECT_EQ(0, curInterval.tainted);
-    EXPECT_EQ(0, curInterval.sum);
+    EXPECT_EQ(false, curInterval.hasValue);
     EXPECT_EQ(1UL, valueProducer.mPastBuckets.size());
     EXPECT_EQ(1UL, valueProducer.mPastBuckets.begin()->second.size());
-    EXPECT_EQ(10, valueProducer.mPastBuckets.begin()->second.back().mValue);
+    EXPECT_EQ(10, valueProducer.mPastBuckets.begin()->second.back().mValueLong);
 
     allData.clear();
     event = make_shared<LogEvent>(tagId, bucket4StartTimeNs + 1);
@@ -192,11 +185,10 @@
     EXPECT_EQ(1UL, valueProducer.mCurrentSlicedBucket.size());
     curInterval = valueProducer.mCurrentSlicedBucket.begin()->second;
     EXPECT_EQ(true, curInterval.startUpdated);
-    EXPECT_EQ(0, curInterval.tainted);
-    EXPECT_EQ(0, curInterval.sum);
+    EXPECT_EQ(false, curInterval.hasValue);
     EXPECT_EQ(1UL, valueProducer.mPastBuckets.size());
     EXPECT_EQ(2UL, valueProducer.mPastBuckets.begin()->second.size());
-    EXPECT_EQ(26, valueProducer.mPastBuckets.begin()->second.back().mValue);
+    EXPECT_EQ(26, valueProducer.mPastBuckets.begin()->second.back().mValueLong);
 }
 
 /*
@@ -230,12 +222,10 @@
     // has one slice
     EXPECT_EQ(1UL, valueProducer.mCurrentSlicedBucket.size());
     ValueMetricProducer::Interval curInterval = valueProducer.mCurrentSlicedBucket.begin()->second;
-    valueProducer.setBucketSize(60 * NS_PER_SEC);
 
     EXPECT_EQ(true, curInterval.startUpdated);
-    EXPECT_EQ(0, curInterval.tainted);
-    EXPECT_EQ(0, curInterval.sum);
-    EXPECT_EQ(11, curInterval.start);
+    EXPECT_EQ(false, curInterval.hasValue);
+    EXPECT_EQ(11, curInterval.start.long_value);
     EXPECT_EQ(0UL, valueProducer.mPastBuckets.size());
 
     allData.clear();
@@ -249,8 +239,7 @@
     EXPECT_EQ(1UL, valueProducer.mCurrentSlicedBucket.size());
     curInterval = valueProducer.mCurrentSlicedBucket.begin()->second;
     EXPECT_EQ(true, curInterval.startUpdated);
-    EXPECT_EQ(0, curInterval.tainted);
-    EXPECT_EQ(0, curInterval.sum);
+    EXPECT_EQ(false, curInterval.hasValue);
     EXPECT_EQ(0UL, valueProducer.mPastBuckets.size());
 
     allData.clear();
@@ -263,11 +252,10 @@
     EXPECT_EQ(1UL, valueProducer.mCurrentSlicedBucket.size());
     curInterval = valueProducer.mCurrentSlicedBucket.begin()->second;
     EXPECT_EQ(true, curInterval.startUpdated);
-    EXPECT_EQ(0, curInterval.tainted);
-    EXPECT_EQ(0, curInterval.sum);
+    EXPECT_EQ(false, curInterval.hasValue);
     EXPECT_EQ(1UL, valueProducer.mPastBuckets.size());
     EXPECT_EQ(1UL, valueProducer.mPastBuckets.begin()->second.size());
-    EXPECT_EQ(26, valueProducer.mPastBuckets.begin()->second.back().mValue);
+    EXPECT_EQ(26, valueProducer.mPastBuckets.begin()->second.back().mValueLong);
 }
 
 /*
@@ -316,11 +304,10 @@
     // has one slice
     EXPECT_EQ(1UL, valueProducer.mCurrentSlicedBucket.size());
     ValueMetricProducer::Interval curInterval = valueProducer.mCurrentSlicedBucket.begin()->second;
-    // startUpdated:false tainted:0 sum:0 start:100
-    EXPECT_EQ(100, curInterval.start);
+    // startUpdated:false sum:0 start:100
+    EXPECT_EQ(100, curInterval.start.long_value);
     EXPECT_EQ(true, curInterval.startUpdated);
-    EXPECT_EQ(0, curInterval.tainted);
-    EXPECT_EQ(0, curInterval.sum);
+    EXPECT_EQ(false, curInterval.hasValue);
     EXPECT_EQ(0UL, valueProducer.mPastBuckets.size());
 
     vector<shared_ptr<LogEvent>> allData;
@@ -335,19 +322,19 @@
     // has one slice
     EXPECT_EQ(1UL, valueProducer.mCurrentSlicedBucket.size());
     curInterval = valueProducer.mCurrentSlicedBucket.begin()->second;
-    // startUpdated:false tainted:0 sum:0 start:110
-    EXPECT_EQ(110, curInterval.start);
+    // startUpdated:false sum:0 start:110
+    EXPECT_EQ(110, curInterval.start.long_value);
     EXPECT_EQ(1UL, valueProducer.mPastBuckets.size());
     EXPECT_EQ(1UL, valueProducer.mPastBuckets.begin()->second.size());
-    EXPECT_EQ(10, valueProducer.mPastBuckets.begin()->second.back().mValue);
+    EXPECT_EQ(10, valueProducer.mPastBuckets.begin()->second.back().mValueLong);
 
     valueProducer.onConditionChanged(false, bucket2StartTimeNs + 1);
 
     // has one slice
     EXPECT_EQ(1UL, valueProducer.mCurrentSlicedBucket.size());
     curInterval = valueProducer.mCurrentSlicedBucket.begin()->second;
-    // startUpdated:false tainted:0 sum:0 start:110
-    EXPECT_EQ(10, curInterval.sum);
+    // startUpdated:false sum:0 start:110
+    EXPECT_EQ(10, curInterval.value.long_value);
     EXPECT_EQ(false, curInterval.startUpdated);
 }
 
@@ -433,7 +420,7 @@
     valueProducer.notifyAppUpgrade(eventUpgradeTimeNs, "ANY.APP", 1, 1);
     EXPECT_EQ(1UL, valueProducer.mPastBuckets[DEFAULT_METRIC_DIMENSION_KEY].size());
     EXPECT_EQ(eventUpgradeTimeNs, valueProducer.mCurrentBucketStartTimeNs);
-    EXPECT_EQ(20L, valueProducer.mPastBuckets[DEFAULT_METRIC_DIMENSION_KEY][0].mValue);
+    EXPECT_EQ(20L, valueProducer.mPastBuckets[DEFAULT_METRIC_DIMENSION_KEY][0].mValueLong);
 
     allData.clear();
     event = make_shared<LogEvent>(tagId, bucket2StartTimeNs + 1);
@@ -444,7 +431,7 @@
     valueProducer.onDataPulled(allData);
     EXPECT_EQ(2UL, valueProducer.mPastBuckets[DEFAULT_METRIC_DIMENSION_KEY].size());
     EXPECT_EQ(bucket2StartTimeNs, valueProducer.mCurrentBucketStartTimeNs);
-    EXPECT_EQ(30L, valueProducer.mPastBuckets[DEFAULT_METRIC_DIMENSION_KEY][1].mValue);
+    EXPECT_EQ(30L, valueProducer.mPastBuckets[DEFAULT_METRIC_DIMENSION_KEY][1].mValueLong);
 }
 
 TEST(ValueMetricProducerTest, TestPulledValueWithUpgradeWhileConditionFalse) {
@@ -493,7 +480,7 @@
     EXPECT_EQ(bucket2StartTimeNs-50, valueProducer.mCurrentBucketStartTimeNs);
     EXPECT_EQ(1UL, valueProducer.mPastBuckets[DEFAULT_METRIC_DIMENSION_KEY].size());
     EXPECT_EQ(bucketStartTimeNs, valueProducer.mPastBuckets[DEFAULT_METRIC_DIMENSION_KEY][0].mBucketStartNs);
-    EXPECT_EQ(20L, valueProducer.mPastBuckets[DEFAULT_METRIC_DIMENSION_KEY][0].mValue);
+    EXPECT_EQ(20L, valueProducer.mPastBuckets[DEFAULT_METRIC_DIMENSION_KEY][0].mValueLong);
     EXPECT_FALSE(valueProducer.mCondition);
 }
 
@@ -523,19 +510,20 @@
     // has one slice
     EXPECT_EQ(1UL, valueProducer.mCurrentSlicedBucket.size());
     ValueMetricProducer::Interval curInterval = valueProducer.mCurrentSlicedBucket.begin()->second;
-    EXPECT_EQ(10, curInterval.sum);
+    EXPECT_EQ(10, curInterval.value.long_value);
+    EXPECT_EQ(true, curInterval.hasValue);
 
     valueProducer.onMatchedLogEvent(1 /*log matcher index*/, *event2);
 
     // has one slice
     EXPECT_EQ(1UL, valueProducer.mCurrentSlicedBucket.size());
     curInterval = valueProducer.mCurrentSlicedBucket.begin()->second;
-    EXPECT_EQ(30, curInterval.sum);
+    EXPECT_EQ(30, curInterval.value.long_value);
 
     valueProducer.flushIfNeededLocked(bucket3StartTimeNs);
     EXPECT_EQ(1UL, valueProducer.mPastBuckets.size());
     EXPECT_EQ(1UL, valueProducer.mPastBuckets.begin()->second.size());
-    EXPECT_EQ(30, valueProducer.mPastBuckets.begin()->second.back().mValue);
+    EXPECT_EQ(30, valueProducer.mPastBuckets.begin()->second.back().mValueLong);
 }
 
 TEST(ValueMetricProducerTest, TestPushedEventsWithCondition) {
@@ -572,7 +560,7 @@
     // has one slice
     EXPECT_EQ(1UL, valueProducer.mCurrentSlicedBucket.size());
     curInterval = valueProducer.mCurrentSlicedBucket.begin()->second;
-    EXPECT_EQ(20, curInterval.sum);
+    EXPECT_EQ(20, curInterval.value.long_value);
 
     shared_ptr<LogEvent> event3 = make_shared<LogEvent>(tagId, bucketStartTimeNs + 30);
     event3->write(1);
@@ -583,7 +571,7 @@
     // has one slice
     EXPECT_EQ(1UL, valueProducer.mCurrentSlicedBucket.size());
     curInterval = valueProducer.mCurrentSlicedBucket.begin()->second;
-    EXPECT_EQ(50, curInterval.sum);
+    EXPECT_EQ(50, curInterval.value.long_value);
 
     valueProducer.onConditionChangedLocked(false, bucketStartTimeNs + 35);
     shared_ptr<LogEvent> event4 = make_shared<LogEvent>(tagId, bucketStartTimeNs + 40);
@@ -595,12 +583,12 @@
     // has one slice
     EXPECT_EQ(1UL, valueProducer.mCurrentSlicedBucket.size());
     curInterval = valueProducer.mCurrentSlicedBucket.begin()->second;
-    EXPECT_EQ(50, curInterval.sum);
+    EXPECT_EQ(50, curInterval.value.long_value);
 
     valueProducer.flushIfNeededLocked(bucket3StartTimeNs);
     EXPECT_EQ(1UL, valueProducer.mPastBuckets.size());
     EXPECT_EQ(1UL, valueProducer.mPastBuckets.begin()->second.size());
-    EXPECT_EQ(50, valueProducer.mPastBuckets.begin()->second.back().mValue);
+    EXPECT_EQ(50, valueProducer.mPastBuckets.begin()->second.back().mValueLong);
 }
 
 TEST(ValueMetricProducerTest, TestAnomalyDetection) {
@@ -718,11 +706,10 @@
     EXPECT_EQ(1UL, valueProducer.mCurrentSlicedBucket.size());
     ValueMetricProducer::Interval curInterval = valueProducer.mCurrentSlicedBucket.begin()->second;
 
-    // startUpdated:true tainted:0 sum:0 start:11
+    // startUpdated:true sum:0 start:11
     EXPECT_EQ(true, curInterval.startUpdated);
-    EXPECT_EQ(0, curInterval.tainted);
-    EXPECT_EQ(0, curInterval.sum);
-    EXPECT_EQ(11, curInterval.start);
+    EXPECT_EQ(false, curInterval.hasValue);
+    EXPECT_EQ(11, curInterval.start.long_value);
     EXPECT_EQ(0UL, valueProducer.mPastBuckets.size());
 
     // pull 2 at correct time
@@ -736,13 +723,12 @@
     // has one slice
     EXPECT_EQ(1UL, valueProducer.mCurrentSlicedBucket.size());
     curInterval = valueProducer.mCurrentSlicedBucket.begin()->second;
-    // tartUpdated:false tainted:0 sum:12
+    // tartUpdated:false sum:12
     EXPECT_EQ(true, curInterval.startUpdated);
-    EXPECT_EQ(0, curInterval.tainted);
-    EXPECT_EQ(0, curInterval.sum);
+    EXPECT_EQ(false, curInterval.hasValue);
     EXPECT_EQ(1UL, valueProducer.mPastBuckets.size());
     EXPECT_EQ(1UL, valueProducer.mPastBuckets.begin()->second.size());
-    EXPECT_EQ(12, valueProducer.mPastBuckets.begin()->second.back().mValue);
+    EXPECT_EQ(12, valueProducer.mPastBuckets.begin()->second.back().mValueLong);
 
     // pull 3 come late.
     // The previous bucket gets closed with error. (Has start value 23, no ending)
@@ -757,14 +743,13 @@
     valueProducer.onDataPulled(allData);
     EXPECT_EQ(1UL, valueProducer.mCurrentSlicedBucket.size());
     curInterval = valueProducer.mCurrentSlicedBucket.begin()->second;
-    // startUpdated:false tainted:0 sum:12
+    // startUpdated:false sum:12
     EXPECT_EQ(true, curInterval.startUpdated);
-    EXPECT_EQ(0, curInterval.tainted);
-    EXPECT_EQ(36, curInterval.start);
-    EXPECT_EQ(0, curInterval.sum);
+    EXPECT_EQ(36, curInterval.start.long_value);
+    EXPECT_EQ(false, curInterval.hasValue);
     EXPECT_EQ(1UL, valueProducer.mPastBuckets.size());
     EXPECT_EQ(1UL, valueProducer.mPastBuckets.begin()->second.size());
-    EXPECT_EQ(12, valueProducer.mPastBuckets.begin()->second.back().mValue);
+    EXPECT_EQ(12, valueProducer.mPastBuckets.begin()->second.back().mValueLong);
 }
 
 /*
@@ -816,19 +801,17 @@
     // has one slice
     EXPECT_EQ(1UL, valueProducer.mCurrentSlicedBucket.size());
     ValueMetricProducer::Interval curInterval = valueProducer.mCurrentSlicedBucket.begin()->second;
-    // startUpdated:false tainted:0 sum:0 start:100
-    EXPECT_EQ(100, curInterval.start);
+    // startUpdated:false sum:0 start:100
+    EXPECT_EQ(100, curInterval.start.long_value);
     EXPECT_EQ(true, curInterval.startUpdated);
-    EXPECT_EQ(0, curInterval.tainted);
-    EXPECT_EQ(0, curInterval.sum);
+    EXPECT_EQ(false, curInterval.hasValue);
     EXPECT_EQ(0UL, valueProducer.mPastBuckets.size());
 
     // pull on bucket boundary come late, condition change happens before it
     valueProducer.onConditionChanged(false, bucket2StartTimeNs + 1);
     curInterval = valueProducer.mCurrentSlicedBucket.begin()->second;
     EXPECT_EQ(false, curInterval.startUpdated);
-    EXPECT_EQ(1, curInterval.tainted);
-    EXPECT_EQ(0, curInterval.sum);
+    EXPECT_EQ(false, curInterval.hasValue);
     EXPECT_EQ(0UL, valueProducer.mPastBuckets.size());
 
     // Now the alarm is delivered.
@@ -844,8 +827,7 @@
 
     curInterval = valueProducer.mCurrentSlicedBucket.begin()->second;
     EXPECT_EQ(false, curInterval.startUpdated);
-    EXPECT_EQ(1, curInterval.tainted);
-    EXPECT_EQ(0, curInterval.sum);
+    EXPECT_EQ(false, curInterval.hasValue);
     EXPECT_EQ(0UL, valueProducer.mPastBuckets.size());
 }
 
@@ -909,28 +891,25 @@
     // has one slice
     EXPECT_EQ(1UL, valueProducer.mCurrentSlicedBucket.size());
     ValueMetricProducer::Interval curInterval = valueProducer.mCurrentSlicedBucket.begin()->second;
-    // startUpdated:false tainted:0 sum:0 start:100
-    EXPECT_EQ(100, curInterval.start);
+    // startUpdated:false sum:0 start:100
+    EXPECT_EQ(100, curInterval.start.long_value);
     EXPECT_EQ(true, curInterval.startUpdated);
-    EXPECT_EQ(0, curInterval.tainted);
-    EXPECT_EQ(0, curInterval.sum);
+    EXPECT_EQ(false, curInterval.hasValue);
     EXPECT_EQ(0UL, valueProducer.mPastBuckets.size());
 
     // pull on bucket boundary come late, condition change happens before it
     valueProducer.onConditionChanged(false, bucket2StartTimeNs + 1);
     curInterval = valueProducer.mCurrentSlicedBucket.begin()->second;
     EXPECT_EQ(false, curInterval.startUpdated);
-    EXPECT_EQ(1, curInterval.tainted);
-    EXPECT_EQ(0, curInterval.sum);
+    EXPECT_EQ(false, curInterval.hasValue);
     EXPECT_EQ(0UL, valueProducer.mPastBuckets.size());
 
     // condition changed to true again, before the pull alarm is delivered
     valueProducer.onConditionChanged(true, bucket2StartTimeNs + 25);
     curInterval = valueProducer.mCurrentSlicedBucket.begin()->second;
     EXPECT_EQ(true, curInterval.startUpdated);
-    EXPECT_EQ(130, curInterval.start);
-    EXPECT_EQ(1, curInterval.tainted);
-    EXPECT_EQ(0, curInterval.sum);
+    EXPECT_EQ(130, curInterval.start.long_value);
+    EXPECT_EQ(false, curInterval.hasValue);
     EXPECT_EQ(0UL, valueProducer.mPastBuckets.size());
 
     // Now the alarm is delivered, but it is considered late, it has no effect
@@ -945,9 +924,8 @@
 
     curInterval = valueProducer.mCurrentSlicedBucket.begin()->second;
     EXPECT_EQ(true, curInterval.startUpdated);
-    EXPECT_EQ(130, curInterval.start);
-    EXPECT_EQ(1, curInterval.tainted);
-    EXPECT_EQ(0, curInterval.sum);
+    EXPECT_EQ(130, curInterval.start.long_value);
+    EXPECT_EQ(false, curInterval.hasValue);
     EXPECT_EQ(0UL, valueProducer.mPastBuckets.size());
 }
 
@@ -1000,11 +978,10 @@
     // has one slice
     EXPECT_EQ(1UL, valueProducer.mCurrentSlicedBucket.size());
     ValueMetricProducer::Interval curInterval = valueProducer.mCurrentSlicedBucket.begin()->second;
-    // startUpdated:false tainted:0 sum:0 start:100
-    EXPECT_EQ(100, curInterval.start);
+    // startUpdated:false sum:0 start:100
+    EXPECT_EQ(100, curInterval.start.long_value);
     EXPECT_EQ(true, curInterval.startUpdated);
-    EXPECT_EQ(0, curInterval.tainted);
-    EXPECT_EQ(0, curInterval.sum);
+    EXPECT_EQ(false, curInterval.hasValue);
     EXPECT_EQ(0UL, valueProducer.mPastBuckets.size());
 
     // pull on bucket boundary come late, condition change happens before it.
@@ -1012,8 +989,7 @@
     valueProducer.onConditionChanged(false, bucket2StartTimeNs + 1);
     curInterval = valueProducer.mCurrentSlicedBucket.begin()->second;
     EXPECT_EQ(false, curInterval.startUpdated);
-    EXPECT_EQ(1, curInterval.tainted);
-    EXPECT_EQ(0, curInterval.sum);
+    EXPECT_EQ(false, curInterval.hasValue);
     EXPECT_EQ(0UL, valueProducer.mPastBuckets.size());
 
     // Alarm is delivered in time, but the pull is very slow, and pullers are called in order,
@@ -1029,11 +1005,247 @@
 
     curInterval = valueProducer.mCurrentSlicedBucket.begin()->second;
     EXPECT_EQ(false, curInterval.startUpdated);
-    EXPECT_EQ(1, curInterval.tainted);
-    EXPECT_EQ(0, curInterval.sum);
+    EXPECT_EQ(false, curInterval.hasValue);
     EXPECT_EQ(0UL, valueProducer.mPastBuckets.size());
 }
 
+TEST(ValueMetricProducerTest, TestPushedAggregateMin) {
+    ValueMetric metric;
+    metric.set_id(metricId);
+    metric.set_bucket(ONE_MINUTE);
+    metric.mutable_value_field()->set_field(tagId);
+    metric.mutable_value_field()->add_child()->set_field(2);
+    metric.set_aggregation_type(ValueMetric::MIN);
+
+    sp<MockConditionWizard> wizard = new NaggyMock<MockConditionWizard>();
+    sp<MockStatsPullerManager> pullerManager = new StrictMock<MockStatsPullerManager>();
+
+    ValueMetricProducer valueProducer(kConfigKey, metric, -1, wizard, -1, bucketStartTimeNs,
+                                      bucketStartTimeNs, pullerManager);
+    valueProducer.setBucketSize(60 * NS_PER_SEC);
+
+    shared_ptr<LogEvent> event1 = make_shared<LogEvent>(tagId, bucketStartTimeNs + 10);
+    event1->write(1);
+    event1->write(10);
+    event1->init();
+    shared_ptr<LogEvent> event2 = make_shared<LogEvent>(tagId, bucketStartTimeNs + 20);
+    event2->write(1);
+    event2->write(20);
+    event2->init();
+    valueProducer.onMatchedLogEvent(1 /*log matcher index*/, *event1);
+    // has one slice
+    EXPECT_EQ(1UL, valueProducer.mCurrentSlicedBucket.size());
+    ValueMetricProducer::Interval curInterval = valueProducer.mCurrentSlicedBucket.begin()->second;
+    EXPECT_EQ(10, curInterval.value.long_value);
+    EXPECT_EQ(true, curInterval.hasValue);
+
+    valueProducer.onMatchedLogEvent(1 /*log matcher index*/, *event2);
+
+    // has one slice
+    EXPECT_EQ(1UL, valueProducer.mCurrentSlicedBucket.size());
+    curInterval = valueProducer.mCurrentSlicedBucket.begin()->second;
+    EXPECT_EQ(10, curInterval.value.long_value);
+
+    valueProducer.flushIfNeededLocked(bucket3StartTimeNs);
+    EXPECT_EQ(1UL, valueProducer.mPastBuckets.size());
+    EXPECT_EQ(1UL, valueProducer.mPastBuckets.begin()->second.size());
+    EXPECT_EQ(10, valueProducer.mPastBuckets.begin()->second.back().mValueLong);
+}
+
+TEST(ValueMetricProducerTest, TestPushedAggregateMax) {
+    ValueMetric metric;
+    metric.set_id(metricId);
+    metric.set_bucket(ONE_MINUTE);
+    metric.mutable_value_field()->set_field(tagId);
+    metric.mutable_value_field()->add_child()->set_field(2);
+    metric.set_aggregation_type(ValueMetric::MAX);
+
+    sp<MockConditionWizard> wizard = new NaggyMock<MockConditionWizard>();
+    sp<MockStatsPullerManager> pullerManager = new StrictMock<MockStatsPullerManager>();
+
+    ValueMetricProducer valueProducer(kConfigKey, metric, -1, wizard, -1, bucketStartTimeNs,
+                                      bucketStartTimeNs, pullerManager);
+    valueProducer.setBucketSize(60 * NS_PER_SEC);
+
+    shared_ptr<LogEvent> event1 = make_shared<LogEvent>(tagId, bucketStartTimeNs + 10);
+    event1->write(1);
+    event1->write(10);
+    event1->init();
+    shared_ptr<LogEvent> event2 = make_shared<LogEvent>(tagId, bucketStartTimeNs + 20);
+    event2->write(1);
+    event2->write(20);
+    event2->init();
+    valueProducer.onMatchedLogEvent(1 /*log matcher index*/, *event1);
+    // has one slice
+    EXPECT_EQ(1UL, valueProducer.mCurrentSlicedBucket.size());
+    ValueMetricProducer::Interval curInterval = valueProducer.mCurrentSlicedBucket.begin()->second;
+    EXPECT_EQ(10, curInterval.value.long_value);
+    EXPECT_EQ(true, curInterval.hasValue);
+
+    valueProducer.onMatchedLogEvent(1 /*log matcher index*/, *event2);
+
+    // has one slice
+    EXPECT_EQ(1UL, valueProducer.mCurrentSlicedBucket.size());
+    curInterval = valueProducer.mCurrentSlicedBucket.begin()->second;
+    EXPECT_EQ(20, curInterval.value.long_value);
+
+    valueProducer.flushIfNeededLocked(bucket3StartTimeNs);
+    EXPECT_EQ(1UL, valueProducer.mPastBuckets.size());
+    EXPECT_EQ(1UL, valueProducer.mPastBuckets.begin()->second.size());
+    EXPECT_EQ(20, valueProducer.mPastBuckets.begin()->second.back().mValueLong);
+}
+
+TEST(ValueMetricProducerTest, TestPushedAggregateAvg) {
+    ValueMetric metric;
+    metric.set_id(metricId);
+    metric.set_bucket(ONE_MINUTE);
+    metric.mutable_value_field()->set_field(tagId);
+    metric.mutable_value_field()->add_child()->set_field(2);
+    metric.set_aggregation_type(ValueMetric::AVG);
+
+    sp<MockConditionWizard> wizard = new NaggyMock<MockConditionWizard>();
+    sp<MockStatsPullerManager> pullerManager = new StrictMock<MockStatsPullerManager>();
+
+    ValueMetricProducer valueProducer(kConfigKey, metric, -1, wizard, -1, bucketStartTimeNs,
+                                      bucketStartTimeNs, pullerManager);
+    valueProducer.setBucketSize(60 * NS_PER_SEC);
+
+    shared_ptr<LogEvent> event1 = make_shared<LogEvent>(tagId, bucketStartTimeNs + 10);
+    event1->write(1);
+    event1->write(10);
+    event1->init();
+    shared_ptr<LogEvent> event2 = make_shared<LogEvent>(tagId, bucketStartTimeNs + 20);
+    event2->write(1);
+    event2->write(15);
+    event2->init();
+    valueProducer.onMatchedLogEvent(1 /*log matcher index*/, *event1);
+    // has one slice
+    EXPECT_EQ(1UL, valueProducer.mCurrentSlicedBucket.size());
+    ValueMetricProducer::Interval curInterval;
+    curInterval = valueProducer.mCurrentSlicedBucket.begin()->second;
+    EXPECT_EQ(10, curInterval.value.long_value);
+    EXPECT_EQ(true, curInterval.hasValue);
+    EXPECT_EQ(1, curInterval.sampleSize);
+
+    valueProducer.onMatchedLogEvent(1 /*log matcher index*/, *event2);
+
+    // has one slice
+    EXPECT_EQ(1UL, valueProducer.mCurrentSlicedBucket.size());
+    curInterval = valueProducer.mCurrentSlicedBucket.begin()->second;
+    EXPECT_EQ(25, curInterval.value.long_value);
+    EXPECT_EQ(2, curInterval.sampleSize);
+
+    valueProducer.flushIfNeededLocked(bucket3StartTimeNs);
+    EXPECT_EQ(1UL, valueProducer.mPastBuckets.size());
+    EXPECT_EQ(1UL, valueProducer.mPastBuckets.begin()->second.size());
+    EXPECT_EQ(12.5, valueProducer.mPastBuckets.begin()->second.back().mValueDouble);
+}
+
+TEST(ValueMetricProducerTest, TestPushedAggregateSum) {
+    ValueMetric metric;
+    metric.set_id(metricId);
+    metric.set_bucket(ONE_MINUTE);
+    metric.mutable_value_field()->set_field(tagId);
+    metric.mutable_value_field()->add_child()->set_field(2);
+    metric.set_aggregation_type(ValueMetric::SUM);
+
+    sp<MockConditionWizard> wizard = new NaggyMock<MockConditionWizard>();
+    sp<MockStatsPullerManager> pullerManager = new StrictMock<MockStatsPullerManager>();
+
+    ValueMetricProducer valueProducer(kConfigKey, metric, -1, wizard, -1, bucketStartTimeNs,
+                                      bucketStartTimeNs, pullerManager);
+    valueProducer.setBucketSize(60 * NS_PER_SEC);
+
+    shared_ptr<LogEvent> event1 = make_shared<LogEvent>(tagId, bucketStartTimeNs + 10);
+    event1->write(1);
+    event1->write(10);
+    event1->init();
+    shared_ptr<LogEvent> event2 = make_shared<LogEvent>(tagId, bucketStartTimeNs + 20);
+    event2->write(1);
+    event2->write(15);
+    event2->init();
+    valueProducer.onMatchedLogEvent(1 /*log matcher index*/, *event1);
+    // has one slice
+    EXPECT_EQ(1UL, valueProducer.mCurrentSlicedBucket.size());
+    ValueMetricProducer::Interval curInterval = valueProducer.mCurrentSlicedBucket.begin()->second;
+    EXPECT_EQ(10, curInterval.value.long_value);
+    EXPECT_EQ(true, curInterval.hasValue);
+
+    valueProducer.onMatchedLogEvent(1 /*log matcher index*/, *event2);
+
+    // has one slice
+    EXPECT_EQ(1UL, valueProducer.mCurrentSlicedBucket.size());
+    curInterval = valueProducer.mCurrentSlicedBucket.begin()->second;
+    EXPECT_EQ(25, curInterval.value.long_value);
+
+    valueProducer.flushIfNeededLocked(bucket3StartTimeNs);
+    EXPECT_EQ(1UL, valueProducer.mPastBuckets.size());
+    EXPECT_EQ(1UL, valueProducer.mPastBuckets.begin()->second.size());
+    EXPECT_EQ(25, valueProducer.mPastBuckets.begin()->second.back().mValueLong);
+}
+
+TEST(ValueMetricProducerTest, TestPushedAggregateSumSliced) {
+    string slicedConditionName = "UID";
+    const int conditionTagId = 2;
+    ValueMetric metric;
+    metric.set_id(metricId);
+    metric.set_bucket(ONE_MINUTE);
+    metric.mutable_value_field()->set_field(tagId);
+    metric.mutable_value_field()->add_child()->set_field(1);
+    metric.set_aggregation_type(ValueMetric::SUM);
+
+    metric.set_condition(StringToId(slicedConditionName));
+    MetricConditionLink* link = metric.add_links();
+    link->set_condition(StringToId(slicedConditionName));
+    buildSimpleAtomFieldMatcher(tagId, 2, link->mutable_fields_in_what());
+    buildSimpleAtomFieldMatcher(conditionTagId, 2, link->mutable_fields_in_condition());
+
+    LogEvent event1(tagId, bucketStartTimeNs + 10);
+    event1.write(10);  // value
+    event1.write("111"); // uid
+    event1.init();
+    ConditionKey key1;
+    key1[StringToId(slicedConditionName)] =
+        {getMockedDimensionKey(conditionTagId, 2, "111")};
+
+    LogEvent event2(tagId, bucketStartTimeNs + 20);
+    event2.write(15);
+    event2.write("222");
+    event2.init();
+    ConditionKey key2;
+    key2[StringToId(slicedConditionName)] =
+        {getMockedDimensionKey(conditionTagId, 2, "222")};
+
+    sp<MockConditionWizard> wizard = new NaggyMock<MockConditionWizard>();
+    EXPECT_CALL(*wizard, query(_, key1, _, _, _, _)).WillOnce(Return(ConditionState::kFalse));
+
+    EXPECT_CALL(*wizard, query(_, key2, _, _, _, _)).WillOnce(Return(ConditionState::kTrue));
+
+    sp<MockStatsPullerManager> pullerManager = new StrictMock<MockStatsPullerManager>();
+
+    ValueMetricProducer valueProducer(kConfigKey, metric, 1, wizard, -1, bucketStartTimeNs,
+                                      bucketStartTimeNs, pullerManager);
+    valueProducer.setBucketSize(60 * NS_PER_SEC);
+
+    valueProducer.onMatchedLogEvent(1 /*log matcher index*/, event1);
+
+    EXPECT_EQ(1UL, valueProducer.mCurrentSlicedBucket.size());
+    ValueMetricProducer::Interval curInterval = valueProducer.mCurrentSlicedBucket.begin()->second;
+    EXPECT_EQ(false, curInterval.hasValue);
+
+    valueProducer.onMatchedLogEvent(1 /*log matcher index*/, event2);
+
+    // has one slice
+    EXPECT_EQ(1UL, valueProducer.mCurrentSlicedBucket.size());
+    curInterval = valueProducer.mCurrentSlicedBucket.begin()->second;
+    EXPECT_EQ(15, curInterval.value.long_value);
+
+    valueProducer.flushIfNeededLocked(bucket3StartTimeNs);
+    EXPECT_EQ(1UL, valueProducer.mPastBuckets.size());
+    EXPECT_EQ(1UL, valueProducer.mPastBuckets.begin()->second.size());
+    EXPECT_EQ(15, valueProducer.mPastBuckets.begin()->second.back().mValueLong);
+}
+
 }  // namespace statsd
 }  // namespace os
 }  // namespace android