Support StatsD sending broadcasts.

StatsD sends a broadcast once a configuration's metrics reach 90% of
its allocated memory limit, so that the receiver can pull the data.
If the memory usage goes over the limit, all of the data collected
for that config is dropped.

Also modifies the adb shell commands to facilitate debugging of the
broadcasts.

Test: Manually tested on marlin-eng with custom gmscore code.

Change-Id: I517a15bd4c959aa221802f84a51f13141a725102
diff --git a/cmds/statsd/src/StatsLogProcessor.cpp b/cmds/statsd/src/StatsLogProcessor.cpp
index b764ce5..abd2a35 100644
--- a/cmds/statsd/src/StatsLogProcessor.cpp
+++ b/cmds/statsd/src/StatsLogProcessor.cpp
@@ -50,8 +50,8 @@
 const int FIELD_ID_NAME = 2;
 
 StatsLogProcessor::StatsLogProcessor(const sp<UidMap>& uidMap,
-                                     const std::function<void(const vector<uint8_t>&)>& pushLog)
-    : mUidMap(uidMap), mPushLog(pushLog) {
+                                     const std::function<void(const ConfigKey&)>& sendBroadcast)
+    : mUidMap(uidMap), mSendBroadcast(sendBroadcast) {
 }
 
 StatsLogProcessor::~StatsLogProcessor() {
@@ -102,12 +102,27 @@
     }
 }
 
-vector<uint8_t> StatsLogProcessor::onDumpReport(const ConfigKey& key) {
+size_t StatsLogProcessor::GetMetricsSize(const ConfigKey& key) {
     auto it = mMetricsManagers.find(key);
     if (it == mMetricsManagers.end()) {
         ALOGW("Config source %s does not exist", key.ToString().c_str());
-        return vector<uint8_t>();
+        return 0;
     }
+    return it->second->byteSize();
+}
+
+void StatsLogProcessor::onDumpReport(const ConfigKey& key, vector<uint8_t>* outData) {
+    auto it = mMetricsManagers.find(key);
+    if (it == mMetricsManagers.end()) {
+        ALOGW("Config source %s does not exist", key.ToString().c_str());
+        return;
+    }
+
+    // This allows another broadcast to be sent within the rate-limit period if we get close to
+    // filling the buffer again soon.
+    mBroadcastTimesMutex.lock();
+    mLastBroadcastTimes.erase(key);
+    mBroadcastTimesMutex.unlock();
 
     ProtoOutputStream proto;
 
@@ -131,17 +146,18 @@
     uidMap.SerializeToArray(&uidMapBuffer[0], uidMapSize);
     proto.write(FIELD_TYPE_MESSAGE | FIELD_ID_UID_MAP, uidMapBuffer, uidMapSize);
 
-    vector<uint8_t> buffer(proto.size());
-    size_t pos = 0;
-    auto iter = proto.data();
-    while (iter.readBuffer() != NULL) {
-        size_t toRead = iter.currentToRead();
-        std::memcpy(&buffer[pos], iter.readBuffer(), toRead);
-        pos += toRead;
-        iter.rp()->move(toRead);
+    if (outData != nullptr) {
+        outData->clear();
+        outData->resize(proto.size());
+        size_t pos = 0;
+        auto iter = proto.data();
+        while (iter.readBuffer() != NULL) {
+            size_t toRead = iter.currentToRead();
+            std::memcpy(&((*outData)[pos]), iter.readBuffer(), toRead);
+            pos += toRead;
+            iter.rp()->move(toRead);
+        }
     }
-
-    return buffer;
 }
 
 void StatsLogProcessor::OnConfigRemoved(const ConfigKey& key) {
@@ -151,42 +167,34 @@
         mMetricsManagers.erase(it);
         mUidMap->OnConfigRemoved(key);
     }
-    auto flushTime = mLastFlushTimes.find(key);
-    if (flushTime != mLastFlushTimes.end()) {
-        mLastFlushTimes.erase(flushTime);
-    }
+
+    std::lock_guard<std::mutex> lock(mBroadcastTimesMutex);
+    mLastBroadcastTimes.erase(key);
 }
 
 void StatsLogProcessor::flushIfNecessary(uint64_t timestampNs,
                                          const ConfigKey& key,
                                          const unique_ptr<MetricsManager>& metricsManager) {
-    auto lastFlushNs = mLastFlushTimes.find(key);
-    if (lastFlushNs != mLastFlushTimes.end()) {
-        if (timestampNs - lastFlushNs->second < kMinFlushPeriod) {
-            return;
-        }
-    }
+    std::lock_guard<std::mutex> lock(mBroadcastTimesMutex);
 
     size_t totalBytes = metricsManager->byteSize();
-    if (totalBytes > kMaxSerializedBytes) {
-        flush();
-        mLastFlushTimes[key] = std::move(timestampNs);
+    if (totalBytes > kMaxSerializedBytes) { // Too late. We need to start clearing data.
+        // We ignore the return value so we force each metric producer to clear its contents.
+        metricsManager->onDumpReport();
+        ALOGD("StatsD had to toss out metrics for %s", key.ToString().c_str());
+    } else if (totalBytes > .9 * kMaxSerializedBytes) { // Nearly full: ask receiver to pull.
+        auto lastFlushNs = mLastBroadcastTimes.find(key);
+        if (lastFlushNs != mLastBroadcastTimes.end()) {
+            if (timestampNs - lastFlushNs->second < kMinBroadcastPeriod) {
+                return;
+            }
+        }
+        mLastBroadcastTimes[key] = timestampNs;
+        ALOGD("StatsD requesting broadcast for %s", key.ToString().c_str());
+        mSendBroadcast(key);
     }
 }
 
-void StatsLogProcessor::flush() {
-    // TODO: Take ConfigKey as an argument and flush metrics related to the
-    // ConfigKey. Also, create a wrapper that holds a repeated field of
-    // StatsLogReport's.
-    /*
-    StatsLogReport logReport;
-    const int numBytes = logReport.ByteSize();
-    vector<uint8_t> logReportBuffer(numBytes);
-    logReport.SerializeToArray(&logReportBuffer[0], numBytes);
-    mPushLog(logReportBuffer);
-    */
-}
-
 }  // namespace statsd
 }  // namespace os
 }  // namespace android
