1/ Only expose thread-safe interfaces in metric producer.
2/ Simplify lock logic.
3/ Add test for duration metric producer.
Test: all unit test passsed.
Change-Id: If6ee2e69a17f12406f4b3ea3553b14642cd636d6
diff --git a/cmds/statsd/src/metrics/DurationMetricProducer.cpp b/cmds/statsd/src/metrics/DurationMetricProducer.cpp
index a0374c0..66e8c61 100644
--- a/cmds/statsd/src/metrics/DurationMetricProducer.cpp
+++ b/cmds/statsd/src/metrics/DurationMetricProducer.cpp
@@ -93,7 +93,7 @@
mConditionSliced = true;
}
- startNewProtoOutputStream(mStartTimeNs);
+ startNewProtoOutputStreamLocked(mStartTimeNs);
VLOG("metric %s created. bucket size %lld start_time: %lld", metric.name().c_str(),
(long long)mBucketSizeNs, (long long)mStartTimeNs);
@@ -103,7 +103,7 @@
VLOG("~DurationMetric() called");
}
-void DurationMetricProducer::startNewProtoOutputStream(long long startTime) {
+void DurationMetricProducer::startNewProtoOutputStreamLocked(long long startTime) {
mProto = std::make_unique<ProtoOutputStream>();
mProto->write(FIELD_TYPE_STRING | FIELD_ID_NAME, mMetric.name());
mProto->write(FIELD_TYPE_INT64 | FIELD_ID_START_REPORT_NANOS, startTime);
@@ -111,7 +111,7 @@
}
unique_ptr<DurationTracker> DurationMetricProducer::createDurationTracker(
- const HashableDimensionKey& eventKey, vector<DurationBucket>& bucket) {
+ const HashableDimensionKey& eventKey, vector<DurationBucket>& bucket) const {
switch (mMetric.aggregation_type()) {
case DurationMetric_AggregationType_SUM:
return make_unique<OringDurationTracker>(
@@ -129,19 +129,20 @@
// DropboxWriter.
}
-void DurationMetricProducer::onSlicedConditionMayChange(const uint64_t eventTime) {
+void DurationMetricProducer::onSlicedConditionMayChangeLocked(const uint64_t eventTime) {
VLOG("Metric %s onSlicedConditionMayChange", mMetric.name().c_str());
- flushIfNeeded(eventTime);
+ flushIfNeededLocked(eventTime);
// Now for each of the on-going event, check if the condition has changed for them.
for (auto& pair : mCurrentSlicedDuration) {
pair.second->onSlicedConditionMayChange(eventTime);
}
}
-void DurationMetricProducer::onConditionChanged(const bool conditionMet, const uint64_t eventTime) {
+void DurationMetricProducer::onConditionChangedLocked(const bool conditionMet,
+ const uint64_t eventTime) {
VLOG("Metric %s onConditionChanged", mMetric.name().c_str());
mCondition = conditionMet;
- flushIfNeeded(eventTime);
+ flushIfNeededLocked(eventTime);
// TODO: need to populate the condition change time from the event which triggers the condition
// change, instead of using current time.
for (auto& pair : mCurrentSlicedDuration) {
@@ -149,13 +150,13 @@
}
}
-std::unique_ptr<std::vector<uint8_t>> DurationMetricProducer::onDumpReport() {
+std::unique_ptr<std::vector<uint8_t>> DurationMetricProducer::onDumpReportLocked() {
long long endTime = time(nullptr) * NS_PER_SEC;
// Dump current bucket if it's stale.
// If current bucket is still on-going, don't force dump current bucket.
// In finish(), We can force dump current bucket.
- flushIfNeeded(endTime);
+ flushIfNeededLocked(endTime);
VLOG("metric %s dump report now...", mMetric.name().c_str());
for (const auto& pair : mPastBuckets) {
@@ -214,13 +215,13 @@
mProto->end(mProtoToken);
mProto->write(FIELD_TYPE_INT64 | FIELD_ID_END_REPORT_NANOS,
(long long)mCurrentBucketStartTimeNs);
- std::unique_ptr<std::vector<uint8_t>> buffer = serializeProto();
- startNewProtoOutputStream(endTime);
+ std::unique_ptr<std::vector<uint8_t>> buffer = serializeProtoLocked();
+ startNewProtoOutputStreamLocked(endTime);
// TODO: Properly clear the old buckets.
return buffer;
}
-void DurationMetricProducer::flushIfNeeded(uint64_t eventTime) {
+void DurationMetricProducer::flushIfNeededLocked(const uint64_t& eventTime) {
if (mCurrentBucketStartTimeNs + mBucketSizeNs > eventTime) {
return;
}
@@ -239,7 +240,7 @@
mCurrentBucketNum += numBucketsForward;
}
-bool DurationMetricProducer::hitGuardRail(const HashableDimensionKey& newKey) {
+bool DurationMetricProducer::hitGuardRailLocked(const HashableDimensionKey& newKey) {
// the key is not new, we are good.
if (mCurrentSlicedDuration.find(newKey) != mCurrentSlicedDuration.end()) {
return false;
@@ -259,11 +260,11 @@
return false;
}
-void DurationMetricProducer::onMatchedLogEventInternal(
+void DurationMetricProducer::onMatchedLogEventInternalLocked(
const size_t matcherIndex, const HashableDimensionKey& eventKey,
const map<string, HashableDimensionKey>& conditionKeys, bool condition,
const LogEvent& event, bool scheduledPull) {
- flushIfNeeded(event.GetTimestampNs());
+ flushIfNeededLocked(event.GetTimestampNs());
if (matcherIndex == mStopAllIndex) {
for (auto& pair : mCurrentSlicedDuration) {
@@ -275,7 +276,7 @@
HashableDimensionKey atomKey = getHashableKey(getDimensionKey(event, mInternalDimension));
if (mCurrentSlicedDuration.find(eventKey) == mCurrentSlicedDuration.end()) {
- if (hitGuardRail(eventKey)) {
+ if (hitGuardRailLocked(eventKey)) {
return;
}
mCurrentSlicedDuration[eventKey] = createDurationTracker(eventKey, mPastBuckets[eventKey]);
@@ -290,7 +291,7 @@
}
}
-size_t DurationMetricProducer::byteSize() const {
+size_t DurationMetricProducer::byteSizeLocked() const {
size_t totalSize = 0;
for (const auto& pair : mPastBuckets) {
totalSize += pair.second.size() * kBucketSize;