1/ Only expose thread-safe interfaces in MetricProducer (see the locking sketch below).
2/ Simplify lock logic.
3/ Add test for duration metric producer.
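
A minimal sketch of the locking convention behind 1/ and 2/, assuming a base
class shaped roughly like MetricProducer (names and signatures simplified for
illustration): each public method is the only entry point for external
callers and acquires mMutex exactly once, while the private *Locked variant
assumes the lock is already held, so internal callers such as
flushIfNeededLocked() never re-acquire a non-recursive mutex.

    #include <cstddef>
    #include <cstdint>
    #include <mutex>

    class MetricProducer {
    public:
        // Thread-safe interface: external callers can only enter here,
        // so the lock is taken exactly once per call.
        void onConditionChanged(bool conditionMet, uint64_t eventTime) {
            std::lock_guard<std::mutex> lock(mMutex);
            onConditionChangedLocked(conditionMet, eventTime);
        }

        // Const queries follow the same pattern, which is why mMutex
        // must be mutable.
        std::size_t byteSize() const {
            std::lock_guard<std::mutex> lock(mMutex);
            return byteSizeLocked();
        }

    protected:
        // Must be called with mMutex already held; free to call other
        // *Locked helpers (e.g. flushIfNeededLocked) without deadlock.
        virtual void onConditionChangedLocked(bool conditionMet,
                                              uint64_t eventTime) = 0;
        virtual std::size_t byteSizeLocked() const = 0;

        mutable std::mutex mMutex;
    };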

Test: all unit tests passed.
Change-Id: If6ee2e69a17f12406f4b3ea3553b14642cd636d6
diff --git a/cmds/statsd/src/metrics/DurationMetricProducer.cpp b/cmds/statsd/src/metrics/DurationMetricProducer.cpp
index a0374c0..66e8c61 100644
--- a/cmds/statsd/src/metrics/DurationMetricProducer.cpp
+++ b/cmds/statsd/src/metrics/DurationMetricProducer.cpp
@@ -93,7 +93,7 @@
         mConditionSliced = true;
     }
 
-    startNewProtoOutputStream(mStartTimeNs);
+    startNewProtoOutputStreamLocked(mStartTimeNs);
 
     VLOG("metric %s created. bucket size %lld start_time: %lld", metric.name().c_str(),
          (long long)mBucketSizeNs, (long long)mStartTimeNs);
@@ -103,7 +103,7 @@
     VLOG("~DurationMetric() called");
 }
 
-void DurationMetricProducer::startNewProtoOutputStream(long long startTime) {
+void DurationMetricProducer::startNewProtoOutputStreamLocked(long long startTime) {
     mProto = std::make_unique<ProtoOutputStream>();
     mProto->write(FIELD_TYPE_STRING | FIELD_ID_NAME, mMetric.name());
     mProto->write(FIELD_TYPE_INT64 | FIELD_ID_START_REPORT_NANOS, startTime);
@@ -111,7 +111,7 @@
 }
 
 unique_ptr<DurationTracker> DurationMetricProducer::createDurationTracker(
-        const HashableDimensionKey& eventKey, vector<DurationBucket>& bucket) {
+        const HashableDimensionKey& eventKey, vector<DurationBucket>& bucket) const {
     switch (mMetric.aggregation_type()) {
         case DurationMetric_AggregationType_SUM:
             return make_unique<OringDurationTracker>(
@@ -129,19 +129,20 @@
     // DropboxWriter.
 }
 
-void DurationMetricProducer::onSlicedConditionMayChange(const uint64_t eventTime) {
+void DurationMetricProducer::onSlicedConditionMayChangeLocked(const uint64_t eventTime) {
     VLOG("Metric %s onSlicedConditionMayChange", mMetric.name().c_str());
-    flushIfNeeded(eventTime);
+    flushIfNeededLocked(eventTime);
     // Now for each of the on-going event, check if the condition has changed for them.
     for (auto& pair : mCurrentSlicedDuration) {
         pair.second->onSlicedConditionMayChange(eventTime);
     }
 }
 
-void DurationMetricProducer::onConditionChanged(const bool conditionMet, const uint64_t eventTime) {
+void DurationMetricProducer::onConditionChangedLocked(const bool conditionMet,
+                                                      const uint64_t eventTime) {
     VLOG("Metric %s onConditionChanged", mMetric.name().c_str());
     mCondition = conditionMet;
-    flushIfNeeded(eventTime);
+    flushIfNeededLocked(eventTime);
     // TODO: need to populate the condition change time from the event which triggers the condition
     // change, instead of using current time.
     for (auto& pair : mCurrentSlicedDuration) {
@@ -149,13 +150,13 @@
     }
 }
 
-std::unique_ptr<std::vector<uint8_t>> DurationMetricProducer::onDumpReport() {
+std::unique_ptr<std::vector<uint8_t>> DurationMetricProducer::onDumpReportLocked() {
     long long endTime = time(nullptr) * NS_PER_SEC;
 
     // Dump current bucket if it's stale.
     // If current bucket is still ongoing, don't force dump current bucket.
     // In finish(), we can force dump current bucket.
-    flushIfNeeded(endTime);
+    flushIfNeededLocked(endTime);
     VLOG("metric %s dump report now...", mMetric.name().c_str());
 
     for (const auto& pair : mPastBuckets) {
@@ -214,13 +215,13 @@
     mProto->end(mProtoToken);
     mProto->write(FIELD_TYPE_INT64 | FIELD_ID_END_REPORT_NANOS,
                   (long long)mCurrentBucketStartTimeNs);
-    std::unique_ptr<std::vector<uint8_t>> buffer = serializeProto();
-    startNewProtoOutputStream(endTime);
+    std::unique_ptr<std::vector<uint8_t>> buffer = serializeProtoLocked();
+    startNewProtoOutputStreamLocked(endTime);
     // TODO: Properly clear the old buckets.
     return buffer;
 }
 
-void DurationMetricProducer::flushIfNeeded(uint64_t eventTime) {
+void DurationMetricProducer::flushIfNeededLocked(const uint64_t& eventTime) {
     if (mCurrentBucketStartTimeNs + mBucketSizeNs > eventTime) {
         return;
     }
@@ -239,7 +240,7 @@
     mCurrentBucketNum += numBucketsForward;
 }
 
-bool DurationMetricProducer::hitGuardRail(const HashableDimensionKey& newKey) {
+bool DurationMetricProducer::hitGuardRailLocked(const HashableDimensionKey& newKey) {
     // the key is not new, we are good.
     if (mCurrentSlicedDuration.find(newKey) != mCurrentSlicedDuration.end()) {
         return false;
@@ -259,11 +260,11 @@
     return false;
 }
 
-void DurationMetricProducer::onMatchedLogEventInternal(
+void DurationMetricProducer::onMatchedLogEventInternalLocked(
         const size_t matcherIndex, const HashableDimensionKey& eventKey,
         const map<string, HashableDimensionKey>& conditionKeys, bool condition,
         const LogEvent& event, bool scheduledPull) {
-    flushIfNeeded(event.GetTimestampNs());
+    flushIfNeededLocked(event.GetTimestampNs());
 
     if (matcherIndex == mStopAllIndex) {
         for (auto& pair : mCurrentSlicedDuration) {
@@ -275,7 +276,7 @@
     HashableDimensionKey atomKey = getHashableKey(getDimensionKey(event, mInternalDimension));
 
     if (mCurrentSlicedDuration.find(eventKey) == mCurrentSlicedDuration.end()) {
-        if (hitGuardRail(eventKey)) {
+        if (hitGuardRailLocked(eventKey)) {
             return;
         }
         mCurrentSlicedDuration[eventKey] = createDurationTracker(eventKey, mPastBuckets[eventKey]);
@@ -290,7 +291,7 @@
     }
 }
 
-size_t DurationMetricProducer::byteSize() const {
+size_t DurationMetricProducer::byteSizeLocked() const {
     size_t totalSize = 0;
     for (const auto& pair : mPastBuckets) {
         totalSize += pair.second.size() * kBucketSize;