Only create ProtoOutputStream when onGetData() is called.

The exception is EventMetricProducer. Each EventMetricProducer will still have a ProtoOutputStream
Because LogEvent comes as a fixed 4K, it's more memory efficient to have an 8k ProtoOutputStream for
storing the events.

Also removed finish() api in MetricProducer, which was intended to use with Dropbox.

Test: statsd_test & dogfood app
Bug: 70393808
Change-Id: I2efe4ecc76a88060a9aa5eb49d1fa6ea60bc5da8
diff --git a/cmds/statsd/src/metrics/DurationMetricProducer.cpp b/cmds/statsd/src/metrics/DurationMetricProducer.cpp
index cedea30..9920f65 100644
--- a/cmds/statsd/src/metrics/DurationMetricProducer.cpp
+++ b/cmds/statsd/src/metrics/DurationMetricProducer.cpp
@@ -93,8 +93,6 @@
         mConditionSliced = true;
     }
 
-    startNewProtoOutputStreamLocked(mStartTimeNs);
-
     VLOG("metric %s created. bucket size %lld start_time: %lld", metric.name().c_str(),
          (long long)mBucketSizeNs, (long long)mStartTimeNs);
 }
@@ -114,13 +112,6 @@
     return new AnomalyTracker(alert, mConfigKey);
 }
 
-void DurationMetricProducer::startNewProtoOutputStreamLocked(long long startTime) {
-    mProto = std::make_unique<ProtoOutputStream>();
-    mProto->write(FIELD_TYPE_STRING | FIELD_ID_NAME, mMetric.name());
-    mProto->write(FIELD_TYPE_INT64 | FIELD_ID_START_REPORT_NANOS, startTime);
-    mProtoToken = mProto->start(FIELD_TYPE_MESSAGE | FIELD_ID_DURATION_METRICS);
-}
-
 unique_ptr<DurationTracker> DurationMetricProducer::createDurationTracker(
         const HashableDimensionKey& eventKey) const {
     switch (mMetric.aggregation_type()) {
@@ -135,11 +126,6 @@
     }
 }
 
-void DurationMetricProducer::finish() {
-    // TODO: write the StatsLogReport to dropbox using
-    // DropboxWriter.
-}
-
 void DurationMetricProducer::onSlicedConditionMayChangeLocked(const uint64_t eventTime) {
     VLOG("Metric %s onSlicedConditionMayChange", mMetric.name().c_str());
     flushIfNeededLocked(eventTime);
@@ -161,13 +147,14 @@
     }
 }
 
