Only create ProtoOutputStream when onGetData() is called.
The exception is EventMetricProducer. Each EventMetricProducer will still have a ProtoOutputStream.
Because LogEvent comes as a fixed 4K, it's more memory efficient to have an 8k ProtoOutputStream for
storing the events.
Also removed the finish() API in MetricProducer, which was intended for use with Dropbox.
Test: statsd_test & dogfood app
Bug: 70393808
Change-Id: I2efe4ecc76a88060a9aa5eb49d1fa6ea60bc5da8
diff --git a/cmds/statsd/src/metrics/DurationMetricProducer.cpp b/cmds/statsd/src/metrics/DurationMetricProducer.cpp
index cedea30..9920f65 100644
--- a/cmds/statsd/src/metrics/DurationMetricProducer.cpp
+++ b/cmds/statsd/src/metrics/DurationMetricProducer.cpp
@@ -93,8 +93,6 @@
mConditionSliced = true;
}
- startNewProtoOutputStreamLocked(mStartTimeNs);
-
VLOG("metric %s created. bucket size %lld start_time: %lld", metric.name().c_str(),
(long long)mBucketSizeNs, (long long)mStartTimeNs);
}
@@ -114,13 +112,6 @@
return new AnomalyTracker(alert, mConfigKey);
}
-void DurationMetricProducer::startNewProtoOutputStreamLocked(long long startTime) {
- mProto = std::make_unique<ProtoOutputStream>();
- mProto->write(FIELD_TYPE_STRING | FIELD_ID_NAME, mMetric.name());
- mProto->write(FIELD_TYPE_INT64 | FIELD_ID_START_REPORT_NANOS, startTime);
- mProtoToken = mProto->start(FIELD_TYPE_MESSAGE | FIELD_ID_DURATION_METRICS);
-}
-
unique_ptr<DurationTracker> DurationMetricProducer::createDurationTracker(
const HashableDimensionKey& eventKey) const {
switch (mMetric.aggregation_type()) {
@@ -135,11 +126,6 @@
}
}
-void DurationMetricProducer::finish() {
- // TODO: write the StatsLogReport to dropbox using
- // DropboxWriter.
-}
-
void DurationMetricProducer::onSlicedConditionMayChangeLocked(const uint64_t eventTime) {
VLOG("Metric %s onSlicedConditionMayChange", mMetric.name().c_str());
flushIfNeededLocked(eventTime);
@@ -161,13 +147,14 @@
}
}
-std::unique_ptr<std::vector<uint8_t>> DurationMetricProducer::onDumpReportLocked() {
- long long endTime = time(nullptr) * NS_PER_SEC;
+void DurationMetricProducer::onDumpReportLocked(const uint64_t dumpTimeNs,
+ ProtoOutputStream* protoOutput) {
+ flushIfNeededLocked(dumpTimeNs);
- // Dump current bucket if it's stale.
- // If current bucket is still on-going, don't force dump current bucket.
- // In finish(), We can force dump current bucket.
- flushIfNeededLocked(endTime);
+ protoOutput->write(FIELD_TYPE_STRING | FIELD_ID_NAME, mMetric.name());
+ protoOutput->write(FIELD_TYPE_INT64 | FIELD_ID_START_REPORT_NANOS, (long long)mStartTimeNs);
+ long long protoToken = protoOutput->start(FIELD_TYPE_MESSAGE | FIELD_ID_DURATION_METRICS);
+
VLOG("metric %s dump report now...", mMetric.name().c_str());
for (const auto& pair : mPastBuckets) {
@@ -180,49 +167,46 @@
}
long long wrapperToken =
- mProto->start(FIELD_TYPE_MESSAGE | FIELD_COUNT_REPEATED | FIELD_ID_DATA);
+ protoOutput->start(FIELD_TYPE_MESSAGE | FIELD_COUNT_REPEATED | FIELD_ID_DATA);
// First fill dimension (KeyValuePairs).
for (const auto& kv : it->second) {
- long long dimensionToken =
- mProto->start(FIELD_TYPE_MESSAGE | FIELD_COUNT_REPEATED | FIELD_ID_DIMENSION);
- mProto->write(FIELD_TYPE_INT32 | FIELD_ID_KEY, kv.key());
+ long long dimensionToken = protoOutput->start(
+ FIELD_TYPE_MESSAGE | FIELD_COUNT_REPEATED | FIELD_ID_DIMENSION);
+ protoOutput->write(FIELD_TYPE_INT32 | FIELD_ID_KEY, kv.key());
if (kv.has_value_str()) {
- mProto->write(FIELD_TYPE_STRING | FIELD_ID_VALUE_STR, kv.value_str());
+ protoOutput->write(FIELD_TYPE_STRING | FIELD_ID_VALUE_STR, kv.value_str());
} else if (kv.has_value_int()) {
- mProto->write(FIELD_TYPE_INT64 | FIELD_ID_VALUE_INT, kv.value_int());
+ protoOutput->write(FIELD_TYPE_INT64 | FIELD_ID_VALUE_INT, kv.value_int());
} else if (kv.has_value_bool()) {
- mProto->write(FIELD_TYPE_BOOL | FIELD_ID_VALUE_BOOL, kv.value_bool());
+ protoOutput->write(FIELD_TYPE_BOOL | FIELD_ID_VALUE_BOOL, kv.value_bool());
} else if (kv.has_value_float()) {
- mProto->write(FIELD_TYPE_FLOAT | FIELD_ID_VALUE_FLOAT, kv.value_float());
+ protoOutput->write(FIELD_TYPE_FLOAT | FIELD_ID_VALUE_FLOAT, kv.value_float());
}
- mProto->end(dimensionToken);
+ protoOutput->end(dimensionToken);
}
// Then fill bucket_info (DurationBucketInfo).
for (const auto& bucket : pair.second) {
- long long bucketInfoToken =
- mProto->start(FIELD_TYPE_MESSAGE | FIELD_COUNT_REPEATED | FIELD_ID_BUCKET_INFO);
- mProto->write(FIELD_TYPE_INT64 | FIELD_ID_START_BUCKET_NANOS,
- (long long)bucket.mBucketStartNs);
- mProto->write(FIELD_TYPE_INT64 | FIELD_ID_END_BUCKET_NANOS,
- (long long)bucket.mBucketEndNs);
- mProto->write(FIELD_TYPE_INT64 | FIELD_ID_DURATION, (long long)bucket.mDuration);
- mProto->end(bucketInfoToken);
+ long long bucketInfoToken = protoOutput->start(
+ FIELD_TYPE_MESSAGE | FIELD_COUNT_REPEATED | FIELD_ID_BUCKET_INFO);
+ protoOutput->write(FIELD_TYPE_INT64 | FIELD_ID_START_BUCKET_NANOS,
+ (long long)bucket.mBucketStartNs);
+ protoOutput->write(FIELD_TYPE_INT64 | FIELD_ID_END_BUCKET_NANOS,
+ (long long)bucket.mBucketEndNs);
+ protoOutput->write(FIELD_TYPE_INT64 | FIELD_ID_DURATION, (long long)bucket.mDuration);
+ protoOutput->end(bucketInfoToken);
VLOG("\t bucket [%lld - %lld] duration: %lld", (long long)bucket.mBucketStartNs,
(long long)bucket.mBucketEndNs, (long long)bucket.mDuration);
}
- mProto->end(wrapperToken);
+ protoOutput->end(wrapperToken);
}
- mProto->end(mProtoToken);
- mProto->write(FIELD_TYPE_INT64 | FIELD_ID_END_REPORT_NANOS,
- (long long)mCurrentBucketStartTimeNs);
- std::unique_ptr<std::vector<uint8_t>> buffer = serializeProtoLocked();
- startNewProtoOutputStreamLocked(endTime);
+ protoOutput->end(protoToken);
+ protoOutput->write(FIELD_TYPE_INT64 | FIELD_ID_END_REPORT_NANOS, (long long)dumpTimeNs);
mPastBuckets.clear();
- return buffer;
+ mStartTimeNs = mCurrentBucketStartTimeNs;
}
void DurationMetricProducer::flushIfNeededLocked(const uint64_t& eventTime) {