diff --git a/cmds/statsd/src/StatsLogProcessor.h b/cmds/statsd/src/StatsLogProcessor.h
index f38d715..2091774 100644
--- a/cmds/statsd/src/StatsLogProcessor.h
+++ b/cmds/statsd/src/StatsLogProcessor.h
@@ -33,7 +33,7 @@
 class StatsLogProcessor : public ConfigListener {
 public:
     StatsLogProcessor(const sp<UidMap>& uidMap,
-                      const std::function<void(const vector<uint8_t>&)>& pushLog);
+                      const std::function<void(const ConfigKey&)>& sendBroadcast);
     virtual ~StatsLogProcessor();
 
     virtual void OnLogEvent(const LogEvent& event);
@@ -41,15 +41,16 @@
     void OnConfigUpdated(const ConfigKey& key, const StatsdConfig& config);
     void OnConfigRemoved(const ConfigKey& key);
 
-    vector<uint8_t> onDumpReport(const ConfigKey& key);
+    size_t GetMetricsSize(const ConfigKey& key);
 
-    /* Request a flush through a binder call. */
-    void flush();
+    void onDumpReport(const ConfigKey& key, vector<uint8_t>* outData);
 
 private:
+    mutable mutex mBroadcastTimesMutex;
+
     std::unordered_map<ConfigKey, std::unique_ptr<MetricsManager>> mMetricsManagers;
 
-    std::unordered_map<ConfigKey, long> mLastFlushTimes;
+    std::unordered_map<ConfigKey, long> mLastBroadcastTimes;
 
     sp<UidMap> mUidMap;  // Reference to the UidMap to lookup app name and version for each uid.
 
@@ -60,17 +61,18 @@
      */
     static const size_t kMaxSerializedBytes = 16 * 1024;
 
-    /* Check if the buffer size exceeds the max buffer size when the new entry is added, and flush
-       the logs to callback clients if true. */
+    /* Check if we should send a broadcast if approaching memory limits and if we're over, we
+     * actually delete the data. */
     void flushIfNecessary(uint64_t timestampNs,
                           const ConfigKey& key,
                           const unique_ptr<MetricsManager>& metricsManager);
 
-    std::function<void(const vector<uint8_t>&)> mPushLog;
+    // Function used to send a broadcast so that receiver for the config key can call getData
+    // to retrieve the stored data.
+    std::function<void(const ConfigKey& key)> mSendBroadcast;
 
-    /* Minimum period between two flushes in nanoseconds. Currently set to 10
-     * minutes. */
-    static const unsigned long long kMinFlushPeriod = 600 * NS_PER_SEC;
+    /* Minimum period between two broadcasts in nanoseconds. Currently set to 60 seconds. */
+    static const unsigned long long kMinBroadcastPeriod = 60 * NS_PER_SEC;
 };
 
 }  // namespace statsd