-std::unique_ptr<std::vector<uint8_t>> DurationMetricProducer::onDumpReportLocked() {
-    long long endTime = time(nullptr) * NS_PER_SEC;
+void DurationMetricProducer::onDumpReportLocked(const uint64_t dumpTimeNs,
+                                                ProtoOutputStream* protoOutput) {
+    flushIfNeededLocked(dumpTimeNs);
 
-    // Dump current bucket if it's stale.
-    // If current bucket is still on-going, don't force dump current bucket.
-    // In finish(), We can force dump current bucket.
-    flushIfNeededLocked(endTime);
+    protoOutput->write(FIELD_TYPE_STRING | FIELD_ID_NAME, mMetric.name());
+    protoOutput->write(FIELD_TYPE_INT64 | FIELD_ID_START_REPORT_NANOS, (long long)mStartTimeNs);
+    long long protoToken = protoOutput->start(FIELD_TYPE_MESSAGE | FIELD_ID_DURATION_METRICS);
+
     VLOG("metric %s dump report now...", mMetric.name().c_str());
 
     for (const auto& pair : mPastBuckets) {
@@ -180,49 +167,46 @@
         }
 
         long long wrapperToken =
-                mProto->start(FIELD_TYPE_MESSAGE | FIELD_COUNT_REPEATED | FIELD_ID_DATA);
+                protoOutput->start(FIELD_TYPE_MESSAGE | FIELD_COUNT_REPEATED | FIELD_ID_DATA);
 
         // First fill dimension (KeyValuePairs).
         for (const auto& kv : it->second) {
-            long long dimensionToken =
-                    mProto->start(FIELD_TYPE_MESSAGE | FIELD_COUNT_REPEATED | FIELD_ID_DIMENSION);
-            mProto->write(FIELD_TYPE_INT32 | FIELD_ID_KEY, kv.key());
+            long long dimensionToken = protoOutput->start(
+                    FIELD_TYPE_MESSAGE | FIELD_COUNT_REPEATED | FIELD_ID_DIMENSION);
+            protoOutput->write(FIELD_TYPE_INT32 | FIELD_ID_KEY, kv.key());
             if (kv.has_value_str()) {
-                mProto->write(FIELD_TYPE_STRING | FIELD_ID_VALUE_STR, kv.value_str());
+                protoOutput->write(FIELD_TYPE_STRING | FIELD_ID_VALUE_STR, kv.value_str());
             } else if (kv.has_value_int()) {
-                mProto->write(FIELD_TYPE_INT64 | FIELD_ID_VALUE_INT, kv.value_int());
+                protoOutput->write(FIELD_TYPE_INT64 | FIELD_ID_VALUE_INT, kv.value_int());
             } else if (kv.has_value_bool()) {
-                mProto->write(FIELD_TYPE_BOOL | FIELD_ID_VALUE_BOOL, kv.value_bool());
+                protoOutput->write(FIELD_TYPE_BOOL | FIELD_ID_VALUE_BOOL, kv.value_bool());
             } else if (kv.has_value_float()) {
-                mProto->write(FIELD_TYPE_FLOAT | FIELD_ID_VALUE_FLOAT, kv.value_float());
+                protoOutput->write(FIELD_TYPE_FLOAT | FIELD_ID_VALUE_FLOAT, kv.value_float());
             }
-            mProto->end(dimensionToken);
+            protoOutput->end(dimensionToken);
         }
 
         // Then fill bucket_info (DurationBucketInfo).
         for (const auto& bucket : pair.second) {
-            long long bucketInfoToken =
-                    mProto->start(FIELD_TYPE_MESSAGE | FIELD_COUNT_REPEATED | FIELD_ID_BUCKET_INFO);
-            mProto->write(FIELD_TYPE_INT64 | FIELD_ID_START_BUCKET_NANOS,
-                          (long long)bucket.mBucketStartNs);
-            mProto->write(FIELD_TYPE_INT64 | FIELD_ID_END_BUCKET_NANOS,
-                          (long long)bucket.mBucketEndNs);
-            mProto->write(FIELD_TYPE_INT64 | FIELD_ID_DURATION, (long long)bucket.mDuration);
-            mProto->end(bucketInfoToken);
+            long long bucketInfoToken = protoOutput->start(
+                    FIELD_TYPE_MESSAGE | FIELD_COUNT_REPEATED | FIELD_ID_BUCKET_INFO);
+            protoOutput->write(FIELD_TYPE_INT64 | FIELD_ID_START_BUCKET_NANOS,
+                               (long long)bucket.mBucketStartNs);
+            protoOutput->write(FIELD_TYPE_INT64 | FIELD_ID_END_BUCKET_NANOS,
+                               (long long)bucket.mBucketEndNs);
+            protoOutput->write(FIELD_TYPE_INT64 | FIELD_ID_DURATION, (long long)bucket.mDuration);
+            protoOutput->end(bucketInfoToken);
             VLOG("\t bucket [%lld - %lld] duration: %lld", (long long)bucket.mBucketStartNs,
                  (long long)bucket.mBucketEndNs, (long long)bucket.mDuration);
         }
 
-        mProto->end(wrapperToken);
+        protoOutput->end(wrapperToken);
     }
 
-    mProto->end(mProtoToken);
-    mProto->write(FIELD_TYPE_INT64 | FIELD_ID_END_REPORT_NANOS,
-                  (long long)mCurrentBucketStartTimeNs);
-    std::unique_ptr<std::vector<uint8_t>> buffer = serializeProtoLocked();
-    startNewProtoOutputStreamLocked(endTime);
+    protoOutput->end(protoToken);
+    protoOutput->write(FIELD_TYPE_INT64 | FIELD_ID_END_REPORT_NANOS, (long long)dumpTimeNs);
     mPastBuckets.clear();
-    return buffer;
+    mStartTimeNs = mCurrentBucketStartTimeNs;
 }
 
 void DurationMetricProducer::flushIfNeededLocked(const uint64_t& eventTime) {