1/ Only expose thread-safe interfaces in metric producer.
2/ Simplify lock logic.
3/ Add test for duration metric producer.
Test: all unit tests passed.
Change-Id: If6ee2e69a17f12406f4b3ea3553b14642cd636d6
diff --git a/cmds/statsd/src/metrics/MetricProducer.h b/cmds/statsd/src/metrics/MetricProducer.h
index b22ff6f..5df712c 100644
--- a/cmds/statsd/src/metrics/MetricProducer.h
+++ b/cmds/statsd/src/metrics/MetricProducer.h
@@ -17,6 +17,8 @@
#ifndef METRIC_PRODUCER_H
#define METRIC_PRODUCER_H
+#include <shared_mutex>
+
#include "anomaly/AnomalyTracker.h"
#include "condition/ConditionWizard.h"
#include "config/ConfigKey.h"
@@ -52,39 +54,61 @@
virtual ~MetricProducer(){};
// Consume the parsed stats log entry that already matched the "what" of the metric.
- void onMatchedLogEvent(const size_t matcherIndex, const LogEvent& event, bool scheduledPull);
+ void onMatchedLogEvent(const size_t matcherIndex, const LogEvent& event, bool scheduledPull) {
+ std::lock_guard<std::mutex> lock(mMutex);
+ onMatchedLogEventLocked(matcherIndex, event, scheduledPull);
+ }
- virtual void onConditionChanged(const bool condition, const uint64_t eventTime) = 0;
+ void onConditionChanged(const bool condition, const uint64_t eventTime) {
+ std::lock_guard<std::mutex> lock(mMutex);
+ onConditionChangedLocked(condition, eventTime);
+ }
- virtual void onSlicedConditionMayChange(const uint64_t eventTime) = 0;
+ void onSlicedConditionMayChange(const uint64_t eventTime) {
+ std::lock_guard<std::mutex> lock(mMutex);
+ onSlicedConditionMayChangeLocked(eventTime);
+ }
+
+ bool isConditionSliced() const {
+ std::lock_guard<std::mutex> lock(mMutex);
+ return mConditionSliced;
+ };
// This is called when the metric collecting is done, e.g., when there is a new configuration
// coming. MetricProducer should do the clean up, and dump existing data to dropbox.
virtual void finish() = 0;
- virtual void flushIfNeeded(const uint64_t newEventTime) = 0;
// TODO: Pass a timestamp as a parameter in onDumpReport and update all its
// implementations.
// onDumpReport returns the proto-serialized output and clears the previously stored contents.
- virtual std::unique_ptr<std::vector<uint8_t>> onDumpReport() = 0;
-
- virtual bool isConditionSliced() const {
- return mConditionSliced;
- };
+ std::unique_ptr<std::vector<uint8_t>> onDumpReport() {
+ std::lock_guard<std::mutex> lock(mMutex);
+ return onDumpReportLocked();
+ }
// Returns the memory in bytes currently used to store this metric's data. Does not change
// state.
- virtual size_t byteSize() const = 0;
+ size_t byteSize() const {
+ std::lock_guard<std::mutex> lock(mMutex);
+ return byteSizeLocked();
+ }
void addAnomalyTracker(sp<AnomalyTracker> tracker) {
+ std::lock_guard<std::mutex> lock(mMutex);
mAnomalyTrackers.push_back(tracker);
}
int64_t getBuckeSizeInNs() const {
+ std::lock_guard<std::mutex> lock(mMutex);
return mBucketSizeNs;
}
protected:
+ virtual void onConditionChangedLocked(const bool condition, const uint64_t eventTime) = 0;
+ virtual void onSlicedConditionMayChangeLocked(const uint64_t eventTime) = 0;
+ virtual std::unique_ptr<std::vector<uint8_t>> onDumpReportLocked() = 0;
+ virtual size_t byteSizeLocked() const = 0;
+
const ConfigKey mConfigKey;
const uint64_t mStartTimeNs;
@@ -128,18 +152,24 @@
* nonSlicedCondition.
* [event]: the log event, just in case the metric needs its data, e.g., EventMetric.
*/
- virtual void onMatchedLogEventInternal(
+ virtual void onMatchedLogEventInternalLocked(
const size_t matcherIndex, const HashableDimensionKey& eventKey,
const std::map<std::string, HashableDimensionKey>& conditionKey, bool condition,
const LogEvent& event, bool scheduledPull) = 0;
+ // Consume the parsed stats log entry that already matched the "what" of the metric.
+ void onMatchedLogEventLocked(const size_t matcherIndex, const LogEvent& event,
+ bool scheduledPull);
+
std::unique_ptr<android::util::ProtoOutputStream> mProto;
long long mProtoToken;
- virtual void startNewProtoOutputStream(long long timestamp) = 0;
+ // Mutex to make the producer thread-safe (currently a plain std::mutex, not a read/write lock).
+ // TODO(yanglu): replace with std::shared_mutex when available in libc++.
+ mutable std::mutex mMutex;
- std::unique_ptr<std::vector<uint8_t>> serializeProto();
+ std::unique_ptr<std::vector<uint8_t>> serializeProtoLocked();
};
} // namespace statsd