Revert "Thread-safe metric producers."
This reverts commit 8de6939c494da838f6dbbda0631f66425dbbd25b.
Change-Id: Ieae841bfc5339b569f0fca909a6066de72806617
diff --git a/cmds/statsd/src/metrics/DurationMetricProducer.cpp b/cmds/statsd/src/metrics/DurationMetricProducer.cpp
index 3b49b9a..a0374c0 100644
--- a/cmds/statsd/src/metrics/DurationMetricProducer.cpp
+++ b/cmds/statsd/src/metrics/DurationMetricProducer.cpp
@@ -104,7 +104,6 @@
}
void DurationMetricProducer::startNewProtoOutputStream(long long startTime) {
- std::lock_guard<std::shared_timed_mutex> writeLock(mRWMutex);
mProto = std::make_unique<ProtoOutputStream>();
mProto->write(FIELD_TYPE_STRING | FIELD_ID_NAME, mMetric.name());
mProto->write(FIELD_TYPE_INT64 | FIELD_ID_START_REPORT_NANOS, startTime);
@@ -112,7 +111,7 @@
}
unique_ptr<DurationTracker> DurationMetricProducer::createDurationTracker(
- const HashableDimensionKey& eventKey, vector<DurationBucket>& bucket) const {
+ const HashableDimensionKey& eventKey, vector<DurationBucket>& bucket) {
switch (mMetric.aggregation_type()) {
case DurationMetric_AggregationType_SUM:
return make_unique<OringDurationTracker>(
@@ -131,7 +130,6 @@
}
void DurationMetricProducer::onSlicedConditionMayChange(const uint64_t eventTime) {
- std::lock_guard<std::shared_timed_mutex> writeLock(mRWMutex);
VLOG("Metric %s onSlicedConditionMayChange", mMetric.name().c_str());
flushIfNeeded(eventTime);
// Now for each of the on-going event, check if the condition has changed for them.
@@ -141,7 +139,6 @@
}
void DurationMetricProducer::onConditionChanged(const bool conditionMet, const uint64_t eventTime) {
- std::lock_guard<std::shared_timed_mutex> writeLock(mRWMutex);
VLOG("Metric %s onConditionChanged", mMetric.name().c_str());
mCondition = conditionMet;
flushIfNeeded(eventTime);
@@ -152,8 +149,15 @@
}
}
-void DurationMetricProducer::SerializeBuckets() {
- std::lock_guard<std::shared_timed_mutex> writeLock(mRWMutex);
+std::unique_ptr<std::vector<uint8_t>> DurationMetricProducer::onDumpReport() {
+ long long endTime = time(nullptr) * NS_PER_SEC;
+
+ // Dump current bucket if it's stale.
+ // If current bucket is still on-going, don't force dump current bucket.
+ // In finish(), We can force dump current bucket.
+ flushIfNeeded(endTime);
+ VLOG("metric %s dump report now...", mMetric.name().c_str());
+
for (const auto& pair : mPastBuckets) {
const HashableDimensionKey& hashableKey = pair.first;
VLOG(" dimension key %s", hashableKey.c_str());
@@ -210,29 +214,13 @@
mProto->end(mProtoToken);
mProto->write(FIELD_TYPE_INT64 | FIELD_ID_END_REPORT_NANOS,
(long long)mCurrentBucketStartTimeNs);
-}
-
-std::unique_ptr<std::vector<uint8_t>> DurationMetricProducer::onDumpReport() {
- VLOG("metric %s dump report now...", mMetric.name().c_str());
-
- long long endTime = time(nullptr) * NS_PER_SEC;
-
- // Dump current bucket if it's stale.
- // If current bucket is still on-going, don't force dump current bucket.
- // In finish(), We can force dump current bucket.
- flushIfNeeded(endTime);
-
- SerializeBuckets();
-
std::unique_ptr<std::vector<uint8_t>> buffer = serializeProto();
-
startNewProtoOutputStream(endTime);
// TODO: Properly clear the old buckets.
return buffer;
}
void DurationMetricProducer::flushIfNeeded(uint64_t eventTime) {
- std::lock_guard<std::shared_timed_mutex> writeLock(mRWMutex);
if (mCurrentBucketStartTimeNs + mBucketSizeNs > eventTime) {
return;
}
@@ -252,7 +240,6 @@
}
bool DurationMetricProducer::hitGuardRail(const HashableDimensionKey& newKey) {
- std::shared_lock<std::shared_timed_mutex> readLock(mRWMutex);
// the key is not new, we are good.
if (mCurrentSlicedDuration.find(newKey) != mCurrentSlicedDuration.end()) {
return false;
@@ -278,37 +265,32 @@
const LogEvent& event, bool scheduledPull) {
flushIfNeeded(event.GetTimestampNs());
- // TODO(yanglu): move the following logic to a seperate function to make it lockable.
- {
- std::lock_guard<std::shared_timed_mutex> writeLock(mRWMutex);
- if (matcherIndex == mStopAllIndex) {
- for (auto& pair : mCurrentSlicedDuration) {
- pair.second->noteStopAll(event.GetTimestampNs());
- }
+ if (matcherIndex == mStopAllIndex) {
+ for (auto& pair : mCurrentSlicedDuration) {
+ pair.second->noteStopAll(event.GetTimestampNs());
+ }
+ return;
+ }
+
+ HashableDimensionKey atomKey = getHashableKey(getDimensionKey(event, mInternalDimension));
+
+ if (mCurrentSlicedDuration.find(eventKey) == mCurrentSlicedDuration.end()) {
+ if (hitGuardRail(eventKey)) {
return;
}
+ mCurrentSlicedDuration[eventKey] = createDurationTracker(eventKey, mPastBuckets[eventKey]);
+ }
- HashableDimensionKey atomKey = getHashableKey(getDimensionKey(event, mInternalDimension));
+ auto it = mCurrentSlicedDuration.find(eventKey);
- if (mCurrentSlicedDuration.find(eventKey) == mCurrentSlicedDuration.end()) {
- if (hitGuardRail(eventKey)) {
- return;
- }
- mCurrentSlicedDuration[eventKey] =
- createDurationTracker(eventKey, mPastBuckets[eventKey]);
- }
- auto it = mCurrentSlicedDuration.find(eventKey);
-
- if (matcherIndex == mStartIndex) {
- it->second->noteStart(atomKey, condition, event.GetTimestampNs(), conditionKeys);
- } else if (matcherIndex == mStopIndex) {
- it->second->noteStop(atomKey, event.GetTimestampNs(), false);
- }
+ if (matcherIndex == mStartIndex) {
+ it->second->noteStart(atomKey, condition, event.GetTimestampNs(), conditionKeys);
+ } else if (matcherIndex == mStopIndex) {
+ it->second->noteStop(atomKey, event.GetTimestampNs(), false);
}
}
size_t DurationMetricProducer::byteSize() const {
- std::shared_lock<std::shared_timed_mutex> readLock(mRWMutex);
size_t totalSize = 0;
for (const auto& pair : mPastBuckets) {
totalSize += pair.second.size() * kBucketSize;