Migrate all remaining MetricProducers to use ProtoOutputStream

Test: statsd, statsd_test
Change-Id: I1087e1c1ffb372ca288dfc575cb7a372b11ce8c5
diff --git a/cmds/statsd/src/metrics/ValueMetricProducer.cpp b/cmds/statsd/src/metrics/ValueMetricProducer.cpp
index 07a078f..41b7a5d 100644
--- a/cmds/statsd/src/metrics/ValueMetricProducer.cpp
+++ b/cmds/statsd/src/metrics/ValueMetricProducer.cpp
@@ -23,6 +23,12 @@
 #include <limits.h>
 #include <stdlib.h>
 
+using android::util::FIELD_TYPE_BOOL;
+using android::util::FIELD_TYPE_FLOAT;
+using android::util::FIELD_TYPE_INT32;
+using android::util::FIELD_TYPE_INT64;
+using android::util::FIELD_TYPE_MESSAGE;
+using android::util::ProtoOutputStream;
 using std::list;
 using std::make_shared;
 using std::map;
@@ -34,6 +40,27 @@
 namespace os {
 namespace statsd {
 
+// Field ids used when writing the top-level StatsLogReport message.
+const int FIELD_ID_METRIC_ID = 1;
+const int FIELD_ID_START_REPORT_NANOS = 2;
+const int FIELD_ID_END_REPORT_NANOS = 3;
+const int FIELD_ID_VALUE_METRICS = 7;
+// Field id used inside ValueMetricDataWrapper (one entry per dimension key).
+const int FIELD_ID_DATA = 1;
+// Field ids used inside each ValueMetricData entry.
+const int FIELD_ID_DIMENSION = 1;
+const int FIELD_ID_BUCKET_INFO = 2;
+// Field ids used inside each KeyValuePair; one VALUE_* id per possible value type.
+const int FIELD_ID_KEY = 1;
+const int FIELD_ID_VALUE_STR = 2;
+const int FIELD_ID_VALUE_INT = 3;
+const int FIELD_ID_VALUE_BOOL = 4;
+const int FIELD_ID_VALUE_FLOAT = 5;
+// Field ids used inside each ValueBucketInfo entry.
+const int FIELD_ID_START_BUCKET_NANOS = 1;
+const int FIELD_ID_END_BUCKET_NANOS = 2;
+const int FIELD_ID_VALUE = 3;
+
 // ValueMetric has a minimum bucket size of 10min so that we don't pull too frequently
 ValueMetricProducer::ValueMetricProducer(const ValueMetric& metric, const int conditionIndex,
                                          const sp<ConditionWizard>& wizard, const int pullTagId,
@@ -55,6 +82,8 @@
                                              metric.bucket().bucket_size_millis());
     }
 
+    startNewProtoOutputStream(mStartTimeNs);
+
     VLOG("value metric %lld created. bucket size %lld start_time: %lld", metric.metric_id(),
          (long long)mBucketSizeNs, (long long)mStartTimeNs);
 }
@@ -63,25 +92,18 @@
     VLOG("~ValueMetricProducer() called");
 }
 
+void ValueMetricProducer::startNewProtoOutputStream(long long startTime) {
+    mProto = std::make_unique<ProtoOutputStream>();
+    mProto->write(FIELD_TYPE_INT32 | FIELD_ID_METRIC_ID, mMetric.metric_id());
+    mProto->write(FIELD_TYPE_INT64 | FIELD_ID_START_REPORT_NANOS, startTime);
+    mProtoToken = mProto->start(FIELD_TYPE_MESSAGE | FIELD_ID_VALUE_METRICS);
+}
+
 void ValueMetricProducer::finish() {
     // TODO: write the StatsLogReport to dropbox using
     // DropboxWriter.
 }
 
-static void addSlicedCounterToReport(StatsLogReport_ValueMetricDataWrapper& wrapper,
-                                     const vector<KeyValuePair>& key,
-                                     const vector<ValueBucketInfo>& buckets) {
-    ValueMetricData* data = wrapper.add_data();
-    for (const auto& kv : key) {
-        data->add_dimension()->CopyFrom(kv);
-    }
-    for (const auto& bucket : buckets) {
-        data->add_bucket_info()->CopyFrom(bucket);
-        VLOG("\t bucket [%lld - %lld] value: %lld", bucket.start_bucket_nanos(),
-             bucket.end_bucket_nanos(), bucket.value());
-    }
-}
-
 void ValueMetricProducer::onSlicedConditionMayChange(const uint64_t eventTime) {
     VLOG("Metric %lld onSlicedConditionMayChange", mMetric.metric_id());
 }
