Merge "Add unit test ValueMetricProducer on boundary" into pi-dev
diff --git a/cmds/statsd/src/metrics/ValueMetricProducer.cpp b/cmds/statsd/src/metrics/ValueMetricProducer.cpp
index bd3c78c..d0f510d 100644
--- a/cmds/statsd/src/metrics/ValueMetricProducer.cpp
+++ b/cmds/statsd/src/metrics/ValueMetricProducer.cpp
@@ -312,8 +312,13 @@
 
     if (mPullTagId != -1) { // for pulled events
         if (mCondition == true) {
-            interval.start = value;
-            interval.startUpdated = true;
+            if (!interval.startUpdated) {
+                interval.start = value;
+                interval.startUpdated = true;
+            } else {
+                // skip it if there is already value recorded for the start
+                VLOG("Already recorded value for this dimension %s", eventKey.toString().c_str());
+            }
         } else {
             // Generally we expect value to be monotonically increasing.
             // If not, there was a reset event. We take the absolute value as
@@ -382,6 +387,7 @@
     int tainted = 0;
     for (const auto& slice : mCurrentSlicedBucket) {
         tainted += slice.second.tainted;
+        tainted += slice.second.startUpdated;
         info.mValue = slice.second.sum;
         // it will auto create new vector of ValuebucketInfo if the key is not found.
         auto& bucketList = mPastBuckets[slice.first];
diff --git a/cmds/statsd/src/metrics/ValueMetricProducer.h b/cmds/statsd/src/metrics/ValueMetricProducer.h
index ebc6e81..45d9531 100644
--- a/cmds/statsd/src/metrics/ValueMetricProducer.h
+++ b/cmds/statsd/src/metrics/ValueMetricProducer.h
@@ -159,6 +159,10 @@
     FRIEND_TEST(ValueMetricProducerTest, TestPulledValueWithUpgrade);
     FRIEND_TEST(ValueMetricProducerTest, TestPushedEventsWithoutCondition);
     FRIEND_TEST(ValueMetricProducerTest, TestAnomalyDetection);
+    FRIEND_TEST(ValueMetricProducerTest, TestBucketBoundaryNoCondition);
+    FRIEND_TEST(ValueMetricProducerTest, TestBucketBoundaryWithCondition);
+    FRIEND_TEST(ValueMetricProducerTest, TestBucketBoundaryWithCondition2);
+    FRIEND_TEST(ValueMetricProducerTest, TestBucketBoundaryWithCondition3);
 };
 
 }  // namespace statsd
diff --git a/cmds/statsd/tests/metrics/ValueMetricProducer_test.cpp b/cmds/statsd/tests/metrics/ValueMetricProducer_test.cpp
index a0224ec..c650a06 100644
--- a/cmds/statsd/tests/metrics/ValueMetricProducer_test.cpp
+++ b/cmds/statsd/tests/metrics/ValueMetricProducer_test.cpp
@@ -45,6 +45,8 @@
 const int64_t bucket2StartTimeNs = bucketStartTimeNs + bucketSizeNs;
 const int64_t bucket3StartTimeNs = bucketStartTimeNs + 2 * bucketSizeNs;
 const int64_t bucket4StartTimeNs = bucketStartTimeNs + 3 * bucketSizeNs;
+const int64_t bucket5StartTimeNs = bucketStartTimeNs + 4 * bucketSizeNs;
+const int64_t bucket6StartTimeNs = bucketStartTimeNs + 5 * bucketSizeNs;
 const int64_t eventUpgradeTimeNs = bucketStartTimeNs + 15 * NS_PER_SEC;
 
 /*
@@ -431,6 +433,376 @@
             std::ceil(1.0 * event6->GetElapsedTimestampNs() / NS_PER_SEC + refPeriodSec));
 }
 
+// Test value metric no condition, the pull on bucket boundary come in time and too late
+TEST(ValueMetricProducerTest, TestBucketBoundaryNoCondition) {
+    ValueMetric metric;
+    metric.set_id(metricId);
+    metric.set_bucket(ONE_MINUTE);
+    metric.mutable_value_field()->set_field(tagId);
+    metric.mutable_value_field()->add_child()->set_field(2);
+
+    sp<MockConditionWizard> wizard = new NaggyMock<MockConditionWizard>();
+    shared_ptr<MockStatsPullerManager> pullerManager =
+            make_shared<StrictMock<MockStatsPullerManager>>();
+    EXPECT_CALL(*pullerManager, RegisterReceiver(tagId, _, _, _)).WillOnce(Return());
+    EXPECT_CALL(*pullerManager, UnRegisterReceiver(tagId, _)).WillOnce(Return());
+
+    ValueMetricProducer valueProducer(kConfigKey, metric, -1 /*-1 meaning no condition*/, wizard,
+                                      tagId, bucketStartTimeNs, pullerManager);
+    valueProducer.setBucketSize(60 * NS_PER_SEC);
+
+    vector<shared_ptr<LogEvent>> allData;
+    // pull 1
+    allData.clear();
+    shared_ptr<LogEvent> event = make_shared<LogEvent>(tagId, bucket2StartTimeNs + 1);
+    event->write(tagId);
+    event->write(11);
+    event->init();
+    allData.push_back(event);
+
+    valueProducer.onDataPulled(allData);
+    // has one slice
+    EXPECT_EQ(1UL, valueProducer.mCurrentSlicedBucket.size());
+    ValueMetricProducer::Interval curInterval = valueProducer.mCurrentSlicedBucket.begin()->second;
+    valueProducer.setBucketSize(60 * NS_PER_SEC);
+
+    // startUpdated:true tainted:0 sum:0 start:11
+    EXPECT_EQ(true, curInterval.startUpdated);
+    EXPECT_EQ(0, curInterval.tainted);
+    EXPECT_EQ(0, curInterval.sum);
+    EXPECT_EQ(11, curInterval.start);
+    EXPECT_EQ(1UL, valueProducer.mPastBuckets.size());
+    EXPECT_EQ(0, valueProducer.mPastBuckets.begin()->second.back().mValue);
+
+    // pull 2 at correct time
+    allData.clear();
+    event = make_shared<LogEvent>(tagId, bucket3StartTimeNs + 1);
+    event->write(tagId);
+    event->write(23);
+    event->init();
+    allData.push_back(event);
+    valueProducer.onDataPulled(allData);
+    // has one slice
+    EXPECT_EQ(1UL, valueProducer.mCurrentSlicedBucket.size());
+    curInterval = valueProducer.mCurrentSlicedBucket.begin()->second;
+    // tartUpdated:false tainted:0 sum:12
+    EXPECT_EQ(true, curInterval.startUpdated);
+    EXPECT_EQ(0, curInterval.tainted);
+    EXPECT_EQ(0, curInterval.sum);
+    EXPECT_EQ(1UL, valueProducer.mPastBuckets.size());
+    EXPECT_EQ(2UL, valueProducer.mPastBuckets.begin()->second.size());
+    EXPECT_EQ(12, valueProducer.mPastBuckets.begin()->second.back().mValue);
+
+    // pull 3 come late.
+    // The previous bucket gets closed with error. (Has start value 23, no ending)
+    // Another bucket gets closed with error. (No start, but ending with 36)
+    // The new bucket is back to normal.
+    allData.clear();
+    event = make_shared<LogEvent>(tagId, bucket6StartTimeNs + 1);
+    event->write(tagId);
+    event->write(36);
+    event->init();
+    allData.push_back(event);
+    valueProducer.onDataPulled(allData);
+    EXPECT_EQ(1UL, valueProducer.mCurrentSlicedBucket.size());
+    curInterval = valueProducer.mCurrentSlicedBucket.begin()->second;
+    // startUpdated:false tainted:0 sum:12
+    EXPECT_EQ(true, curInterval.startUpdated);
+    EXPECT_EQ(0, curInterval.tainted);
+    EXPECT_EQ(36, curInterval.start);
+    EXPECT_EQ(0, curInterval.sum);
+    EXPECT_EQ(1UL, valueProducer.mPastBuckets.size());
+    EXPECT_EQ(4UL, valueProducer.mPastBuckets.begin()->second.size());
+    EXPECT_EQ(0, valueProducer.mPastBuckets.begin()->second[0].mValue);
+    EXPECT_EQ(12, valueProducer.mPastBuckets.begin()->second[1].mValue);
+    EXPECT_EQ(0, valueProducer.mPastBuckets.begin()->second[2].mValue);
+    EXPECT_EQ(0, valueProducer.mPastBuckets.begin()->second[3].mValue);
+}
+
+/*
+ * Test pulled event with non sliced condition. The pull on boundary come late because the alarm
+ * was delivered late.
+ */
+TEST(ValueMetricProducerTest, TestBucketBoundaryWithCondition) {
+    ValueMetric metric;
+    metric.set_id(metricId);
+    metric.set_bucket(ONE_MINUTE);
+    metric.mutable_value_field()->set_field(tagId);
+    metric.mutable_value_field()->add_child()->set_field(2);
+    metric.set_condition(StringToId("SCREEN_ON"));
+
+    sp<MockConditionWizard> wizard = new NaggyMock<MockConditionWizard>();
+    shared_ptr<MockStatsPullerManager> pullerManager =
+            make_shared<StrictMock<MockStatsPullerManager>>();
+    EXPECT_CALL(*pullerManager, RegisterReceiver(tagId, _, _, _)).WillOnce(Return());
+    EXPECT_CALL(*pullerManager, UnRegisterReceiver(tagId, _)).WillRepeatedly(Return());
+
+    EXPECT_CALL(*pullerManager, Pull(tagId, _, _))
+            // condition becomes true
+            .WillOnce(Invoke([](int tagId, int64_t timeNs,
+                                vector<std::shared_ptr<LogEvent>>* data) {
+                data->clear();
+                shared_ptr<LogEvent> event = make_shared<LogEvent>(tagId, bucketStartTimeNs + 10);
+                event->write(tagId);
+                event->write(100);
+                event->init();
+                data->push_back(event);
+                return true;
+            }))
+            // condition becomes false
+            .WillOnce(Invoke([](int tagId, int64_t timeNs,
+                                vector<std::shared_ptr<LogEvent>>* data) {
+                data->clear();
+                shared_ptr<LogEvent> event = make_shared<LogEvent>(tagId, bucket2StartTimeNs + 20);
+                event->write(tagId);
+                event->write(120);
+                event->init();
+                data->push_back(event);
+                return true;
+            }));
+
+    ValueMetricProducer valueProducer(kConfigKey, metric, 1, wizard, tagId, bucketStartTimeNs,
+                                      pullerManager);
+    valueProducer.setBucketSize(60 * NS_PER_SEC);
+    valueProducer.onConditionChanged(true, bucketStartTimeNs + 8);
+
+    // has one slice
+    EXPECT_EQ(1UL, valueProducer.mCurrentSlicedBucket.size());
+    ValueMetricProducer::Interval curInterval = valueProducer.mCurrentSlicedBucket.begin()->second;
+    // startUpdated:false tainted:0 sum:0 start:100
+    EXPECT_EQ(100, curInterval.start);
+    EXPECT_EQ(true, curInterval.startUpdated);
+    EXPECT_EQ(0, curInterval.tainted);
+    EXPECT_EQ(0, curInterval.sum);
+    EXPECT_EQ(0UL, valueProducer.mPastBuckets.size());
+
+    // pull on bucket boundary come late, condition change happens before it
+    valueProducer.onConditionChanged(false, bucket2StartTimeNs + 1);
+    curInterval = valueProducer.mCurrentSlicedBucket.begin()->second;
+    EXPECT_EQ(false, curInterval.startUpdated);
+    EXPECT_EQ(1, curInterval.tainted);
+    EXPECT_EQ(0, curInterval.sum);
+    EXPECT_EQ(1UL, valueProducer.mPastBuckets.size());
+    EXPECT_EQ(1UL, valueProducer.mPastBuckets.begin()->second.size());
+    EXPECT_EQ(0, valueProducer.mPastBuckets.begin()->second[0].mValue);
+
+    // Now the alarm is delivered.
+    // since the condition turned to off before this pull finish, it has no effect
+    vector<shared_ptr<LogEvent>> allData;
+    allData.clear();
+    shared_ptr<LogEvent> event = make_shared<LogEvent>(tagId, bucket2StartTimeNs + 30);
+    event->write(1);
+    event->write(110);
+    event->init();
+    allData.push_back(event);
+    valueProducer.onDataPulled(allData);
+
+    curInterval = valueProducer.mCurrentSlicedBucket.begin()->second;
+    EXPECT_EQ(false, curInterval.startUpdated);
+    EXPECT_EQ(1, curInterval.tainted);
+    EXPECT_EQ(0, curInterval.sum);
+    EXPECT_EQ(1UL, valueProducer.mPastBuckets.size());
+    EXPECT_EQ(1UL, valueProducer.mPastBuckets.begin()->second.size());
+    EXPECT_EQ(0, valueProducer.mPastBuckets.begin()->second[0].mValue);
+}
+
+/*
+ * Test pulled event with non sliced condition. The pull on boundary come late, after the condition
+ * change to false, and then true again. This is due to alarm delivered late.
+ */
+TEST(ValueMetricProducerTest, TestBucketBoundaryWithCondition2) {
+    ValueMetric metric;
+    metric.set_id(metricId);
+    metric.set_bucket(ONE_MINUTE);
+    metric.mutable_value_field()->set_field(tagId);
+    metric.mutable_value_field()->add_child()->set_field(2);
+    metric.set_condition(StringToId("SCREEN_ON"));
+
+    sp<MockConditionWizard> wizard = new NaggyMock<MockConditionWizard>();
+    shared_ptr<MockStatsPullerManager> pullerManager =
+            make_shared<StrictMock<MockStatsPullerManager>>();
+    EXPECT_CALL(*pullerManager, RegisterReceiver(tagId, _, _, _)).WillRepeatedly(Return());
+    EXPECT_CALL(*pullerManager, UnRegisterReceiver(tagId, _)).WillRepeatedly(Return());
+
+    EXPECT_CALL(*pullerManager, Pull(tagId, _, _))
+            // condition becomes true
+            .WillOnce(Invoke([](int tagId, int64_t timeNs,
+                                vector<std::shared_ptr<LogEvent>>* data) {
+                data->clear();
+                shared_ptr<LogEvent> event = make_shared<LogEvent>(tagId, bucketStartTimeNs + 10);
+                event->write(tagId);
+                event->write(100);
+                event->init();
+                data->push_back(event);
+                return true;
+            }))
+            // condition becomes false
+            .WillOnce(Invoke([](int tagId, int64_t timeNs,
+                                vector<std::shared_ptr<LogEvent>>* data) {
+                data->clear();
+                shared_ptr<LogEvent> event = make_shared<LogEvent>(tagId, bucket2StartTimeNs + 20);
+                event->write(tagId);
+                event->write(120);
+                event->init();
+                data->push_back(event);
+                return true;
+            }))
+            // condition becomes true again
+            .WillOnce(Invoke([](int tagId, int64_t timeNs,
+                                vector<std::shared_ptr<LogEvent>>* data) {
+                data->clear();
+                shared_ptr<LogEvent> event = make_shared<LogEvent>(tagId, bucket2StartTimeNs + 30);
+                event->write(tagId);
+                event->write(130);
+                event->init();
+                data->push_back(event);
+                return true;
+            }));
+
+    ValueMetricProducer valueProducer(kConfigKey, metric, 1, wizard, tagId, bucketStartTimeNs,
+                                      pullerManager);
+    valueProducer.setBucketSize(60 * NS_PER_SEC);
+    valueProducer.onConditionChanged(true, bucketStartTimeNs + 8);
+
+    // has one slice
+    EXPECT_EQ(1UL, valueProducer.mCurrentSlicedBucket.size());
+    ValueMetricProducer::Interval curInterval = valueProducer.mCurrentSlicedBucket.begin()->second;
+    // startUpdated:false tainted:0 sum:0 start:100
+    EXPECT_EQ(100, curInterval.start);
+    EXPECT_EQ(true, curInterval.startUpdated);
+    EXPECT_EQ(0, curInterval.tainted);
+    EXPECT_EQ(0, curInterval.sum);
+    EXPECT_EQ(0UL, valueProducer.mPastBuckets.size());
+
+    // pull on bucket boundary come late, condition change happens before it
+    valueProducer.onConditionChanged(false, bucket2StartTimeNs + 1);
+    curInterval = valueProducer.mCurrentSlicedBucket.begin()->second;
+    EXPECT_EQ(false, curInterval.startUpdated);
+    EXPECT_EQ(1, curInterval.tainted);
+    EXPECT_EQ(0, curInterval.sum);
+    EXPECT_EQ(1UL, valueProducer.mPastBuckets.size());
+    EXPECT_EQ(1UL, valueProducer.mPastBuckets.begin()->second.size());
+    EXPECT_EQ(0, valueProducer.mPastBuckets.begin()->second[0].mValue);
+
+    // condition changed to true again, before the pull alarm is delivered
+    valueProducer.onConditionChanged(true, bucket2StartTimeNs + 25);
+    curInterval = valueProducer.mCurrentSlicedBucket.begin()->second;
+    EXPECT_EQ(true, curInterval.startUpdated);
+    EXPECT_EQ(130, curInterval.start);
+    EXPECT_EQ(1, curInterval.tainted);
+    EXPECT_EQ(0, curInterval.sum);
+    EXPECT_EQ(1UL, valueProducer.mPastBuckets.size());
+    EXPECT_EQ(1UL, valueProducer.mPastBuckets.begin()->second.size());
+    EXPECT_EQ(0, valueProducer.mPastBuckets.begin()->second[0].mValue);
+
+    // Now the alarm is delivered, but it is considered late, it has no effect
+    vector<shared_ptr<LogEvent>> allData;
+    allData.clear();
+    shared_ptr<LogEvent> event = make_shared<LogEvent>(tagId, bucket2StartTimeNs + 50);
+    event->write(1);
+    event->write(110);
+    event->init();
+    allData.push_back(event);
+    valueProducer.onDataPulled(allData);
+
+    curInterval = valueProducer.mCurrentSlicedBucket.begin()->second;
+    EXPECT_EQ(true, curInterval.startUpdated);
+    EXPECT_EQ(130, curInterval.start);
+    EXPECT_EQ(1, curInterval.tainted);
+    EXPECT_EQ(0, curInterval.sum);
+    EXPECT_EQ(1UL, valueProducer.mPastBuckets.size());
+    EXPECT_EQ(1UL, valueProducer.mPastBuckets.begin()->second.size());
+    EXPECT_EQ(0, valueProducer.mPastBuckets.begin()->second[0].mValue);
+}
+
+/*
+ * Test pulled event with non sliced condition. The pull on boundary come late because the puller is
+ * very slow.
+ */
+TEST(ValueMetricProducerTest, TestBucketBoundaryWithCondition3) {
+    ValueMetric metric;
+    metric.set_id(metricId);
+    metric.set_bucket(ONE_MINUTE);
+    metric.mutable_value_field()->set_field(tagId);
+    metric.mutable_value_field()->add_child()->set_field(2);
+    metric.set_condition(StringToId("SCREEN_ON"));
+
+    sp<MockConditionWizard> wizard = new NaggyMock<MockConditionWizard>();
+    shared_ptr<MockStatsPullerManager> pullerManager =
+            make_shared<StrictMock<MockStatsPullerManager>>();
+    EXPECT_CALL(*pullerManager, RegisterReceiver(tagId, _, _, _)).WillOnce(Return());
+    EXPECT_CALL(*pullerManager, UnRegisterReceiver(tagId, _)).WillRepeatedly(Return());
+
+    EXPECT_CALL(*pullerManager, Pull(tagId, _, _))
+            // condition becomes true
+            .WillOnce(Invoke([](int tagId, int64_t timeNs,
+                                vector<std::shared_ptr<LogEvent>>* data) {
+                data->clear();
+                shared_ptr<LogEvent> event = make_shared<LogEvent>(tagId, bucketStartTimeNs + 10);
+                event->write(tagId);
+                event->write(100);
+                event->init();
+                data->push_back(event);
+                return true;
+            }))
+            // condition becomes false
+            .WillOnce(Invoke([](int tagId, int64_t timeNs,
+                                vector<std::shared_ptr<LogEvent>>* data) {
+                data->clear();
+                shared_ptr<LogEvent> event = make_shared<LogEvent>(tagId, bucket3StartTimeNs + 20);
+                event->write(tagId);
+                event->write(120);
+                event->init();
+                data->push_back(event);
+                return true;
+            }));
+
+    ValueMetricProducer valueProducer(kConfigKey, metric, 1, wizard, tagId, bucketStartTimeNs,
+                                      pullerManager);
+    valueProducer.setBucketSize(60 * NS_PER_SEC);
+    valueProducer.onConditionChanged(true, bucketStartTimeNs + 8);
+
+    // has one slice
+    EXPECT_EQ(1UL, valueProducer.mCurrentSlicedBucket.size());
+    ValueMetricProducer::Interval curInterval = valueProducer.mCurrentSlicedBucket.begin()->second;
+    // startUpdated:false tainted:0 sum:0 start:100
+    EXPECT_EQ(100, curInterval.start);
+    EXPECT_EQ(true, curInterval.startUpdated);
+    EXPECT_EQ(0, curInterval.tainted);
+    EXPECT_EQ(0, curInterval.sum);
+    EXPECT_EQ(0UL, valueProducer.mPastBuckets.size());
+
+    // pull on bucket boundary come late, condition change happens before it.
+    // But puller is very slow in this one, so the data come after bucket finish
+    valueProducer.onConditionChanged(false, bucket2StartTimeNs + 1);
+    curInterval = valueProducer.mCurrentSlicedBucket.begin()->second;
+    EXPECT_EQ(false, curInterval.startUpdated);
+    EXPECT_EQ(1, curInterval.tainted);
+    EXPECT_EQ(0, curInterval.sum);
+    EXPECT_EQ(1UL, valueProducer.mPastBuckets.size());
+    EXPECT_EQ(1UL, valueProducer.mPastBuckets.begin()->second.size());
+    EXPECT_EQ(0, valueProducer.mPastBuckets.begin()->second[0].mValue);
+
+    // Alarm is delivered in time, but the pull is very slow, and pullers are called in order,
+    // so this one comes even later
+    vector<shared_ptr<LogEvent>> allData;
+    allData.clear();
+    shared_ptr<LogEvent> event = make_shared<LogEvent>(tagId, bucket3StartTimeNs + 30);
+    event->write(1);
+    event->write(110);
+    event->init();
+    allData.push_back(event);
+    valueProducer.onDataPulled(allData);
+
+    curInterval = valueProducer.mCurrentSlicedBucket.begin()->second;
+    EXPECT_EQ(false, curInterval.startUpdated);
+    EXPECT_EQ(1, curInterval.tainted);
+    EXPECT_EQ(0, curInterval.sum);
+    EXPECT_EQ(1UL, valueProducer.mPastBuckets.size());
+    EXPECT_EQ(1UL, valueProducer.mPastBuckets.begin()->second.size());
+    EXPECT_EQ(0, valueProducer.mPastBuckets.begin()->second[0].mValue);
+}
+
 }  // namespace statsd
 }  // namespace os
 }  // namespace android