Thread-safe metric producers.
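
Replace the AutoMutex in ValueMetricProducer with a shared_timed_mutex
(mRWMutex): paths that only read producer state (IsConditionMet, hitGuardRail,
byteSize) take a shared lock, while paths that mutate it
(startNewProtoOutputStream, SerializeBuckets, onConditionChanged,
onMatchedLogEventInternal_pull/_push, flushIfNeeded) take an exclusive lock.
onDumpReport and onMatchedLogEventInternal are split into smaller helpers so
each lock scope covers only the state it touches, and onConditionChanged drops
the lock before calling back into the locked helpers, since the mutex is not
recursive.

A minimal sketch of the locking pattern (illustrative only; it assumes
mRWMutex is declared as a mutable std::shared_timed_mutex member in
ValueMetricProducer.h, which this diff does not show):

    #include <mutex>
    #include <shared_mutex>

    class Example {
    public:
        long read() const {
            // Shared lock: concurrent readers are allowed.
            std::shared_lock<std::shared_timed_mutex> readLock(mRWMutex);
            return mValue;
        }
        void write(long v) {
            // Exclusive lock: a writer blocks readers and other writers.
            std::lock_guard<std::shared_timed_mutex> writeLock(mRWMutex);
            mValue = v;
        }
    private:
        mutable std::shared_timed_mutex mRWMutex;
        long mValue = 0;
    };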

Test: unit test passed
Change-Id: Ie47404e8649b63ee8ac32e40189a47f6cb7a9def
diff --git a/cmds/statsd/src/metrics/ValueMetricProducer.cpp b/cmds/statsd/src/metrics/ValueMetricProducer.cpp
index 66c8419..eed7841 100644
--- a/cmds/statsd/src/metrics/ValueMetricProducer.cpp
+++ b/cmds/statsd/src/metrics/ValueMetricProducer.cpp
@@ -121,6 +121,7 @@
 }
 
 void ValueMetricProducer::startNewProtoOutputStream(long long startTime) {
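+    // Exclusive lock: mProto is replaced and the report header is written below.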
+    std::lock_guard<std::shared_timed_mutex> writeLock(mRWMutex);
     mProto = std::make_unique<ProtoOutputStream>();
     mProto->write(FIELD_TYPE_STRING | FIELD_ID_NAME, mMetric.name());
     mProto->write(FIELD_TYPE_INT64 | FIELD_ID_START_REPORT_NANOS, startTime);
@@ -136,9 +137,8 @@
     VLOG("Metric %s onSlicedConditionMayChange", mMetric.name().c_str());
 }
 
-std::unique_ptr<std::vector<uint8_t>> ValueMetricProducer::onDumpReport() {
-    VLOG("metric %s dump report now...", mMetric.name().c_str());
-
+void ValueMetricProducer::SerializeBuckets() {
+    std::lock_guard<std::shared_timed_mutex> writeLock(mRWMutex);
     for (const auto& pair : mPastBuckets) {
         const HashableDimensionKey& hashableKey = pair.first;
         VLOG("  dimension key %s", hashableKey.c_str());
@@ -185,47 +185,63 @@
     mProto->end(mProtoToken);
     mProto->write(FIELD_TYPE_INT64 | FIELD_ID_END_REPORT_NANOS,
                   (long long)mCurrentBucketStartTimeNs);
+    mPastBuckets.clear();
+}
 
+std::unique_ptr<std::vector<uint8_t>> ValueMetricProducer::onDumpReport() {
     VLOG("metric %s dump report now...", mMetric.name().c_str());
+
+    SerializeBuckets();
     std::unique_ptr<std::vector<uint8_t>> buffer = serializeProto();
 
     startNewProtoOutputStream(time(nullptr) * NS_PER_SEC);
-    mPastBuckets.clear();
 
     return buffer;
-
     // TODO: Clear mDimensionKeyMap once the report is dumped.
 }
 
 void ValueMetricProducer::onConditionChanged(const bool condition, const uint64_t eventTime) {
-    AutoMutex _l(mLock);
-    mCondition = condition;
+    vector<shared_ptr<LogEvent>> allData;
 
-    if (mPullTagId != -1) {
+    // TODO(yanglu): move the following logic to a separate function to make it lockable.
+    {
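+        // Scoped so the write lock is released before the calls below, which
+        // acquire mRWMutex themselves (the mutex is not recursive).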
+        std::lock_guard<std::shared_timed_mutex> writeLock(mRWMutex);
+        mCondition = condition;
+
+        if (mPullTagId == -1) {
+            return;
+        }
+
         if (mCondition == true) {
             mStatsPullerManager->RegisterReceiver(mPullTagId, this,
                                                   mMetric.bucket().bucket_size_millis());
         } else if (mCondition == false) {
             mStatsPullerManager->UnRegisterReceiver(mPullTagId, this);
         }
-
-        vector<shared_ptr<LogEvent>> allData;
-        if (mStatsPullerManager->Pull(mPullTagId, &allData)) {
-            if (allData.size() == 0) {
-                return;
-            }
-            for (const auto& data : allData) {
-                onMatchedLogEvent(0, *data, false);
-            }
-            flushIfNeeded(eventTime);
+        if (!mStatsPullerManager->Pull(mPullTagId, &allData)) {
+            return;
         }
+    }
+
+    if (allData.size() == 0) {
         return;
     }
+
+    // onMatchedLogEventInternal acquires the write lock itself and is thread-safe.
+    for (const auto& data : allData) {
+        onMatchedLogEvent(0, *data, false);
+    }
+    // flushIfNeeded acquires the write lock itself and is thread-safe.
+    flushIfNeeded(eventTime);
+}
+
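+// Returns true if the condition is currently met or the metric has no condition.
+// Read-only, so a shared lock is sufficient.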
+bool ValueMetricProducer::IsConditionMet() const {
+    std::shared_lock<std::shared_timed_mutex> readLock(mRWMutex);
+    return mCondition == true || !mMetric.has_condition();
 }
 
 void ValueMetricProducer::onDataPulled(const std::vector<std::shared_ptr<LogEvent>>& allData) {
-    AutoMutex _l(mLock);
-    if (mCondition == true || !mMetric.has_condition()) {
+    if (IsConditionMet()) {
         if (allData.size() == 0) {
             return;
         }
@@ -242,6 +258,7 @@
 }
 
 bool ValueMetricProducer::hitGuardRail(const HashableDimensionKey& newKey) {
+    std::shared_lock<std::shared_timed_mutex> readLock(mRWMutex);
     // ===========GuardRail==============
     // 1. Report the tuple count if the tuple count > soft limit
     if (mCurrentSlicedBucket.find(newKey) != mCurrentSlicedBucket.end()) {
@@ -262,58 +279,75 @@
     return false;
 }
 
-void ValueMetricProducer::onMatchedLogEventInternal(
-        const size_t matcherIndex, const HashableDimensionKey& eventKey,
-        const map<string, HashableDimensionKey>& conditionKey, bool condition,
-        const LogEvent& event, bool scheduledPull) {
-    uint64_t eventTimeNs = event.GetTimestampNs();
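+// Records a pulled value for eventKey under the write lock. A scheduled pull
+// closes the interval in the current bucket and opens one in the next bucket.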
+void ValueMetricProducer::onMatchedLogEventInternal_pull(const uint64_t& eventTimeNs,
+                                                         const HashableDimensionKey& eventKey,
+                                                         const long& value, bool scheduledPull) {
+    std::lock_guard<std::shared_timed_mutex> writeLock(mRWMutex);
+
     if (eventTimeNs < mCurrentBucketStartTimeNs) {
         VLOG("Skip event due to late arrival: %lld vs %lld", (long long)eventTimeNs,
              (long long)mCurrentBucketStartTimeNs);
         return;
     }
-
-    if (hitGuardRail(eventKey)) {
-        return;
-    }
     Interval& interval = mCurrentSlicedBucket[eventKey];
-
-    long value = get_value(event);
-
-    if (mPullTagId != -1) {
-        if (scheduledPull) {
-            // scheduled pull always sets beginning of current bucket and end
-            // of next bucket
-            if (interval.raw.size() > 0) {
-                interval.raw.back().second = value;
-            } else {
-                interval.raw.push_back(make_pair(value, value));
-            }
-            Interval& nextInterval = mNextSlicedBucket[eventKey];
-            if (nextInterval.raw.size() == 0) {
-                nextInterval.raw.push_back(make_pair(value, 0));
-            } else {
-                nextInterval.raw.front().first = value;
-            }
+    if (scheduledPull) {
+        // A scheduled pull sets the end of the current bucket's interval and
+        // the start of the next bucket's interval.
+        if (interval.raw.size() > 0) {
+            interval.raw.back().second = value;
         } else {
-            if (mCondition == true) {
-                interval.raw.push_back(make_pair(value, 0));
-            } else {
-                if (interval.raw.size() != 0) {
-                    interval.raw.back().second = value;
-                } else {
-                    interval.tainted = true;
-                    VLOG("Data on condition true missing!");
-                }
-            }
+            interval.raw.push_back(make_pair(value, value));
+        }
+        Interval& nextInterval = mNextSlicedBucket[eventKey];
+        if (nextInterval.raw.size() == 0) {
+            nextInterval.raw.push_back(make_pair(value, 0));
+        } else {
+            nextInterval.raw.front().first = value;
         }
     } else {
-        flushIfNeeded(eventTimeNs);
-        interval.raw.push_back(make_pair(value, 0));
+        if (mCondition == true) {
+            interval.raw.push_back(make_pair(value, 0));
+        } else {
+            if (interval.raw.size() != 0) {
+                interval.raw.back().second = value;
+            } else {
+                interval.tainted = true;
+                VLOG("Data on condition true missing!");
+            }
+        }
     }
 }
 
-long ValueMetricProducer::get_value(const LogEvent& event) {
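+// Records a pushed value for eventKey under the write lock.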
+void ValueMetricProducer::onMatchedLogEventInternal_push(const uint64_t& eventTimeNs,
+                                                         const HashableDimensionKey& eventKey,
+                                                         const long& value) {
+    std::lock_guard<std::shared_timed_mutex> writeLock(mRWMutex);
+    if (eventTimeNs < mCurrentBucketStartTimeNs) {
+        VLOG("Skip event due to late arrival: %lld vs %lld", (long long)eventTimeNs,
+             (long long)mCurrentBucketStartTimeNs);
+        return;
+    }
+    mCurrentSlicedBucket[eventKey].raw.push_back(make_pair(value, 0));
+}
+
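+// Dispatches a matched event to the pull or push handler. No lock is held here;
+// hitGuardRail, flushIfNeeded and the handlers each acquire mRWMutex themselves.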
+void ValueMetricProducer::onMatchedLogEventInternal(
+        const size_t matcherIndex, const HashableDimensionKey& eventKey,
+        const map<string, HashableDimensionKey>& conditionKey, bool condition,
+        const LogEvent& event, bool scheduledPull) {
+    uint64_t eventTimeNs = event.GetTimestampNs();
+    long value = get_value(event);
+    if (hitGuardRail(eventKey)) {
+        return;
+    }
+    if (mPullTagId != -1) {
+        onMatchedLogEventInternal_pull(eventTimeNs, eventKey, value, scheduledPull);
+    } else {
+        flushIfNeeded(eventTimeNs);
+        onMatchedLogEventInternal_push(eventTimeNs, eventKey, value);
+    }
+}
+
+long ValueMetricProducer::get_value(const LogEvent& event) const {
     status_t err = NO_ERROR;
     long val = event.GetLong(mMetric.value_field(), &err);
     if (err == NO_ERROR) {
@@ -325,6 +359,7 @@
 }
 
 void ValueMetricProducer::flushIfNeeded(const uint64_t eventTimeNs) {
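+    // Exclusive lock: rolls mCurrentSlicedBucket into mPastBuckets when the current bucket ends.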
+    std::lock_guard<std::shared_timed_mutex> writeLock(mRWMutex);
     if (mCurrentBucketStartTimeNs + mBucketSizeNs > eventTimeNs) {
         VLOG("eventTime is %lld, less than next bucket start time %lld", (long long)eventTimeNs,
              (long long)(mCurrentBucketStartTimeNs + mBucketSizeNs));
@@ -373,6 +408,7 @@
 }
 
 size_t ValueMetricProducer::byteSize() const {
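+    // Shared lock: only reads mPastBuckets.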
+    std::shared_lock<std::shared_timed_mutex> readLock(mRWMutex);
     size_t totalSize = 0;
     for (const auto& pair : mPastBuckets) {
         totalSize += pair.second.size() * kBucketSize;