Statsd value metric anomaly detection

Add anomaly detection to the Value metric in statsd.

Test: make statsd_test && adb sync data && adb shell data/nativetest64/statsd_test/statsd_test
Test: run cts-dev -m CtsStatsdHostTestCases -t android.cts.statsd.HostAtomTests#testValueAnomalyDetection
Fixes: 70240042
Change-Id: I05cf36495cdfd0ac7aa1a922f0e253a60fda1787
diff --git a/cmds/statsd/src/metrics/ValueMetricProducer.cpp b/cmds/statsd/src/metrics/ValueMetricProducer.cpp
index 9400a1c..1eabf11 100644
--- a/cmds/statsd/src/metrics/ValueMetricProducer.cpp
+++ b/cmds/statsd/src/metrics/ValueMetricProducer.cpp
@@ -294,6 +294,10 @@
     } else {    // for pushed events
         interval.sum += value;
     }
+
+    for (auto& tracker : mAnomalyTrackers) {
+        tracker->detectAndDeclareAnomaly(eventTimeNs, mCurrentBucketNum, eventKey, interval.sum);
+    }
 }
 
 long ValueMetricProducer::get_value(const LogEvent& event) {
@@ -327,6 +331,12 @@
         // it will auto create new vector of ValuebucketInfo if the key is not found.
         auto& bucketList = mPastBuckets[slice.first];
         bucketList.push_back(info);
+
+        for (auto& tracker : mAnomalyTrackers) {
+            if (tracker != nullptr) {
+                tracker->addPastBucket(slice.first, info.mValue, info.mBucketNum);
+            }
+        }
     }
     VLOG("%d tainted pairs in the bucket", tainted);
 
diff --git a/cmds/statsd/src/metrics/ValueMetricProducer.h b/cmds/statsd/src/metrics/ValueMetricProducer.h
index 62e5d52..51af83d 100644
--- a/cmds/statsd/src/metrics/ValueMetricProducer.h
+++ b/cmds/statsd/src/metrics/ValueMetricProducer.h
@@ -19,6 +19,7 @@
 #include <gtest/gtest_prod.h>
 #include <utils/threads.h>
 #include <list>
+#include "../anomaly/AnomalyTracker.h"
 #include "../condition/ConditionTracker.h"
 #include "../external/PullDataReceiver.h"
 #include "../external/StatsPullerManager.h"
@@ -117,6 +118,7 @@
     FRIEND_TEST(ValueMetricProducerTest, TestNonDimensionalEvents);
     FRIEND_TEST(ValueMetricProducerTest, TestEventsWithNonSlicedCondition);
     FRIEND_TEST(ValueMetricProducerTest, TestPushedEventsWithoutCondition);
+    FRIEND_TEST(ValueMetricProducerTest, TestAnomalyDetection);
 };
 
 }  // namespace statsd
diff --git a/cmds/statsd/tests/metrics/ValueMetricProducer_test.cpp b/cmds/statsd/tests/metrics/ValueMetricProducer_test.cpp
index 146a19d..6f117d3 100644
--- a/cmds/statsd/tests/metrics/ValueMetricProducer_test.cpp
+++ b/cmds/statsd/tests/metrics/ValueMetricProducer_test.cpp
@@ -238,6 +238,79 @@
     EXPECT_EQ(30, valueProducer.mPastBuckets.begin()->second.back().mValue);
 }
 
