Only create ProtoOutputStream when onGetData() is called.

The exception is EventMetricProducer. Each EventMetricProducer will still have a ProtoOutputStream
Because LogEvent comes as a fixed 4K, it's more memory efficient to have an 8k ProtoOutputStream for
storing the events.

Also removed finish() api in MetricProducer, which was intended to use with Dropbox.

Test: statsd_test & dogfood app
Bug: 70393808
Change-Id: I2efe4ecc76a88060a9aa5eb49d1fa6ea60bc5da8
diff --git a/cmds/statsd/src/metrics/EventMetricProducer.cpp b/cmds/statsd/src/metrics/EventMetricProducer.cpp
index 8bdc9e3..217aff0 100644
--- a/cmds/statsd/src/metrics/EventMetricProducer.cpp
+++ b/cmds/statsd/src/metrics/EventMetricProducer.cpp
@@ -62,7 +62,7 @@
         mConditionSliced = true;
     }
 
-    startNewProtoOutputStreamLocked(mStartTimeNs);
+    startNewProtoOutputStreamLocked();
 
     VLOG("metric %s created. bucket size %lld start_time: %lld", metric.name().c_str(),
          (long long)mBucketSizeNs, (long long)mStartTimeNs);
@@ -72,33 +72,45 @@
     VLOG("~EventMetricProducer() called");
 }
 
-void EventMetricProducer::startNewProtoOutputStreamLocked(long long startTime) {
+void EventMetricProducer::startNewProtoOutputStreamLocked() {
     mProto = std::make_unique<ProtoOutputStream>();
-    // TODO: We need to auto-generate the field IDs for StatsLogReport, EventMetricData,
-    // and StatsEvent.
-    mProto->write(FIELD_TYPE_STRING | FIELD_ID_NAME, mMetric.name());
-    mProto->write(FIELD_TYPE_INT64 | FIELD_ID_START_REPORT_NANOS, startTime);
-    mProtoToken = mProto->start(FIELD_TYPE_MESSAGE | FIELD_ID_EVENT_METRICS);
-}
-
-void EventMetricProducer::finish() {
 }
 
 void EventMetricProducer::onSlicedConditionMayChangeLocked(const uint64_t eventTime) {
 }
 
-std::unique_ptr<std::vector<uint8_t>> EventMetricProducer::onDumpReportLocked() {
-    long long endTime = time(nullptr) * NS_PER_SEC;
-    mProto->end(mProtoToken);
-    mProto->write(FIELD_TYPE_INT64 | FIELD_ID_END_REPORT_NANOS, endTime);
+std::unique_ptr<std::vector<uint8_t>> serializeProtoLocked(ProtoOutputStream& protoOutput) {
+    size_t bufferSize = protoOutput.size();
+
+    std::unique_ptr<std::vector<uint8_t>> buffer(new std::vector<uint8_t>(bufferSize));
+
+    size_t pos = 0;
+    auto it = protoOutput.data();
+    while (it.readBuffer() != NULL) {
+        size_t toRead = it.currentToRead();
+        std::memcpy(&((*buffer)[pos]), it.readBuffer(), toRead);
+        pos += toRead;
+        it.rp()->move(toRead);
+    }
+
+    return buffer;
+}
+
+void EventMetricProducer::onDumpReportLocked(const uint64_t dumpTimeNs,
+                                             ProtoOutputStream* protoOutput) {
+    protoOutput->write(FIELD_TYPE_STRING | FIELD_ID_NAME, mMetric.name());
+    protoOutput->write(FIELD_TYPE_INT64 | FIELD_ID_START_REPORT_NANOS, (long long)mStartTimeNs);
+    protoOutput->write(FIELD_TYPE_INT64 | FIELD_ID_END_REPORT_NANOS, (long long)dumpTimeNs);
 
     size_t bufferSize = mProto->size();
     VLOG("metric %s dump report now... proto size: %zu ", mMetric.name().c_str(), bufferSize);
-    std::unique_ptr<std::vector<uint8_t>> buffer = serializeProtoLocked();
+    std::unique_ptr<std::vector<uint8_t>> buffer = serializeProtoLocked(*mProto);
 
-    startNewProtoOutputStreamLocked(endTime);
+    protoOutput->write(FIELD_TYPE_MESSAGE | FIELD_ID_EVENT_METRICS,
+                       reinterpret_cast<char*>(buffer.get()->data()), buffer.get()->size());
 
-    return buffer;
+    startNewProtoOutputStreamLocked();
+    mStartTimeNs = dumpTimeNs;
 }
 
 void EventMetricProducer::onConditionChangedLocked(const bool conditionMet,