Implement a logic to rate limitted flush statsd with by checking the
size of each MetricsProducer's. The implementation of byteSize() method
is still TBD as it depends on migration to ProtoOutputStream.

Test: statsd, statsd_test
Change-Id: I966606044d7cb814dabe94192bacecad91f28177
diff --git a/cmds/statsd/src/StatsLogProcessor.cpp b/cmds/statsd/src/StatsLogProcessor.cpp
index 56d4e4d..cdaca1b 100644
--- a/cmds/statsd/src/StatsLogProcessor.cpp
+++ b/cmds/statsd/src/StatsLogProcessor.cpp
@@ -46,6 +46,7 @@
     // pass the event to metrics managers.
     for (auto& pair : mMetricsManagers) {
         pair.second->onLogEvent(msg);
+        flushIfNecessary(msg.GetTimestampNs(), pair.first, pair.second);
     }
 }
 
@@ -84,41 +85,40 @@
         it->second->finish();
         mMetricsManagers.erase(it);
     }
+    auto flushTime = mLastFlushTimes.find(key);
+    if (flushTime != mLastFlushTimes.end()) {
+        mLastFlushTimes.erase(flushTime);
+    }
 }
 
-void StatsLogProcessor::addEventMetricData(const EventMetricData& eventMetricData) {
-    // TODO: Replace this code when MetricsManager.onDumpReport() is ready to
-    // get a list of byte arrays.
-    flushIfNecessary(eventMetricData);
-    const int numBytes = eventMetricData.ByteSize();
-    char buffer[numBytes];
-    eventMetricData.SerializeToArray(&buffer[0], numBytes);
-    string bufferString(buffer, numBytes);
-    mEvents.push_back(bufferString);
-    mBufferSize += eventMetricData.ByteSize();
-}
+void StatsLogProcessor::flushIfNecessary(uint64_t timestampNs,
+                                         const ConfigKey& key,
+                                         const unique_ptr<MetricsManager>& metricsManager) {
+    auto lastFlushNs = mLastFlushTimes.find(key);
+    if (lastFlushNs != mLastFlushTimes.end()) {
+        if (timestampNs - lastFlushNs->second < kMinFlushPeriod) {
+            return;
+        }
+    }
 
-void StatsLogProcessor::flushIfNecessary(const EventMetricData& eventMetricData) {
-    if (eventMetricData.ByteSize() + mBufferSize > kMaxSerializedBytes) {
-      flush();
+    size_t totalBytes = metricsManager->byteSize();
+    if (totalBytes > kMaxSerializedBytes) {
+        flush();
+        mLastFlushTimes[key] = std::move(timestampNs);
     }
 }
 
 void StatsLogProcessor::flush() {
+    // TODO: Take ConfigKey as an argument and flush metrics related to the
+    // ConfigKey. Also, create a wrapper that holds a repeated field of
+    // StatsLogReport's.
+    /*
     StatsLogReport logReport;
-    for (string eventBuffer : mEvents) {
-        EventMetricData eventFromBuffer;
-        eventFromBuffer.ParseFromString(eventBuffer);
-        EventMetricData* newEntry = logReport.mutable_event_metrics()->add_data();
-        newEntry->CopyFrom(eventFromBuffer);
-    }
-
     const int numBytes = logReport.ByteSize();
     vector<uint8_t> logReportBuffer(numBytes);
     logReport.SerializeToArray(&logReportBuffer[0], numBytes);
     mPushLog(logReportBuffer);
-    mEvents.clear();
-    mBufferSize = 0;
+    */
 }
 
 }  // namespace statsd
diff --git a/cmds/statsd/src/StatsLogProcessor.h b/cmds/statsd/src/StatsLogProcessor.h
index 9cd74ca..6463441 100644
--- a/cmds/statsd/src/StatsLogProcessor.h
+++ b/cmds/statsd/src/StatsLogProcessor.h
@@ -50,6 +50,8 @@
 private:
     std::unordered_map<ConfigKey, std::unique_ptr<MetricsManager>> mMetricsManagers;
 
+    std::unordered_map<ConfigKey, long> mLastFlushTimes;
+
     sp<UidMap> mUidMap;  // Reference to the UidMap to lookup app name and version for each uid.
 
     /* Max *serialized* size of the logs kept in memory before flushing through binder call.
@@ -59,25 +61,17 @@
      */
     static const size_t kMaxSerializedBytes = 16 * 1024;
 
-    /* List of data that was captured for a single metric over a given interval of time. */
-    vector<string> mEvents;
-
-    /* Current *serialized* size of the logs kept in memory.
-       To save computation, we will not calculate the size of the StatsLogReport every time when a
-       new entry is added, which would recursively call ByteSize() on every log entry. Instead, we
-       keep the sum of all individual stats log entry sizes. The size of a proto is approximately
-       the sum of the size of all member protos.
-     */
-    size_t mBufferSize = 0;
-
     /* Check if the buffer size exceeds the max buffer size when the new entry is added, and flush
        the logs to callback clients if true. */
-    void flushIfNecessary(const EventMetricData& eventMetricData);
-
-    /* Append event metric data to StatsLogReport. */
-    void addEventMetricData(const EventMetricData& eventMetricData);
+    void flushIfNecessary(uint64_t timestampNs,
+                          const ConfigKey& key,
+                          const unique_ptr<MetricsManager>& metricsManager);
 
     std::function<void(const vector<uint8_t>&)> mPushLog;
+
+    /* Minimum period between two flushes in nanoseconds. Currently set to 10
+     * minutes. */
+    static const unsigned long long kMinFlushPeriod = 600 * NS_PER_SEC;
 };
 
 }  // namespace statsd