diff --git a/cmds/statsd/src/StatsService.cpp b/cmds/statsd/src/StatsService.cpp
index 8bc776f..779f60f 100644
--- a/cmds/statsd/src/StatsService.cpp
+++ b/cmds/statsd/src/StatsService.cpp
@@ -73,8 +73,18 @@
 {
     mUidMap = new UidMap();
     mConfigManager = new ConfigManager();
-    mProcessor = new StatsLogProcessor(mUidMap, [](const vector<uint8_t>& log) {
-        // TODO: Update how we send data out of StatsD.
+    mProcessor = new StatsLogProcessor(mUidMap, [this](const ConfigKey& key) {
+        auto sc = getStatsCompanionService();
+        auto receiver = mConfigManager->GetConfigReceiver(key);
+        if (sc == nullptr) {
+            ALOGD("Could not find StatsCompanionService");
+        } else if (receiver.first.size() == 0) {
+            ALOGD("Statscompanion could not find a broadcast receiver for %s",
+                  key.ToString().c_str());
+        } else {
+            sc->sendBroadcast(String16(receiver.first.c_str()),
+                              String16(receiver.second.c_str()));
+        }
     });
 
     mConfigManager->AddListener(mProcessor);
@@ -206,7 +216,11 @@
         }
 
         if (!args[0].compare(String8("send-broadcast"))) {
-            return cmd_trigger_broadcast(args);
+            return cmd_trigger_broadcast(out, args);
+        }
+
+        if (!args[0].compare(String8("print-stats"))) {
+            return cmd_print_stats(out);
         }
 
         if (!args[0].compare(String8("clear-config"))) {
@@ -259,16 +273,56 @@
     fprintf(out, "  NAME          The name of the configuration\n");
     fprintf(out, "\n");
     fprintf(out, "\n");
-    fprintf(out, "usage: adb shell cmd stats send-broadcast PACKAGE CLASS\n");
-    fprintf(out, "  Send a broadcast that triggers one subscriber to fetch metrics.\n");
-    fprintf(out, "  PACKAGE        The name of the package to receive the broadcast.\n");
-    fprintf(out, "  CLASS          The name of the class to receive the broadcast.\n");
+    fprintf(out, "usage: adb shell cmd stats send-broadcast [UID] NAME\n");
+    fprintf(out, "  Send a broadcast that triggers the subscriber to fetch metrics.\n");
+    fprintf(out, "  UID           The uid of the configuration. It is only possible to pass\n");
+    fprintf(out, "                the UID parameter on eng builds. If UID is omitted the\n");
+    fprintf(out, "                calling uid is used.\n");
+    fprintf(out, "  NAME          The name of the configuration\n");
+    fprintf(out, "\n");
+    fprintf(out, "\n");
+    fprintf(out, "usage: adb shell cmd stats print-stats\n");
+    fprintf(out, "  Prints some basic stats.\n");
 }
 
-status_t StatsService::cmd_trigger_broadcast(Vector<String8>& args) {
+status_t StatsService::cmd_trigger_broadcast(FILE* out, Vector<String8>& args) {
+    string name;
+    bool good = false;
+    int uid;
+    const int argCount = args.size();
+    if (argCount == 2) {
+        // Automatically pick the UID
+        uid = IPCThreadState::self()->getCallingUid();
+        // TODO: What if this isn't a binder call? Should we fail?
+        name.assign(args[1].c_str(), args[1].size());
+        good = true;
+    } else if (argCount == 3) {
+        // If it's a userdebug or eng build, then the shell user can
+        // impersonate other uids.
+        if (mEngBuild) {
+            const char* s = args[1].c_str();
+            if (*s != '\0') {
+                char* end = NULL;
+                uid = strtol(s, &end, 0);
+                if (*end == '\0') {
+                    name.assign(args[2].c_str(), args[2].size());
+                    good = true;
+                }
+            }
+        } else {
+            fprintf(out,
+                    "The metrics can only be dumped for other UIDs on eng or userdebug builds.\n");
+        }
+    }
+    if (!good) {
+        print_cmd_help(out);
+        return UNKNOWN_ERROR;
+    }
+    auto receiver = mConfigManager->GetConfigReceiver(ConfigKey(uid, name));
     auto sc = getStatsCompanionService();
-    sc->sendBroadcast(String16(args[1]), String16(args[2]));
-    ALOGD("StatsService::trigger broadcast succeeded");
+    if (sc == nullptr) return UNKNOWN_ERROR;  // StatsCompanionService is unavailable.
+    sc->sendBroadcast(String16(receiver.first.c_str()), String16(receiver.second.c_str()));
+    ALOGD("StatsService::trigger broadcast succeeded to %d, %s", uid, name.c_str());
     return NO_ERROR;
 }
 
@@ -373,7 +427,8 @@
             }
         }
         if (good) {
-            mProcessor->onDumpReport(ConfigKey(uid, name));
+            vector<uint8_t> data;
+            mProcessor->onDumpReport(ConfigKey(uid, name), &data);
             // TODO: print the returned StatsLogReport to file instead of printing to logcat.
             fprintf(out, "Dump report for Config [%d,%s]\n", uid, name.c_str());
             fprintf(out, "See the StatsLogReport in logcat...\n");
@@ -389,6 +444,15 @@
     }
 }
 
