1. Enable FIELD_TYPE IDs defined in ProtoOutputStream
2. Migrate CountMetricProducer to use ProtoOutputStream

Test: statsd, statsd_test
Change-Id: I33a1ea77a49b045818a48923b2263cb594ab0013
diff --git a/cmds/statsd/src/metrics/CountMetricProducer.cpp b/cmds/statsd/src/metrics/CountMetricProducer.cpp
index 9f8558d..71cb771 100644
--- a/cmds/statsd/src/metrics/CountMetricProducer.cpp
+++ b/cmds/statsd/src/metrics/CountMetricProducer.cpp
@@ -24,6 +24,8 @@
 #include <limits.h>
 #include <stdlib.h>
 
+using namespace android::util;
+using android::util::ProtoOutputStream;
 using std::map;
 using std::string;
 using std::unordered_map;
@@ -33,6 +35,27 @@
 namespace os {
 namespace statsd {
 
+// Field ids for StatsLogReport (presumably must mirror the .proto field numbers -- confirm).
+const int FIELD_ID_METRIC_ID = 1;
+const int FIELD_ID_START_REPORT_NANOS = 2;
+const int FIELD_ID_END_REPORT_NANOS = 3;
+const int FIELD_ID_COUNT_METRICS = 5;
+// Field ids for CountMetricDataWrapper.
+const int FIELD_ID_DATA = 1;
+// Field ids for CountMetricData.
+const int FIELD_ID_DIMENSION = 1;
+const int FIELD_ID_BUCKET_INFO = 2;
+// Field ids for KeyValuePair: the key, then one value field per value type.
+const int FIELD_ID_KEY = 1;
+const int FIELD_ID_VALUE_STR = 2;
+const int FIELD_ID_VALUE_INT = 3;
+const int FIELD_ID_VALUE_BOOL = 4;
+const int FIELD_ID_VALUE_FLOAT = 5;
+// Field ids for CountBucketInfo.
+const int FIELD_ID_START_BUCKET_NANOS = 1;
+const int FIELD_ID_END_BUCKET_NANOS = 2;
+const int FIELD_ID_COUNT = 3;
+
 // TODO: add back AnomalyTracker.
 CountMetricProducer::CountMetricProducer(const CountMetric& metric, const int conditionIndex,
                                          const sp<ConditionWizard>& wizard)
@@ -66,6 +89,8 @@
         mConditionSliced = true;
     }
 
+    startNewProtoOutputStream(mStartTimeNs);
+
     VLOG("metric %lld created. bucket size %lld start_time: %lld", metric.metric_id(),
          (long long)mBucketSizeNs, (long long)mStartTimeNs);
 }
@@ -74,23 +99,14 @@
     VLOG("~CountMetricProducer() called");
 }
 