diff --git a/cmds/statsd/src/metrics/CountMetricProducer.cpp b/cmds/statsd/src/metrics/CountMetricProducer.cpp
index 7bb9c8a..69f336f 100644
--- a/cmds/statsd/src/metrics/CountMetricProducer.cpp
+++ b/cmds/statsd/src/metrics/CountMetricProducer.cpp
@@ -203,6 +203,13 @@
          (long long)mCurrentBucketStartTimeNs);
 }
 
+size_t CountMetricProducer::byteSize() {
+// TODO: return actual proto size when ProtoOutputStream is ready for use for
+// CountMetricsProducer.
+//    return mProto->size();
+    return 0;
+}
+
 }  // namespace statsd
 }  // namespace os
-}  // namespace android
\ No newline at end of file
+}  // namespace android
diff --git a/cmds/statsd/src/metrics/CountMetricProducer.h b/cmds/statsd/src/metrics/CountMetricProducer.h
index 340c830..be77e47 100644
--- a/cmds/statsd/src/metrics/CountMetricProducer.h
+++ b/cmds/statsd/src/metrics/CountMetricProducer.h
@@ -49,6 +49,8 @@
 
     void onSlicedConditionMayChange() override;
 
+    size_t byteSize() override;
+
     // TODO: Implement this later.
     virtual void notifyAppUpgrade(const string& apk, const int uid, const int version) override{};
 
diff --git a/cmds/statsd/src/metrics/DurationMetricProducer.cpp b/cmds/statsd/src/metrics/DurationMetricProducer.cpp
index 38e55fd..a590bc8 100644
--- a/cmds/statsd/src/metrics/DurationMetricProducer.cpp
+++ b/cmds/statsd/src/metrics/DurationMetricProducer.cpp
@@ -349,6 +349,13 @@
     }
 }
 
+size_t DurationMetricProducer::byteSize() {
+// TODO: return actual proto size when ProtoOutputStream is ready for use for
+// DurationMetricsProducer.
+//    return mProto->size();
+  return 0;
+}
+
 }  // namespace statsd
 }  // namespace os
 }  // namespace android
diff --git a/cmds/statsd/src/metrics/DurationMetricProducer.h b/cmds/statsd/src/metrics/DurationMetricProducer.h
index 19e2437..8820403 100644
--- a/cmds/statsd/src/metrics/DurationMetricProducer.h
+++ b/cmds/statsd/src/metrics/DurationMetricProducer.h
@@ -69,6 +69,8 @@
 
     void onSlicedConditionMayChange() override;
 
+    size_t byteSize() override;
+
     // TODO: Implement this later.
     virtual void notifyAppUpgrade(const string& apk, const int uid, const int version) override{};
 
diff --git a/cmds/statsd/src/metrics/EventMetricProducer.cpp b/cmds/statsd/src/metrics/EventMetricProducer.cpp
index 8b3f405..7e06105 100644
--- a/cmds/statsd/src/metrics/EventMetricProducer.cpp
+++ b/cmds/statsd/src/metrics/EventMetricProducer.cpp
@@ -127,6 +127,10 @@
     mProto->end(wrapperToken);
 }
 
+size_t EventMetricProducer::byteSize() {
+  return mProto->size();
+}
+
 }  // namespace statsd
 }  // namespace os
 }  // namespace android
diff --git a/cmds/statsd/src/metrics/EventMetricProducer.h b/cmds/statsd/src/metrics/EventMetricProducer.h
index 879175c..14fa31c 100644
--- a/cmds/statsd/src/metrics/EventMetricProducer.h
+++ b/cmds/statsd/src/metrics/EventMetricProducer.h
@@ -51,6 +51,8 @@
 
     void onSlicedConditionMayChange() override;
 
+    size_t byteSize() override;
+
     // TODO: Implement this later.
     virtual void notifyAppUpgrade(const string& apk, const int uid, const int version) override{};
 
diff --git a/cmds/statsd/src/metrics/MetricProducer.h b/cmds/statsd/src/metrics/MetricProducer.h
index 496b145..80eb527 100644
--- a/cmds/statsd/src/metrics/MetricProducer.h
+++ b/cmds/statsd/src/metrics/MetricProducer.h
@@ -64,6 +64,8 @@
         return mConditionSliced;
     };
 
+    virtual size_t byteSize() = 0;
+
 protected:
     const uint64_t mStartTimeNs;
 
diff --git a/cmds/statsd/src/metrics/MetricsManager.cpp b/cmds/statsd/src/metrics/MetricsManager.cpp
index 1ffa58b..4fa3965 100644
--- a/cmds/statsd/src/metrics/MetricsManager.cpp
+++ b/cmds/statsd/src/metrics/MetricsManager.cpp
@@ -150,6 +150,15 @@
     }
 }
 
+// Returns the total byte size of all metrics managed by a single config source.
+size_t MetricsManager::byteSize() {
+    size_t totalSize = 0;
+    for (auto metricProducer : mAllMetricProducers) {
+        totalSize += metricProducer->byteSize();
+    }
+    return totalSize;
+}
+
 }  // namespace statsd
 }  // namespace os
 }  // namespace android
diff --git a/cmds/statsd/src/metrics/MetricsManager.h b/cmds/statsd/src/metrics/MetricsManager.h
index 2f91460..44cd637 100644
--- a/cmds/statsd/src/metrics/MetricsManager.h
+++ b/cmds/statsd/src/metrics/MetricsManager.h
@@ -46,6 +46,8 @@
     // Config source owner can call onDumpReport() to get all the metrics collected.
     std::vector<StatsLogReport> onDumpReport();
 
+    size_t byteSize();
+
 private:
     // All event tags that are interesting to my metrics.
     std::set<int> mTagIds;