1. Enable FIELD_TYPE IDs defined in ProtoOutputStream
2. Migrate CountMetricProducer to use ProtoOutputStream
Test: statsd, statsd_test
Change-Id: I33a1ea77a49b045818a48923b2263cb594ab0013
diff --git a/cmds/statsd/src/metrics/CountMetricProducer.cpp b/cmds/statsd/src/metrics/CountMetricProducer.cpp
index 9f8558d..71cb771 100644
--- a/cmds/statsd/src/metrics/CountMetricProducer.cpp
+++ b/cmds/statsd/src/metrics/CountMetricProducer.cpp
@@ -24,6 +24,8 @@
#include <limits.h>
#include <stdlib.h>
+using namespace android::util;
+using android::util::ProtoOutputStream;
using std::map;
using std::string;
using std::unordered_map;
@@ -33,6 +35,27 @@
namespace os {
namespace statsd {
+// for StatsLogReport
+const int FIELD_ID_METRIC_ID = 1;
+const int FIELD_ID_START_REPORT_NANOS = 2;
+const int FIELD_ID_END_REPORT_NANOS = 3;
+const int FIELD_ID_COUNT_METRICS = 5;
+// for CountMetricDataWrapper
+const int FIELD_ID_DATA = 1;
+// for CountMetricData
+const int FIELD_ID_DIMENSION = 1;
+const int FIELD_ID_BUCKET_INFO = 2;
+// for KeyValuePair
+const int FIELD_ID_KEY = 1;
+const int FIELD_ID_VALUE_STR = 2;
+const int FIELD_ID_VALUE_INT = 3;
+const int FIELD_ID_VALUE_BOOL = 4;
+const int FIELD_ID_VALUE_FLOAT = 5;
+// for CountBucketInfo
+const int FIELD_ID_START_BUCKET_NANOS = 1;
+const int FIELD_ID_END_BUCKET_NANOS = 2;
+const int FIELD_ID_COUNT = 3;
+
// TODO: add back AnomalyTracker.
CountMetricProducer::CountMetricProducer(const CountMetric& metric, const int conditionIndex,
const sp<ConditionWizard>& wizard)
@@ -66,6 +89,8 @@
mConditionSliced = true;
}
+ startNewProtoOutputStream(mStartTimeNs);
+
VLOG("metric %lld created. bucket size %lld start_time: %lld", metric.metric_id(),
(long long)mBucketSizeNs, (long long)mStartTimeNs);
}
@@ -74,23 +99,14 @@
VLOG("~CountMetricProducer() called");
}
-void CountMetricProducer::finish() {
- // TODO: write the StatsLogReport to dropbox using
- // DropboxWriter.
+void CountMetricProducer::startNewProtoOutputStream(long long startTime) {
+ mProto = std::make_unique<ProtoOutputStream>();
+ mProto->write(FIELD_TYPE_INT32 | FIELD_ID_METRIC_ID, mMetric.metric_id());
+ mProto->write(FIELD_TYPE_INT64 | FIELD_ID_START_REPORT_NANOS, startTime);
+ mProtoToken = mProto->start(FIELD_TYPE_MESSAGE | FIELD_ID_COUNT_METRICS);
}
-static void addSlicedCounterToReport(StatsLogReport_CountMetricDataWrapper& wrapper,
- const vector<KeyValuePair>& key,
- const vector<CountBucketInfo>& buckets) {
- CountMetricData* data = wrapper.add_data();
- for (const auto& kv : key) {
- data->add_dimension()->CopyFrom(kv);
- }
- for (const auto& bucket : buckets) {
- data->add_bucket_info()->CopyFrom(bucket);
- VLOG("\t bucket [%lld - %lld] count: %lld", bucket.start_bucket_nanos(),
- bucket.end_bucket_nanos(), bucket.count());
- }
+void CountMetricProducer::finish() {
}
void CountMetricProducer::onSlicedConditionMayChange(const uint64_t eventTime) {
@@ -98,33 +114,81 @@
}
StatsLogReport CountMetricProducer::onDumpReport() {
- VLOG("metric %lld dump report now...", mMetric.metric_id());
-
- StatsLogReport report;
- report.set_metric_id(mMetric.metric_id());
- report.set_start_report_nanos(mStartTimeNs);
+ long long endTime = time(nullptr) * NANO_SECONDS_IN_A_SECOND;
// Dump current bucket if it's stale.
// If current bucket is still on-going, don't force dump current bucket.
// In finish(), We can force dump current bucket.
- flushCounterIfNeeded(time(nullptr) * NANO_SECONDS_IN_A_SECOND);
- report.set_end_report_nanos(mCurrentBucketStartTimeNs);
+ flushCounterIfNeeded(endTime);
- StatsLogReport_CountMetricDataWrapper* wrapper = report.mutable_count_metrics();
-
- for (const auto& pair : mPastBuckets) {
- const HashableDimensionKey& hashableKey = pair.first;
+ for (const auto& counter : mPastBucketProtos) {
+ const HashableDimensionKey& hashableKey = counter.first;
auto it = mDimensionKeyMap.find(hashableKey);
if (it == mDimensionKeyMap.end()) {
ALOGE("Dimension key %s not found?!?! skip...", hashableKey.c_str());
continue;
}
+ long long wrapperToken = mProto->start(FIELD_TYPE_MESSAGE | FIELD_ID_DATA);
- VLOG(" dimension key %s", hashableKey.c_str());
- addSlicedCounterToReport(*wrapper, it->second, pair.second);
+        // First fill dimension (KeyValuePairs): the key plus the first value variant present.
+        for (const auto& kv : it->second) {
+            long long dimensionToken = mProto->start(FIELD_TYPE_MESSAGE | FIELD_ID_DIMENSION);
+            mProto->write(FIELD_TYPE_INT32 | FIELD_ID_KEY, kv.key());
+            if (kv.has_value_str()) {
+                mProto->write(FIELD_TYPE_STRING | FIELD_ID_VALUE_STR, kv.value_str());
+            } else if (kv.has_value_int()) {
+                mProto->write(FIELD_TYPE_INT64 | FIELD_ID_VALUE_INT, kv.value_int());
+            } else if (kv.has_value_bool()) {
+                mProto->write(FIELD_TYPE_BOOL | FIELD_ID_VALUE_BOOL, kv.value_bool());
+            } else if (kv.has_value_float()) {
+                mProto->write(FIELD_TYPE_FLOAT | FIELD_ID_VALUE_FLOAT, kv.value_float());
+            }
+            mProto->end(dimensionToken);
+        }
+
+        // Then fill bucket_info (CountBucketInfo) from each pre-serialized bucket stream.
+        for (const auto& proto : counter.second) {
+            size_t bufferSize = proto->size();
+            std::unique_ptr<char[]> buffer(new char[bufferSize]);  // released each iteration
+            size_t pos = 0;
+            auto reader = proto->data();  // distinct name: `it` is the dimension-map iterator
+            while (reader.readBuffer() != NULL) {
+                size_t toRead = reader.currentToRead();
+                std::memcpy(&buffer[pos], reader.readBuffer(), toRead);
+                pos += toRead;
+                reader.rp()->move(toRead);
+            }
+            mProto->write(FIELD_TYPE_MESSAGE | FIELD_ID_BUCKET_INFO, buffer.get(), bufferSize);
+        }
+
+ mProto->end(wrapperToken);
}
- return report;
- // TODO: Clear mPastBuckets, mDimensionKeyMap once the report is dumped.
+
+ mProto->end(mProtoToken);
+ mProto->write(FIELD_TYPE_INT64 | FIELD_ID_END_REPORT_NANOS,
+ (long long)mCurrentBucketStartTimeNs);
+
+ size_t bufferSize = mProto->size();
+ VLOG("metric %lld dump report now...", mMetric.metric_id());
+ std::unique_ptr<uint8_t[]> buffer(new uint8_t[bufferSize]);
+ size_t pos = 0;
+ auto it = mProto->data();
+ while (it.readBuffer() != NULL) {
+ size_t toRead = it.currentToRead();
+ std::memcpy(&buffer[pos], it.readBuffer(), toRead);
+ pos += toRead;
+ it.rp()->move(toRead);
+ }
+
+ startNewProtoOutputStream(endTime);
+ mPastBucketProtos.clear();
+ mByteSize = 0;
+
+ // TODO: Once we migrate all MetricProducers to use ProtoOutputStream, we should return this:
+ // return std::move(buffer);
+ return StatsLogReport();
+
+ // TODO: Clear mDimensionKeyMap once the report is dumped.
}
void CountMetricProducer::onConditionChanged(const bool conditionMet, const uint64_t eventTime) {
@@ -175,15 +239,17 @@
// adjust the bucket start time
int64_t numBucketsForward = (eventTimeNs - mCurrentBucketStartTimeNs) / mBucketSizeNs;
- CountBucketInfo info;
- info.set_start_bucket_nanos(mCurrentBucketStartTimeNs);
- info.set_end_bucket_nanos(mCurrentBucketStartTimeNs + mBucketSizeNs);
-
for (const auto& counter : mCurrentSlicedCounter) {
- info.set_count(counter.second);
- // it will auto create new vector of CountbucketInfo if the key is not found.
- auto& bucketList = mPastBuckets[counter.first];
- bucketList.push_back(info);
+        unique_ptr<ProtoOutputStream> proto = make_unique<ProtoOutputStream>();
+        proto->write(FIELD_TYPE_INT64 | FIELD_ID_START_BUCKET_NANOS,
+                     (long long)mCurrentBucketStartTimeNs);
+        proto->write(FIELD_TYPE_INT64 | FIELD_ID_END_BUCKET_NANOS,
+                     (long long)mCurrentBucketStartTimeNs + mBucketSizeNs);
+        proto->write(FIELD_TYPE_INT64 | FIELD_ID_COUNT, (long long)counter.second);
+
+        auto& bucketList = mPastBucketProtos[counter.first];
+        mByteSize += proto->size();  // Must precede the move: proto is null afterwards.
+        bucketList.push_back(std::move(proto));
VLOG("metric %lld, dump key value: %s -> %d", mMetric.metric_id(), counter.first.c_str(),
counter.second);
@@ -202,11 +268,11 @@
(long long)mCurrentBucketStartTimeNs);
}
+// Rough estimate of the size of the data buffered by CountMetricProducer. This number
+// will be greater than the actual data size because each dimension of
+// CountMetricData is duplicated.
size_t CountMetricProducer::byteSize() {
-// TODO: return actual proto size when ProtoOutputStream is ready for use for
-// CountMetricsProducer.
-// return mProto->size();
- return 0;
+ return mByteSize;
}
} // namespace statsd