+TEST(ValueMetricProducerTest, TestAnomalyDetection) {
+    // Checks that pushed values drive the anomaly tracker and honor its refractory period.
+    Alert alert;
+    alert.set_name("alert");
+    alert.set_metric_name(metricName);
+    alert.set_trigger_if_sum_gt(130);
+    alert.set_number_of_buckets(2);
+    alert.set_refractory_period_secs(3);
+    sp<AnomalyTracker> anomalyTracker = new AnomalyTracker(alert, kConfigKey);
+
+    ValueMetric metric;
+    metric.set_name(metricName);
+    metric.mutable_bucket()->set_bucket_size_millis(bucketSizeNs / 1000000);
+    metric.set_value_field(2);
+
+    sp<MockConditionWizard> wizard = new NaggyMock<MockConditionWizard>();
+    ValueMetricProducer valueProducer(kConfigKey, metric, -1 /*-1 meaning no condition*/, wizard,
+                                      -1 /*not pulled*/, bucketStartTimeNs);
+    valueProducer.addAnomalyTracker(anomalyTracker);
+
+    shared_ptr<LogEvent> event1
+            = make_shared<LogEvent>(tagId, bucketStartTimeNs + 1 * NS_PER_SEC);
+    event1->write(161);
+    event1->write(10); // value of interest
+    event1->init();
+    shared_ptr<LogEvent> event2
+            = make_shared<LogEvent>(tagId, bucketStartTimeNs + 2 * NS_PER_SEC);
+    event2->write(162);
+    event2->write(20); // value of interest
+    event2->init();
+    shared_ptr<LogEvent> event3
+            = make_shared<LogEvent>(tagId, bucketStartTimeNs + 2 * bucketSizeNs + 1 * NS_PER_SEC);
+    event3->write(163);
+    event3->write(130); // value of interest
+    event3->init();
+    shared_ptr<LogEvent> event4
+            = make_shared<LogEvent>(tagId, bucketStartTimeNs + 3 * bucketSizeNs + 1 * NS_PER_SEC);
+    event4->write(35);
+    event4->write(1); // value of interest
+    event4->init();
+    shared_ptr<LogEvent> event5
+            = make_shared<LogEvent>(tagId, bucketStartTimeNs + 3 * bucketSizeNs + 2 * NS_PER_SEC);
+    event5->write(45);
+    event5->write(150); // value of interest
+    event5->init();
+    shared_ptr<LogEvent> event6
+            = make_shared<LogEvent>(tagId, bucketStartTimeNs + 3 * bucketSizeNs + 10 * NS_PER_SEC);
+    event6->write(25);
+    event6->write(160); // value of interest
+    event6->init();
+
+    // Two events in bucket #0.
+    valueProducer.onMatchedLogEvent(1 /*log matcher index*/, *event1);
+    valueProducer.onMatchedLogEvent(1 /*log matcher index*/, *event2);
+    EXPECT_EQ(anomalyTracker->getLastAlarmTimestampNs(), -1LL); // Value sum == 30 <= 130.
+
+    // One event in bucket #2. No alarm: bucket #0 is no longer counted.
+    valueProducer.onMatchedLogEvent(1 /*log matcher index*/, *event3);
+    EXPECT_EQ(anomalyTracker->getLastAlarmTimestampNs(), -1LL); // Value sum == 130 <= 130.
+
+    // Three events in bucket #3.
+    valueProducer.onMatchedLogEvent(1 /*log matcher index*/, *event4);
+    // Anomaly at event 4 since Value sum == 131 > 130!
+    EXPECT_EQ(anomalyTracker->getLastAlarmTimestampNs(), (long long)event4->GetTimestampNs());
+    valueProducer.onMatchedLogEvent(1 /*log matcher index*/, *event5);
+    // Event 5 is within the 3 sec refractory period, so the last alarm is still event 4.
+    EXPECT_EQ(anomalyTracker->getLastAlarmTimestampNs(), (long long)event4->GetTimestampNs());
+
+    valueProducer.onMatchedLogEvent(1 /*log matcher index*/, *event6);
+    // Event 6 (value 160) arrives after the refractory period, so a new alarm is declared.
+    EXPECT_EQ(anomalyTracker->getLastAlarmTimestampNs(), (long long)event6->GetTimestampNs());
+}
+
 }  // namespace statsd
 }  // namespace os
 }  // namespace android