1/ Add a duration anomaly tracker with alarm support.
2/ Initialize anomaly trackers from config based on the public language.
3/ Add unit tests for anomaly detection in the count/gauge metric producers.
4/ Revisit the duration tracker logic.
Test: unit tests passed.
Change-Id: I2423c0e0f05b1e37626954de9e749303423963f2
diff --git a/cmds/statsd/src/metrics/CountMetricProducer.cpp b/cmds/statsd/src/metrics/CountMetricProducer.cpp
index f9da68e..d47bd4f 100644
--- a/cmds/statsd/src/metrics/CountMetricProducer.cpp
+++ b/cmds/statsd/src/metrics/CountMetricProducer.cpp
@@ -17,7 +17,6 @@
#define DEBUG true // STOPSHIP if true
#include "Log.h"
-#include "../anomaly/DiscreteAnomalyTracker.h"
#include "CountMetricProducer.h"
#include "stats_util.h"
@@ -114,7 +113,7 @@
// Dump current bucket if it's stale.
// If current bucket is still on-going, don't force dump current bucket.
// In finish(), We can force dump current bucket.
- flushCounterIfNeeded(endTime);
+ flushIfNeeded(endTime);
VLOG("metric %s dump report now...", mMetric.name().c_str());
for (const auto& counter : mPastBuckets) {
@@ -170,7 +169,6 @@
startNewProtoOutputStream(endTime);
mPastBuckets.clear();
- mByteSize = 0;
return buffer;
@@ -188,7 +186,7 @@
const LogEvent& event, bool scheduledPull) {
uint64_t eventTimeNs = event.GetTimestampNs();
- flushCounterIfNeeded(eventTimeNs);
+ flushIfNeeded(eventTimeNs);
if (condition == false) {
return;
@@ -205,41 +203,41 @@
count++;
}
- VLOG("metric %s %s->%d", mMetric.name().c_str(), eventKey.c_str(),
- (*mCurrentSlicedCounter)[eventKey]);
+ for (auto& tracker : mAnomalyTrackers) {
+ tracker->detectAndDeclareAnomaly(eventTimeNs, mCurrentBucketNum, eventKey,
+ mCurrentSlicedCounter->find(eventKey)->second);
+ }
+
+ VLOG("metric %s %s->%lld", mMetric.name().c_str(), eventKey.c_str(),
+ (long long)(*mCurrentSlicedCounter)[eventKey]);
}
// When a new matched event comes in, we check if event falls into the current
// bucket. If not, flush the old counter to past buckets and initialize the new bucket.
-void CountMetricProducer::flushCounterIfNeeded(const uint64_t eventTimeNs) {
- if (mCurrentBucketStartTimeNs + mBucketSizeNs > eventTimeNs) {
+void CountMetricProducer::flushIfNeeded(const uint64_t eventTimeNs) {
+ if (eventTimeNs < mCurrentBucketStartTimeNs + mBucketSizeNs) {
return;
}
- // adjust the bucket start time
- // TODO: This (and addPastBucket to which it goes) doesn't really need to be an int64.
- uint64_t numBucketsForward = (eventTimeNs - mCurrentBucketStartTimeNs) / mBucketSizeNs;
-
CountBucket info;
info.mBucketStartNs = mCurrentBucketStartTimeNs;
info.mBucketEndNs = mCurrentBucketStartTimeNs + mBucketSizeNs;
+ info.mBucketNum = mCurrentBucketNum;
for (const auto& counter : *mCurrentSlicedCounter) {
info.mCount = counter.second;
auto& bucketList = mPastBuckets[counter.first];
bucketList.push_back(info);
- VLOG("metric %s, dump key value: %s -> %d", mMetric.name().c_str(), counter.first.c_str(),
- counter.second);
- mByteSize += sizeof(info);
+ VLOG("metric %s, dump key value: %s -> %lld", mMetric.name().c_str(), counter.first.c_str(),
+ (long long)counter.second);
}
for (auto& tracker : mAnomalyTrackers) {
- tracker->addOrUpdateBucket(mCurrentSlicedCounter, mCurrentBucketNum);
- tracker->declareAndDeclareAnomaly();
+ tracker->addPastBucket(mCurrentSlicedCounter, mCurrentBucketNum);
}
// Reset counters (do not clear, since the old one is still referenced in mAnomalyTrackers).
mCurrentSlicedCounter = std::make_shared<DimToValMap>();
-
+ uint64_t numBucketsForward = (eventTimeNs - mCurrentBucketStartTimeNs) / mBucketSizeNs;
mCurrentBucketStartTimeNs = mCurrentBucketStartTimeNs + numBucketsForward * mBucketSizeNs;
mCurrentBucketNum += numBucketsForward;
VLOG("metric %s: new bucket start time: %lld", mMetric.name().c_str(),
@@ -250,7 +248,11 @@
// greater than actual data size as it contains each dimension of
// CountMetricData is duplicated.
size_t CountMetricProducer::byteSize() {
- return mByteSize;
+ size_t totalSize = 0;
+ for (const auto& pair : mPastBuckets) {
+ totalSize += pair.second.size() * kBucketSize;
+ }
+ return totalSize;
}
} // namespace statsd