1/ Duration anomaly tracker with alarm.
2/ Init anomaly from config based on the public language.
3/ Unit tests for anomaly detection in count/gauge producer.
4/ Revisit the duration tracker logic.

Test: unit test passed.
Change-Id: I2423c0e0f05b1e37626954de9e749303423963f2
diff --git a/cmds/statsd/src/metrics/CountMetricProducer.cpp b/cmds/statsd/src/metrics/CountMetricProducer.cpp
index f9da68e..d47bd4f 100644
--- a/cmds/statsd/src/metrics/CountMetricProducer.cpp
+++ b/cmds/statsd/src/metrics/CountMetricProducer.cpp
@@ -17,7 +17,6 @@
 #define DEBUG true  // STOPSHIP if true
 #include "Log.h"
 
-#include "../anomaly/DiscreteAnomalyTracker.h"
 #include "CountMetricProducer.h"
 #include "stats_util.h"
 
@@ -114,7 +113,7 @@
     // Dump current bucket if it's stale.
     // If current bucket is still on-going, don't force dump current bucket.
     // In finish(), We can force dump current bucket.
-    flushCounterIfNeeded(endTime);
+    flushIfNeeded(endTime);
     VLOG("metric %s dump report now...", mMetric.name().c_str());
 
     for (const auto& counter : mPastBuckets) {
@@ -170,7 +169,6 @@
 
     startNewProtoOutputStream(endTime);
     mPastBuckets.clear();
-    mByteSize = 0;
 
     return buffer;
 
@@ -188,7 +186,7 @@
         const LogEvent& event, bool scheduledPull) {
     uint64_t eventTimeNs = event.GetTimestampNs();
 
-    flushCounterIfNeeded(eventTimeNs);
+    flushIfNeeded(eventTimeNs);
 
     if (condition == false) {
         return;
@@ -205,41 +203,41 @@
         count++;
     }
 
-    VLOG("metric %s %s->%d", mMetric.name().c_str(), eventKey.c_str(),
-         (*mCurrentSlicedCounter)[eventKey]);
+    for (auto& tracker : mAnomalyTrackers) {
+        tracker->detectAndDeclareAnomaly(eventTimeNs, mCurrentBucketNum, eventKey,
+                                         mCurrentSlicedCounter->find(eventKey)->second);
+    }
+
+    VLOG("metric %s %s->%lld", mMetric.name().c_str(), eventKey.c_str(),
+         (long long)(*mCurrentSlicedCounter)[eventKey]);
 }
 
 // When a new matched event comes in, we check if event falls into the current
 // bucket. If not, flush the old counter to past buckets and initialize the new bucket.
-void CountMetricProducer::flushCounterIfNeeded(const uint64_t eventTimeNs) {
-    if (mCurrentBucketStartTimeNs + mBucketSizeNs > eventTimeNs) {
+void CountMetricProducer::flushIfNeeded(const uint64_t eventTimeNs) {
+    if (eventTimeNs < mCurrentBucketStartTimeNs + mBucketSizeNs) {
         return;
     }
 
-    // adjust the bucket start time
-    // TODO: This (and addPastBucket to which it goes) doesn't really need to be an int64.
-    uint64_t numBucketsForward = (eventTimeNs - mCurrentBucketStartTimeNs) / mBucketSizeNs;
-
     CountBucket info;
     info.mBucketStartNs = mCurrentBucketStartTimeNs;
     info.mBucketEndNs = mCurrentBucketStartTimeNs + mBucketSizeNs;
+    info.mBucketNum = mCurrentBucketNum;
     for (const auto& counter : *mCurrentSlicedCounter) {
         info.mCount = counter.second;
         auto& bucketList = mPastBuckets[counter.first];
         bucketList.push_back(info);
-        VLOG("metric %s, dump key value: %s -> %d", mMetric.name().c_str(), counter.first.c_str(),
-             counter.second);
-        mByteSize += sizeof(info);
+        VLOG("metric %s, dump key value: %s -> %lld", mMetric.name().c_str(), counter.first.c_str(),
+             (long long)counter.second);
     }
 
     for (auto& tracker : mAnomalyTrackers) {
-        tracker->addOrUpdateBucket(mCurrentSlicedCounter, mCurrentBucketNum);
-        tracker->declareAndDeclareAnomaly();
+        tracker->addPastBucket(mCurrentSlicedCounter, mCurrentBucketNum);
     }
 
     // Reset counters (do not clear, since the old one is still referenced in mAnomalyTrackers).
     mCurrentSlicedCounter = std::make_shared<DimToValMap>();
-
+    uint64_t numBucketsForward = (eventTimeNs - mCurrentBucketStartTimeNs) / mBucketSizeNs;
     mCurrentBucketStartTimeNs = mCurrentBucketStartTimeNs + numBucketsForward * mBucketSizeNs;
     mCurrentBucketNum += numBucketsForward;
     VLOG("metric %s: new bucket start time: %lld", mMetric.name().c_str(),
@@ -250,7 +248,11 @@
 // greater than actual data size as it contains each dimension of
 // CountMetricData is  duplicated.
 size_t CountMetricProducer::byteSize() {
-    return mByteSize;
+    size_t totalSize = 0;
+    for (const auto& pair : mPastBuckets) {
+        totalSize += pair.second.size() * kBucketSize;
+    }
+    return totalSize;
 }
 
 }  // namespace statsd