@@ -89,26 +111,62 @@
 StatsLogReport ValueMetricProducer::onDumpReport() {
     VLOG("metric %lld dump report now...", mMetric.metric_id());
 
-    StatsLogReport report;
-    report.set_metric_id(mMetric.metric_id());
-    report.set_start_report_nanos(mStartTimeNs);
-    report.set_end_report_nanos(mCurrentBucketStartTimeNs);
-
-    StatsLogReport_ValueMetricDataWrapper* wrapper = report.mutable_value_metrics();
-
     for (const auto& pair : mPastBuckets) {
         const HashableDimensionKey& hashableKey = pair.first;
+        VLOG("  dimension key %s", hashableKey.c_str());
         auto it = mDimensionKeyMap.find(hashableKey);
         if (it == mDimensionKeyMap.end()) {
             ALOGE("Dimension key %s not found?!?! skip...", hashableKey.c_str());
             continue;
         }
+        long long wrapperToken = mProto->start(FIELD_TYPE_MESSAGE | FIELD_ID_DATA);
 
-        VLOG("  dimension key %s", hashableKey.c_str());
-        addSlicedCounterToReport(*wrapper, it->second, pair.second);
+        // Emit one KeyValuePair per dimension entry; only the value field that is set is written.
+        for (const auto& kv : it->second) {
+            long long dimensionToken = mProto->start(FIELD_TYPE_MESSAGE | FIELD_ID_DIMENSION);
+            mProto->write(FIELD_TYPE_INT32 | FIELD_ID_KEY, kv.key());
+            if (kv.has_value_str()) {
+                mProto->write(android::util::FIELD_TYPE_STRING | FIELD_ID_VALUE_STR, kv.value_str());
+            } else if (kv.has_value_int()) {
+                mProto->write(FIELD_TYPE_INT64 | FIELD_ID_VALUE_INT, kv.value_int());
+            } else if (kv.has_value_bool()) {
+                mProto->write(FIELD_TYPE_BOOL | FIELD_ID_VALUE_BOOL, kv.value_bool());
+            } else if (kv.has_value_float()) {
+                mProto->write(FIELD_TYPE_FLOAT | FIELD_ID_VALUE_FLOAT, kv.value_float());
+            }
+            mProto->end(dimensionToken);
+        }
+
+        // Then emit one ValueBucketInfo per past bucket recorded for this dimension key.
+        for (const auto& bucket : pair.second) {
+            long long bucketInfoToken = mProto->start(FIELD_TYPE_MESSAGE | FIELD_ID_BUCKET_INFO);
+            mProto->write(FIELD_TYPE_INT64 | FIELD_ID_START_BUCKET_NANOS,
+                          (long long)bucket.mBucketStartNs);
+            mProto->write(FIELD_TYPE_INT64 | FIELD_ID_END_BUCKET_NANOS,
+                          (long long)bucket.mBucketEndNs);
+            mProto->write(FIELD_TYPE_INT64 | FIELD_ID_VALUE, (long long)bucket.mValue);
+            mProto->end(bucketInfoToken);
+            VLOG("\t bucket [%lld - %lld] value: %lld", (long long)bucket.mBucketStartNs,
+                 (long long)bucket.mBucketEndNs, (long long)bucket.mValue);
+        }
+        mProto->end(wrapperToken);
     }
-    return report;
-    // TODO: Clear mPastBuckets, mDimensionKeyMap once the report is dumped.
+    mProto->end(mProtoToken);
+    mProto->write(FIELD_TYPE_INT64 | FIELD_ID_END_REPORT_NANOS,
+                  (long long)mCurrentBucketStartTimeNs);
+
+    // NOTE(review): serializeProto() is not shown here; presumably it flattens mProto - confirm.
+    std::unique_ptr<uint8_t[]> buffer = serializeProto();
+
+    startNewProtoOutputStream(time(nullptr) * NS_PER_SEC);
+    mPastBuckets.clear();
+    mByteSize = 0;
+
+    // TODO: Clear mDimensionKeyMap once the report is dumped.
+
+    // TODO: Once we migrate all MetricProducers to use ProtoOutputStream, we should return this:
+    // return std::move(buffer);
+    return StatsLogReport();
 }
 
 void ValueMetricProducer::onConditionChanged(const bool condition, const uint64_t eventTime) {
@@ -207,20 +265,21 @@
 
     VLOG("finalizing bucket for %ld, dumping %d slices", (long)mCurrentBucketStartTimeNs,
          (int)mCurrentSlicedBucket.size());
-    ValueBucketInfo info;
-    info.set_start_bucket_nanos(mCurrentBucketStartTimeNs);
-    info.set_end_bucket_nanos(mCurrentBucketStartTimeNs + mBucketSizeNs);
+    ValueBucket info;
+    info.mBucketStartNs = mCurrentBucketStartTimeNs;
+    info.mBucketEndNs = mCurrentBucketStartTimeNs + mBucketSizeNs;
 
     for (const auto& slice : mCurrentSlicedBucket) {
         long value = 0;
         for (const auto& pair : slice.second.raw) {
             value += pair.second - pair.first;
         }
-        info.set_value(value);
+        info.mValue = value;
         VLOG(" %s, %ld", slice.first.c_str(), value);
         // it will auto create new vector of ValuebucketInfo if the key is not found.
         auto& bucketList = mPastBuckets[slice.first];
         bucketList.push_back(info);
+        mByteSize += sizeof(info);
     }
 
     // Reset counters
@@ -235,6 +294,10 @@
          (long long)mCurrentBucketStartTimeNs);
 }
 
+size_t ValueMetricProducer::byteSize() {
+    return mByteSize;
+}
+
 }  // namespace statsd
 }  // namespace os
 }  // namespace android