-void CountMetricProducer::finish() {
-    // TODO: write the StatsLogReport to dropbox using
-    // DropboxWriter.
+void CountMetricProducer::startNewProtoOutputStream(long long startTime) {
+    mProto = std::make_unique<ProtoOutputStream>();  // replaces any previous stream
+    mProto->write(FIELD_TYPE_INT32 | FIELD_ID_METRIC_ID, mMetric.metric_id());
+    mProto->write(FIELD_TYPE_INT64 | FIELD_ID_START_REPORT_NANOS, startTime);
+    mProtoToken = mProto->start(FIELD_TYPE_MESSAGE | FIELD_ID_COUNT_METRICS);  // ended on dump
 }
 
-static void addSlicedCounterToReport(StatsLogReport_CountMetricDataWrapper& wrapper,
-                                     const vector<KeyValuePair>& key,
-                                     const vector<CountBucketInfo>& buckets) {
-    CountMetricData* data = wrapper.add_data();
-    for (const auto& kv : key) {
-        data->add_dimension()->CopyFrom(kv);
-    }
-    for (const auto& bucket : buckets) {
-        data->add_bucket_info()->CopyFrom(bucket);
-        VLOG("\t bucket [%lld - %lld] count: %lld", bucket.start_bucket_nanos(),
-             bucket.end_bucket_nanos(), bucket.count());
-    }
+void CountMetricProducer::finish() {  // currently a no-op
 }
 
 void CountMetricProducer::onSlicedConditionMayChange(const uint64_t eventTime) {
@@ -98,33 +114,81 @@
 }
 
 StatsLogReport CountMetricProducer::onDumpReport() {
-    VLOG("metric %lld dump report now...", mMetric.metric_id());
-
-    StatsLogReport report;
-    report.set_metric_id(mMetric.metric_id());
-    report.set_start_report_nanos(mStartTimeNs);
+    long long endTime = time(nullptr) * NANO_SECONDS_IN_A_SECOND;  // also starts the next report
 
     // Dump current bucket if it's stale.
     // If current bucket is still on-going, don't force dump current bucket.
     // In finish(), We can force dump current bucket.
-    flushCounterIfNeeded(time(nullptr) * NANO_SECONDS_IN_A_SECOND);
-    report.set_end_report_nanos(mCurrentBucketStartTimeNs);
+    flushCounterIfNeeded(endTime);
 
-    StatsLogReport_CountMetricDataWrapper* wrapper = report.mutable_count_metrics();
-
-    for (const auto& pair : mPastBuckets) {
-        const HashableDimensionKey& hashableKey = pair.first;
+    for (const auto& counter : mPastBucketProtos) {
+        const HashableDimensionKey& hashableKey = counter.first;
         auto it = mDimensionKeyMap.find(hashableKey);
         if (it == mDimensionKeyMap.end()) {
             ALOGE("Dimension key %s not found?!?! skip...", hashableKey.c_str());
             continue;
         }
+        long long wrapperToken = mProto->start(FIELD_TYPE_MESSAGE | FIELD_ID_DATA);
 
-        VLOG("  dimension key %s", hashableKey.c_str());
-        addSlicedCounterToReport(*wrapper, it->second, pair.second);
+        // First write one dimension sub-message per KeyValuePair of this key.
+        for (const auto& kv : it->second) {
+            long long dimensionToken = mProto->start(FIELD_TYPE_MESSAGE | FIELD_ID_DIMENSION);
+            mProto->write(FIELD_TYPE_INT32 | FIELD_ID_KEY, kv.key());
+            if (kv.has_value_str()) {
+                mProto->write(FIELD_TYPE_STRING | FIELD_ID_VALUE_STR, kv.value_str());
+            } else if (kv.has_value_int()) {
+                mProto->write(FIELD_TYPE_INT64 | FIELD_ID_VALUE_INT, kv.value_int());
+            } else if (kv.has_value_bool()) {
+                mProto->write(FIELD_TYPE_BOOL | FIELD_ID_VALUE_BOOL, kv.value_bool());
+            } else if (kv.has_value_float()) {
+                mProto->write(FIELD_TYPE_FLOAT | FIELD_ID_VALUE_FLOAT, kv.value_float());
+            }
+            mProto->end(dimensionToken);
+        }
+
+        // Then copy each pre-serialized CountBucketInfo in as a bucket_info sub-message.
+        for (const auto& proto : counter.second) {
+            size_t bufferSize = proto->size();
+            std::unique_ptr<char[]> buffer(new char[bufferSize]);  // released every iteration
+            size_t pos = 0;
+            auto reader = proto->data();
+            while (reader.readBuffer() != nullptr) {
+                size_t toRead = reader.currentToRead();
+                std::memcpy(&buffer[pos], reader.readBuffer(), toRead);
+                pos += toRead;
+                reader.rp()->move(toRead);
+            }
+            mProto->write(FIELD_TYPE_MESSAGE | FIELD_ID_BUCKET_INFO, buffer.get(), bufferSize);
+        }
+
+        mProto->end(wrapperToken);
     }
-    return report;
-    // TODO: Clear mPastBuckets, mDimensionKeyMap once the report is dumped.
+
+    mProto->end(mProtoToken);
+    mProto->write(FIELD_TYPE_INT64 | FIELD_ID_END_REPORT_NANOS,
+                  (long long)mCurrentBucketStartTimeNs);
+
+    size_t bufferSize = mProto->size();
+    VLOG("metric %lld dump report now...", mMetric.metric_id());
+    std::unique_ptr<uint8_t[]> buffer(new uint8_t[bufferSize]);
+    size_t pos = 0;
+    auto it = mProto->data();
+    while (it.readBuffer() != nullptr) {
+        size_t toRead = it.currentToRead();
+        std::memcpy(&buffer[pos], it.readBuffer(), toRead);
+        pos += toRead;
+        it.rp()->move(toRead);
+    }
+
+    startNewProtoOutputStream(endTime);
+    mPastBucketProtos.clear();
+    mByteSize = 0;
+
+    // TODO: Once we migrate all MetricProducers to use ProtoOutputStream, we should return this:
+    // return std::move(buffer);
+    return StatsLogReport();
+
+    // TODO: Clear mDimensionKeyMap once the report is dumped.
 }
 
 void CountMetricProducer::onConditionChanged(const bool conditionMet, const uint64_t eventTime) {
@@ -175,15 +239,17 @@
     // adjust the bucket start time
     int64_t numBucketsForward = (eventTimeNs - mCurrentBucketStartTimeNs) / mBucketSizeNs;
 
-    CountBucketInfo info;
-    info.set_start_bucket_nanos(mCurrentBucketStartTimeNs);
-    info.set_end_bucket_nanos(mCurrentBucketStartTimeNs + mBucketSizeNs);
-
     for (const auto& counter : mCurrentSlicedCounter) {
-        info.set_count(counter.second);
-        // it will auto create new vector of CountbucketInfo if the key is not found.
-        auto& bucketList = mPastBuckets[counter.first];
-        bucketList.push_back(info);
+        unique_ptr<ProtoOutputStream> proto = make_unique<ProtoOutputStream>();
+        proto->write(FIELD_TYPE_INT64 | FIELD_ID_START_BUCKET_NANOS,
+                     (long long)mCurrentBucketStartTimeNs);
+        proto->write(FIELD_TYPE_INT64 | FIELD_ID_END_BUCKET_NANOS,
+                     (long long)(mCurrentBucketStartTimeNs + mBucketSizeNs));
+        proto->write(FIELD_TYPE_INT64 | FIELD_ID_COUNT, (long long)counter.second);
+
+        // Take the size before std::move() empties `proto`; operator[] creates missing lists.
+        mByteSize += proto->size();
+        mPastBucketProtos[counter.first].push_back(std::move(proto));
 
         VLOG("metric %lld, dump key value: %s -> %d", mMetric.metric_id(), counter.first.c_str(),
              counter.second);
@@ -202,11 +268,11 @@
          (long long)mCurrentBucketStartTimeNs);
 }
 
+// Rough estimate of the data buffered by this CountMetricProducer: mByteSize accumulates the size
+// of each per-bucket proto as it is flushed and is reset when a report is dumped.
+// NOTE(review): dimension data is only written at dump time, so it is presumably not counted.
 size_t CountMetricProducer::byteSize() {
-// TODO: return actual proto size when ProtoOutputStream is ready for use for
-// CountMetricsProducer.
-//    return mProto->size();
-    return 0;
+    return mByteSize;
 }
 
 }  // namespace statsd