1/ Only expose thread-safe interfaces in metric producer.
2/ Simplify lock logic.
3/ Add test for duration metric producer.

Test: all unit tests passed.
Change-Id: If6ee2e69a17f12406f4b3ea3553b14642cd636d6
diff --git a/cmds/statsd/src/metrics/MetricProducer.h b/cmds/statsd/src/metrics/MetricProducer.h
index b22ff6f..5df712c 100644
--- a/cmds/statsd/src/metrics/MetricProducer.h
+++ b/cmds/statsd/src/metrics/MetricProducer.h
@@ -17,6 +17,8 @@
 #ifndef METRIC_PRODUCER_H
 #define METRIC_PRODUCER_H
 
+#include <shared_mutex>
+
 #include "anomaly/AnomalyTracker.h"
 #include "condition/ConditionWizard.h"
 #include "config/ConfigKey.h"
@@ -52,39 +54,61 @@
     virtual ~MetricProducer(){};
 
     // Consume the parsed stats log entry that already matched the "what" of the metric.
-    void onMatchedLogEvent(const size_t matcherIndex, const LogEvent& event, bool scheduledPull);
+    void onMatchedLogEvent(const size_t matcherIndex, const LogEvent& event, bool scheduledPull) {
+        std::lock_guard<std::mutex> lock(mMutex);
+        onMatchedLogEventLocked(matcherIndex, event, scheduledPull);
+    }
 
-    virtual void onConditionChanged(const bool condition, const uint64_t eventTime) = 0;
+    void onConditionChanged(const bool condition, const uint64_t eventTime) {
+        std::lock_guard<std::mutex> lock(mMutex);
+        onConditionChangedLocked(condition, eventTime);
+    }
 
-    virtual void onSlicedConditionMayChange(const uint64_t eventTime) = 0;
+    void onSlicedConditionMayChange(const uint64_t eventTime) {
+        std::lock_guard<std::mutex> lock(mMutex);
+        onSlicedConditionMayChangeLocked(eventTime);
+    }
+
+    bool isConditionSliced() const {
+        std::lock_guard<std::mutex> lock(mMutex);
+        return mConditionSliced;
+    };
 
     // This is called when the metric collecting is done, e.g., when there is a new configuration
     // coming. MetricProducer should do the clean up, and dump existing data to dropbox.
     virtual void finish() = 0;
-    virtual void flushIfNeeded(const uint64_t newEventTime) = 0;
 
     // TODO: Pass a timestamp as a parameter in onDumpReport and update all its
     // implementations.
     // onDumpReport returns the proto-serialized output and clears the previously stored contents.
-    virtual std::unique_ptr<std::vector<uint8_t>> onDumpReport() = 0;
-
-    virtual bool isConditionSliced() const {
-        return mConditionSliced;
-    };
+    std::unique_ptr<std::vector<uint8_t>> onDumpReport() {
+        std::lock_guard<std::mutex> lock(mMutex);
+        return onDumpReportLocked();
+    }
 
     // Returns the memory in bytes currently used to store this metric's data. Does not change
     // state.
-    virtual size_t byteSize() const = 0;
+    size_t byteSize() const {
+        std::lock_guard<std::mutex> lock(mMutex);
+        return byteSizeLocked();
+    }
 
     void addAnomalyTracker(sp<AnomalyTracker> tracker) {
+        std::lock_guard<std::mutex> lock(mMutex);
         mAnomalyTrackers.push_back(tracker);
     }
 
     int64_t getBuckeSizeInNs() const {
+        std::lock_guard<std::mutex> lock(mMutex);
         return mBucketSizeNs;
     }
 
 protected:
+    virtual void onConditionChangedLocked(const bool condition, const uint64_t eventTime) = 0;
+    virtual void onSlicedConditionMayChangeLocked(const uint64_t eventTime) = 0;
+    virtual std::unique_ptr<std::vector<uint8_t>> onDumpReportLocked() = 0;
+    virtual size_t byteSizeLocked() const = 0;
+
     const ConfigKey mConfigKey;
 
     const uint64_t mStartTimeNs;
@@ -128,18 +152,24 @@
      *              nonSlicedCondition.
      * [event]: the log event, just in case the metric needs its data, e.g., EventMetric.
      */
-    virtual void onMatchedLogEventInternal(
+    virtual void onMatchedLogEventInternalLocked(
             const size_t matcherIndex, const HashableDimensionKey& eventKey,
             const std::map<std::string, HashableDimensionKey>& conditionKey, bool condition,
             const LogEvent& event, bool scheduledPull) = 0;
 
+    // Consume the parsed stats log entry that already matched the "what" of the metric.
+    void onMatchedLogEventLocked(const size_t matcherIndex, const LogEvent& event,
+                                 bool scheduledPull);
+
     std::unique_ptr<android::util::ProtoOutputStream> mProto;
 
     long long mProtoToken;
 
-    virtual void startNewProtoOutputStream(long long timestamp) = 0;
+    // Mutex that makes the producer thread-safe; exclusive for now, see the TODO below.
+    // TODO(yanglu): replace with std::shared_mutex when available in libc++.
+    mutable std::mutex mMutex;
 
-    std::unique_ptr<std::vector<uint8_t>> serializeProto();
+    std::unique_ptr<std::vector<uint8_t>> serializeProtoLocked();
 };
 
 }  // namespace statsd