+status_t StatsService::cmd_print_stats(FILE* out) {
+    vector<ConfigKey> configs = mConfigManager->GetAllConfigKeys();
+    for (const ConfigKey& key : configs) {
+        fprintf(out, "Config %s uses %zu bytes\n", key.ToString().c_str(),
+                mProcessor->GetMetricsSize(key));
+    }
+    return NO_ERROR;
+}
+
 status_t StatsService::cmd_print_stats_log(FILE* out, const Vector<String8>& args) {
     long msec = 0;
 
@@ -573,10 +637,14 @@
     mProcessor->OnLogEvent(event);
 }
 
-Status StatsService::getData(const String16& key, vector<uint8_t>* output) {
+Status StatsService::getData(const String16& key, vector <uint8_t>* output) {
     IPCThreadState* ipc = IPCThreadState::self();
+    ALOGD("StatsService::getData with Pid %i, Uid %i", ipc->getCallingPid(),
+          ipc->getCallingUid());
     if (checkCallingPermission(String16(kPermissionDump))) {
-        // TODO: Implement this.
+        string keyStr = string(String8(key).string());
+        ConfigKey configKey(ipc->getCallingUid(), keyStr);
+        mProcessor->onDumpReport(configKey, output);
         return Status::ok();
     } else {
         return Status::fromExceptionCode(binder::Status::EX_SECURITY);
@@ -588,10 +656,9 @@
                                       const String16& package, const String16& cls,
                                       bool* success) {
     IPCThreadState* ipc = IPCThreadState::self();
-    int32_t* uid = reinterpret_cast<int32_t*>(ipc->getCallingUid());
     if (checkCallingPermission(String16(kPermissionDump))) {
         string keyString = string(String8(key).string());
-        ConfigKey configKey(*uid, keyString);
+        ConfigKey configKey(ipc->getCallingUid(), keyString);
         StatsdConfig cfg;
         cfg.ParseFromArray(&config[0], config.size());
         mConfigManager->UpdateConfig(configKey, cfg);
@@ -607,7 +674,8 @@
 Status StatsService::removeConfiguration(const String16& key, bool* success) {
     IPCThreadState* ipc = IPCThreadState::self();
     if (checkCallingPermission(String16(kPermissionDump))) {
-        // TODO: Implement this.
+        string keyStr = string(String8(key).string());
+        mConfigManager->RemoveConfig(ConfigKey(ipc->getCallingUid(), keyStr));
         return Status::ok();
     } else {
         *success = false;
@@ -615,7 +683,7 @@
     }
 }
 
-void StatsService::binderDied(const wp<IBinder>& who) {
+void StatsService::binderDied(const wp <IBinder>& who) {
     for (size_t i = 0; i < mCallbacks.size(); i++) {
         if (IInterface::asBinder(mCallbacks[i]) == who) {
             mCallbacks.removeAt(i);
diff --git a/cmds/statsd/src/StatsService.h b/cmds/statsd/src/StatsService.h
index 0163f94..888f97b 100644
--- a/cmds/statsd/src/StatsService.h
+++ b/cmds/statsd/src/StatsService.h
@@ -124,7 +124,7 @@
     /**
      * Trigger a broadcast.
      */
-    status_t cmd_trigger_broadcast(Vector<String8>& args);
+    status_t cmd_trigger_broadcast(FILE* out, Vector<String8>& args);
 
     /**
      * Handle the config sub-command.
@@ -132,6 +132,11 @@
     status_t cmd_config(FILE* in, FILE* out, FILE* err, Vector<String8>& args);
 
     /**
+     * Prints some basic stats to std out.
+     */
+    status_t cmd_print_stats(FILE* out);
+
+    /**
      * Print the event log.
      */
     status_t cmd_print_stats_log(FILE* out, const Vector<String8>& args);
diff --git a/cmds/statsd/src/config/ConfigManager.cpp b/cmds/statsd/src/config/ConfigManager.cpp
index 3319f6d..9680e4a 100644
--- a/cmds/statsd/src/config/ConfigManager.cpp
+++ b/cmds/statsd/src/config/ConfigManager.cpp
@@ -134,12 +134,34 @@
     }
 }
 
+vector<ConfigKey> ConfigManager::GetAllConfigKeys() {
+    vector<ConfigKey> ret;
+    for (auto it = mConfigs.cbegin(); it != mConfigs.cend(); ++it) {
+        ret.push_back(it->first);
+    }
+    return ret;
+}
+
+const pair<string, string> ConfigManager::GetConfigReceiver(const ConfigKey& key) {
+    auto it = mConfigReceivers.find(key);
+    if (it == mConfigReceivers.end()) {
+        return pair<string,string>();
+    } else {
+        return it->second;
+    }
+}
+
 void ConfigManager::Dump(FILE* out) {
     fprintf(out, "CONFIGURATIONS (%d)\n", (int)mConfigs.size());
     fprintf(out, "     uid name\n");
     for (unordered_map<ConfigKey, StatsdConfig>::const_iterator it = mConfigs.begin();
          it != mConfigs.end(); it++) {
         fprintf(out, "  %6d %s\n", it->first.GetUid(), it->first.GetName().c_str());
+        auto receiverIt = mConfigReceivers.find(it->first);
+        if (receiverIt != mConfigReceivers.end()) {
+            fprintf(out, "    -> received by %s, %s\n", receiverIt->second.first.c_str(),
+                    receiverIt->second.second.c_str());
+        }
         // TODO: Print the contents of the config too.
     }
 }
diff --git a/cmds/statsd/src/config/ConfigManager.h b/cmds/statsd/src/config/ConfigManager.h
index 5b612cc..01d7fb9 100644
--- a/cmds/statsd/src/config/ConfigManager.h
+++ b/cmds/statsd/src/config/ConfigManager.h
@@ -68,6 +68,16 @@
     void SetConfigReceiver(const ConfigKey& key, const string& pkg, const string& cls);
 
     /**
+     * Returns the package name and class name representing the broadcast receiver for this config.
+     */
+    const pair<string, string> GetConfigReceiver(const ConfigKey& key);
+
+    /**
+     * Returns all config keys registered.
+     */
+    vector<ConfigKey> GetAllConfigKeys();
+
+    /**
      * Erase any broadcast receiver associated with this config key.
      */
     void RemoveConfigReceiver(const ConfigKey& key);
diff --git a/cmds/statsd/src/metrics/EventMetricProducer.cpp b/cmds/statsd/src/metrics/EventMetricProducer.cpp
index 74ba40b..bd288a1 100644
--- a/cmds/statsd/src/metrics/EventMetricProducer.cpp
+++ b/cmds/statsd/src/metrics/EventMetricProducer.cpp
@@ -121,7 +121,10 @@
     event.ToProto(*mProto);
     mProto->end(eventToken);
     mProto->end(wrapperToken);
-    // TODO: Find a proper way to derive the size of incoming LogEvent.
+
+    // TODO: Increment mByteSize with a real value. Until this feature is working, we assume 50
+    // bytes.
+    mByteSize += 50;
 }
 
 size_t EventMetricProducer::byteSize() {
diff --git a/cmds/statsd/src/metrics/MetricProducer.h b/cmds/statsd/src/metrics/MetricProducer.h
index c0930e3..c7982a8 100644
--- a/cmds/statsd/src/metrics/MetricProducer.h
+++ b/cmds/statsd/src/metrics/MetricProducer.h
@@ -61,12 +61,15 @@
 
     // TODO: Pass a timestamp as a parameter in onDumpReport and update all its
     // implementations.
+    // onDumpReport returns the proto-serialized output and clears the previously stored contents.
     virtual std::unique_ptr<std::vector<uint8_t>> onDumpReport() = 0;
 
     virtual bool isConditionSliced() const {
         return mConditionSliced;
     };
 
+    // Returns the memory in bytes currently used to store this metric's data. Does not change
+    // state.
     virtual size_t byteSize() = 0;
 
 protected:
diff --git a/cmds/statsd/src/metrics/MetricsManager.h b/cmds/statsd/src/metrics/MetricsManager.h
index 39c79f9..fb16779 100644
--- a/cmds/statsd/src/metrics/MetricsManager.h
+++ b/cmds/statsd/src/metrics/MetricsManager.h
@@ -46,6 +46,8 @@
     // Config source owner can call onDumpReport() to get all the metrics collected.
     std::vector<std::unique_ptr<std::vector<uint8_t>>> onDumpReport();
 
+    // Computes the total byte size of all metrics managed by a single config source.
+    // Does not change the state.
     size_t byteSize();
 
 private: