Merge "Handle errors within LogEvent" into rvc-dev
diff --git a/cmds/statsd/src/StatsLogProcessor.cpp b/cmds/statsd/src/StatsLogProcessor.cpp
index d914ab2..f91c600 100644
--- a/cmds/statsd/src/StatsLogProcessor.cpp
+++ b/cmds/statsd/src/StatsLogProcessor.cpp
@@ -389,15 +389,24 @@
 void StatsLogProcessor::OnLogEvent(LogEvent* event, int64_t elapsedRealtimeNs) {
     std::lock_guard<std::mutex> lock(mMetricsMutex);
 
+    // Tell StatsdStats about new event
+    const int64_t eventElapsedTimeNs = event->GetElapsedTimestampNs();
+    int atomId = event->GetTagId();
+    StatsdStats::getInstance().noteAtomLogged(atomId, eventElapsedTimeNs / NS_PER_SEC);
+    if (!event->isValid()) {
+        StatsdStats::getInstance().noteAtomError(atomId);
+        return;
+    }
+
     // Hard-coded logic to update train info on disk and fill in any information
     // this log event may be missing.
-    if (event->GetTagId() == android::os::statsd::util::BINARY_PUSH_STATE_CHANGED) {
+    if (atomId == android::os::statsd::util::BINARY_PUSH_STATE_CHANGED) {
         onBinaryPushStateChangedEventLocked(event);
     }
 
     // Hard-coded logic to update experiment ids on disk for certain rollback
     // types and fill the rollback atom with experiment ids
-    if (event->GetTagId() == android::os::statsd::util::WATCHDOG_ROLLBACK_OCCURRED) {
+    if (atomId == android::os::statsd::util::WATCHDOG_ROLLBACK_OCCURRED) {
         onWatchdogRollbackOccurredLocked(event);
     }
 
@@ -406,16 +415,11 @@
         ALOGI("%s", event->ToString().c_str());
     }
 #endif
-    const int64_t eventElapsedTimeNs = event->GetElapsedTimestampNs();
-
     resetIfConfigTtlExpiredLocked(eventElapsedTimeNs);
 
-    StatsdStats::getInstance().noteAtomLogged(
-        event->GetTagId(), event->GetElapsedTimestampNs() / NS_PER_SEC);
-
     // Hard-coded logic to update the isolated uid's in the uid-map.
     // The field numbers need to be currently updated by hand with atoms.proto
-    if (event->GetTagId() == android::os::statsd::util::ISOLATED_UID_CHANGED) {
+    if (atomId == android::os::statsd::util::ISOLATED_UID_CHANGED) {
         onIsolatedUidChangedEventLocked(*event);
     }
 
@@ -432,7 +436,7 @@
     }
 
 
-    if (event->GetTagId() != android::os::statsd::util::ISOLATED_UID_CHANGED) {
+    if (atomId != android::os::statsd::util::ISOLATED_UID_CHANGED) {
         // Map the isolated uid to host uid if necessary.
         mapIsolatedUidToHostUidIfNecessaryLocked(event);
     }
diff --git a/cmds/statsd/src/external/StatsCallbackPuller.cpp b/cmds/statsd/src/external/StatsCallbackPuller.cpp
index 933f48d..3618bb0 100644
--- a/cmds/statsd/src/external/StatsCallbackPuller.cpp
+++ b/cmds/statsd/src/external/StatsCallbackPuller.cpp
@@ -67,8 +67,14 @@
                     lock_guard<mutex> lk(*cv_mutex);
                     for (const StatsEventParcel& parcel: output) {
                         shared_ptr<LogEvent> event = make_shared<LogEvent>(/*uid=*/-1, /*pid=*/-1);
-                        event->parseBuffer((uint8_t*)parcel.buffer.data(), parcel.buffer.size());
-                        sharedData->push_back(event);
+                        bool valid = event->parseBuffer((uint8_t*)parcel.buffer.data(),
+                                                        parcel.buffer.size());
+                        if (valid) {
+                            sharedData->push_back(event);
+                        } else {
+                            StatsdStats::getInstance().noteAtomError(event->GetTagId(),
+                                                                     /*pull=*/true);
+                        }
                     }
                     *pullSuccess = success;
                     *pullFinish = true;
diff --git a/cmds/statsd/src/guardrail/StatsdStats.cpp b/cmds/statsd/src/guardrail/StatsdStats.cpp
index db637b1..46f5dbd 100644
--- a/cmds/statsd/src/guardrail/StatsdStats.cpp
+++ b/cmds/statsd/src/guardrail/StatsdStats.cpp
@@ -54,6 +54,7 @@
 
 const int FIELD_ID_ATOM_STATS_TAG = 1;
 const int FIELD_ID_ATOM_STATS_COUNT = 2;
+const int FIELD_ID_ATOM_STATS_ERROR_COUNT = 3;
 
 const int FIELD_ID_ANOMALY_ALARMS_REGISTERED = 1;
 const int FIELD_ID_PERIODIC_ALARMS_REGISTERED = 1;
@@ -549,6 +550,20 @@
             std::min(pullStats.minBucketBoundaryDelayNs, timeDelayNs);
 }
 
+void StatsdStats::noteAtomError(int atomTag, bool pull) {
+    lock_guard<std::mutex> lock(mLock);
+    if (pull) {
+        mPulledAtomStats[atomTag].atomErrorCount++;
+        return;
+    }
+    // Pushed atoms: only start tracking a new tag while the map is below its cap.
+    const bool tracked = mPushedAtomErrorStats.count(atomTag) > 0;
+    const bool hasRoom = mPushedAtomErrorStats.size() < (size_t)kMaxPushedAtomErrorStatsSize;
+    if (tracked || hasRoom) {
+        mPushedAtomErrorStats[atomTag]++;
+    }
+}
+
 StatsdStats::AtomMetricStats& StatsdStats::getAtomMetricStats(int64_t metricId) {
     auto atomMetricStatsIter = mAtomMetricStats.find(metricId);
     if (atomMetricStatsIter != mAtomMetricStats.end()) {
@@ -604,9 +619,11 @@
         pullStats.second.pullExceedMaxDelay = 0;
         pullStats.second.registeredCount = 0;
         pullStats.second.unregisteredCount = 0;
+        pullStats.second.atomErrorCount = 0;
     }
     mAtomMetricStats.clear();
     mActivationBroadcastGuardrailStats.clear();
+    mPushedAtomErrorStats.clear();
 }
 
 string buildTimeString(int64_t timeSec) {
@@ -617,6 +634,15 @@
     return string(timeBuffer);
 }
 
+int StatsdStats::getPushedAtomErrors(int atomId) const {
+    // Takes no lock itself; NOTE(review): callers presumably hold mLock - confirm.
+    const auto entry = mPushedAtomErrorStats.find(atomId);
+    if (entry == mPushedAtomErrorStats.end()) {
+        return 0;  // No entry: no errors currently recorded for this atom.
+    }
+    return entry->second;
+}
+
 void StatsdStats::dumpStats(int out) const {
     lock_guard<std::mutex> lock(mLock);
     time_t t = mStartTimeSec;
@@ -721,11 +747,13 @@
     const size_t atomCounts = mPushedAtomStats.size();
     for (size_t i = 2; i < atomCounts; i++) {
         if (mPushedAtomStats[i] > 0) {
-            dprintf(out, "Atom %lu->%d\n", (unsigned long)i, mPushedAtomStats[i]);
+            dprintf(out, "Atom %zu->(total count)%d, (error count)%d\n", i, mPushedAtomStats[i],
+                    getPushedAtomErrors((int)i));
         }
     }
     for (const auto& pair : mNonPlatformPushedAtomStats) {
-        dprintf(out, "Atom %lu->%d\n", (unsigned long)pair.first, pair.second);
+        dprintf(out, "Atom %d->(total count)%d, (error count)%d\n", pair.first, pair.second,
+                getPushedAtomErrors(pair.first));
     }
 
     dprintf(out, "********Pulled Atom stats***********\n");
@@ -737,13 +765,15 @@
                 "nanos)%lld, "
                 "  (max pull delay nanos)%lld, (data error)%ld\n"
                 "  (pull timeout)%ld, (pull exceed max delay)%ld\n"
-                "  (registered count) %ld, (unregistered count) %ld\n",
+                "  (registered count) %ld, (unregistered count) %ld\n"
+                "  (atom error count) %d\n",
                 (int)pair.first, (long)pair.second.totalPull, (long)pair.second.totalPullFromCache,
                 (long)pair.second.pullFailed, (long)pair.second.minPullIntervalSec,
                 (long long)pair.second.avgPullTimeNs, (long long)pair.second.maxPullTimeNs,
                 (long long)pair.second.avgPullDelayNs, (long long)pair.second.maxPullDelayNs,
                 pair.second.dataError, pair.second.pullTimeout, pair.second.pullExceedMaxDelay,
-                pair.second.registeredCount, pair.second.unregisteredCount);
+                pair.second.registeredCount, pair.second.unregisteredCount,
+                pair.second.atomErrorCount);
     }
 
     if (mAnomalyAlarmRegisteredStats > 0) {
@@ -919,6 +949,10 @@
                     proto.start(FIELD_TYPE_MESSAGE | FIELD_ID_ATOM_STATS | FIELD_COUNT_REPEATED);
             proto.write(FIELD_TYPE_INT32 | FIELD_ID_ATOM_STATS_TAG, (int32_t)i);
             proto.write(FIELD_TYPE_INT32 | FIELD_ID_ATOM_STATS_COUNT, mPushedAtomStats[i]);
+            int errors = getPushedAtomErrors(i);
+            if (errors > 0) {
+                proto.write(FIELD_TYPE_INT32 | FIELD_ID_ATOM_STATS_ERROR_COUNT, errors);
+            }
             proto.end(token);
         }
     }
@@ -928,6 +962,10 @@
                 proto.start(FIELD_TYPE_MESSAGE | FIELD_ID_ATOM_STATS | FIELD_COUNT_REPEATED);
         proto.write(FIELD_TYPE_INT32 | FIELD_ID_ATOM_STATS_TAG, pair.first);
         proto.write(FIELD_TYPE_INT32 | FIELD_ID_ATOM_STATS_COUNT, pair.second);
+        int errors = getPushedAtomErrors(pair.first);
+        if (errors > 0) {
+            proto.write(FIELD_TYPE_INT32 | FIELD_ID_ATOM_STATS_ERROR_COUNT, errors);
+        }
         proto.end(token);
     }
 
diff --git a/cmds/statsd/src/guardrail/StatsdStats.h b/cmds/statsd/src/guardrail/StatsdStats.h
index ff31e9e..21e524a 100644
--- a/cmds/statsd/src/guardrail/StatsdStats.h
+++ b/cmds/statsd/src/guardrail/StatsdStats.h
@@ -461,6 +461,16 @@
      */
      void noteActivationBroadcastGuardrailHit(const int uid);
 
+     /**
+      * Reports that an atom is erroneous or cannot be parsed successfully by
+      * statsd. An atom tag of 0 indicates that the client did not supply the
+      * atom id within the encoding. Set `pull` to true for pulled atoms.
+      *
+      * For pushed atoms only, this call should be preceded by a call to
+      * noteAtomLogged.
+      */
+     void noteAtomError(int atomTag, bool pull=false);
+
     /**
      * Reset the historical stats. Including all stats in icebox, and the tracked stats about
      * metrics, matchers, and atoms. The active configs will be kept and StatsdStats will continue
@@ -499,6 +509,7 @@
         long emptyData = 0;
         long registeredCount = 0;
         long unregisteredCount = 0;
+        int32_t atomErrorCount = 0;
     } PulledAtomStats;
 
     typedef struct {
@@ -546,6 +557,12 @@
     // Maps PullAtomId to its stats. The size is capped by the puller atom counts.
     std::map<int, PulledAtomStats> mPulledAtomStats;
 
+    // Stores the number of times a pushed atom was logged erroneously. The
+    // corresponding counts for pulled atoms are stored in PulledAtomStats.
+    // The max size of this map is kMaxPushedAtomErrorStatsSize.
+    std::map<int, int> mPushedAtomErrorStats;
+    const int kMaxPushedAtomErrorStatsSize = 100;
+
     // Maps metric ID to its stats. The size is capped by the number of metrics.
     std::map<int64_t, AtomMetricStats> mAtomMetricStats;
 
@@ -613,6 +630,8 @@
 
     void addToIceBoxLocked(std::shared_ptr<ConfigStats>& stats);
 
+    int getPushedAtomErrors(int atomId) const;
+
     /**
      * Get a reference to AtomMetricStats for a metric. If none exists, create it. The reference
      * will live as long as `this`.
@@ -631,6 +650,7 @@
     FRIEND_TEST(StatsdStatsTest, TestPullAtomStats);
     FRIEND_TEST(StatsdStatsTest, TestAtomMetricsStats);
     FRIEND_TEST(StatsdStatsTest, TestActivationBroadcastGuardrailHit);
+    FRIEND_TEST(StatsdStatsTest, TestAtomErrorStats);
 };
 
 }  // namespace statsd
diff --git a/cmds/statsd/src/logd/LogEvent.cpp b/cmds/statsd/src/logd/LogEvent.cpp
index f41ff74..eb830e1 100644
--- a/cmds/statsd/src/logd/LogEvent.cpp
+++ b/cmds/statsd/src/logd/LogEvent.cpp
@@ -377,7 +377,6 @@
         typeInfo = readNextValue<uint8_t>();
         uint8_t typeId = getTypeId(typeInfo);
 
-        // TODO(b/144373276): handle errors passed to the socket
         switch (typeId) {
             case BOOL_TYPE:
                 parseBool(pos, /*depth=*/0, last, getNumAnnotations(typeInfo));
@@ -404,8 +403,13 @@
                 parseAttributionChain(pos, /*depth=*/0, last, getNumAnnotations(typeInfo));
                 if (mAttributionChainIndex == -1) mAttributionChainIndex = pos[0];
                 break;
+            case ERROR_TYPE:
+                mErrorBitmask = readNextValue<int32_t>();
+                mValid = false;
+                break;
             default:
                 mValid = false;
+                break;
         }
     }
 
diff --git a/cmds/statsd/src/logd/LogEvent.h b/cmds/statsd/src/logd/LogEvent.h
index 8d0e794..dedcfaf 100644
--- a/cmds/statsd/src/logd/LogEvent.h
+++ b/cmds/statsd/src/logd/LogEvent.h
@@ -210,6 +210,14 @@
         return BAD_INDEX;
     }
 
+    // Reports mValid, which starts out true and is cleared by the parser,
+    // e.g. on an ERROR_TYPE field or an unrecognized field type.
+    bool isValid() const { return mValid; }
+
+    // Reports mErrorBitmask: 0 by default, otherwise the int32 value the
+    // parser read from an ERROR_TYPE field.
+    int32_t getErrorBitmask() const { return mErrorBitmask; }
+
 private:
     /**
      * Only use this if copy is absolutely needed.
@@ -236,12 +244,13 @@
     bool checkPreviousValueType(Type expected);
 
     /**
-     * The below three variables are only valid during the execution of
+     * The below two variables are only valid during the execution of
      * parseBuffer. There are no guarantees about the state of these variables
      * before/after.
      */
     uint8_t* mBuf;
     uint32_t mRemainingLen; // number of valid bytes left in the buffer being parsed
+
     bool mValid = true; // stores whether the event we received from the socket is valid
 
     /**
@@ -299,8 +308,9 @@
     // The elapsed timestamp set by statsd log writer.
     int64_t mElapsedTimestampNs;
 
-    // The atom tag of the event.
-    int mTagId;
+    // The atom tag of the event (defaults to 0 if client does not
+    // appropriately set the atom id).
+    int mTagId = 0;
 
     // The uid of the logging client (defaults to -1).
     int32_t mLogUid = -1;
@@ -308,6 +318,9 @@
     // The pid of the logging client (defaults to -1).
     int32_t mLogPid = -1;
 
+    // Bitmask of errors sent by StatsEvent/AStatsEvent.
+    int32_t mErrorBitmask = 0;
+
     // Annotations
     bool mTruncateTimestamp = false;
     int mUidFieldIndex = -1;
diff --git a/cmds/statsd/src/stats_log.proto b/cmds/statsd/src/stats_log.proto
index f4247ec..868247b 100644
--- a/cmds/statsd/src/stats_log.proto
+++ b/cmds/statsd/src/stats_log.proto
@@ -425,6 +425,7 @@
     message AtomStats {
         optional int32 tag = 1;
         optional int32 count = 2;
+        optional int32 error_count = 3;
     }
 
     repeated AtomStats atom_stats = 7;
@@ -460,6 +461,7 @@
         optional int64 empty_data = 15;
         optional int64 registered_count = 16;
         optional int64 unregistered_count = 17;
+        optional int32 atom_error_count = 18;
     }
     repeated PulledAtomStats pulled_atom_stats = 10;
 
diff --git a/cmds/statsd/src/stats_log_util.cpp b/cmds/statsd/src/stats_log_util.cpp
index 5635313..f9fddc8 100644
--- a/cmds/statsd/src/stats_log_util.cpp
+++ b/cmds/statsd/src/stats_log_util.cpp
@@ -80,6 +80,8 @@
 const int FIELD_ID_EMPTY_DATA = 15;
 const int FIELD_ID_PULL_REGISTERED_COUNT = 16;
 const int FIELD_ID_PULL_UNREGISTERED_COUNT = 17;
+const int FIELD_ID_ATOM_ERROR_COUNT = 18;
+
 // for AtomMetricStats proto
 const int FIELD_ID_ATOM_METRIC_STATS = 17;
 const int FIELD_ID_METRIC_ID = 1;
@@ -492,6 +494,7 @@
                        (long long) pair.second.registeredCount);
     protoOutput->write(FIELD_TYPE_INT64 | FIELD_ID_PULL_UNREGISTERED_COUNT,
                        (long long) pair.second.unregisteredCount);
+    protoOutput->write(FIELD_TYPE_INT32 | FIELD_ID_ATOM_ERROR_COUNT, pair.second.atomErrorCount);
     protoOutput->end(token);
 }
 
diff --git a/cmds/statsd/tests/guardrail/StatsdStats_test.cpp b/cmds/statsd/tests/guardrail/StatsdStats_test.cpp
index 129fafa..cdde603 100644
--- a/cmds/statsd/tests/guardrail/StatsdStats_test.cpp
+++ b/cmds/statsd/tests/guardrail/StatsdStats_test.cpp
@@ -486,6 +486,41 @@
     EXPECT_TRUE(uid2Good);
 }
 
+TEST(StatsdStatsTest, TestAtomErrorStats) {
+    StatsdStats stats;
+    // Verifies that pushed and pulled atom error counts reach the dumped report.
+    const int pushAtomTag = 100;
+    const int pullAtomTag = 1000;
+    const int numErrors = 10;
+
+    for (int i = 0; i < numErrors; i++) {
+        // We must call noteAtomLogged as well because only those pushed atoms
+        // that have been logged will have stats printed about them in the
+        // proto.
+        stats.noteAtomLogged(pushAtomTag, /*timeSec=*/0);
+        stats.noteAtomError(pushAtomTag, /*pull=*/false);
+
+        stats.noteAtomError(pullAtomTag, /*pull=*/true);
+    }
+
+    vector<uint8_t> output;
+    stats.dumpStats(&output, false);
+    StatsdStatsReport report;
+    ASSERT_TRUE(report.ParseFromArray(output.data(), output.size()));
+
+    // Check error count = numErrors for push atom (fatal: index 0 is read next)
+    ASSERT_EQ(1, report.atom_stats_size());
+    const auto& pushedAtomStats = report.atom_stats(0);
+    EXPECT_EQ(pushAtomTag, pushedAtomStats.tag());
+    EXPECT_EQ(numErrors, pushedAtomStats.error_count());
+
+    // Check error count = numErrors for pull atom (fatal: index 0 is read next)
+    ASSERT_EQ(1, report.pulled_atom_stats_size());
+    const auto& pulledAtomStats = report.pulled_atom_stats(0);
+    EXPECT_EQ(pullAtomTag, pulledAtomStats.atom_id());
+    EXPECT_EQ(numErrors, pulledAtomStats.atom_error_count());
+}
+
 }  // namespace statsd
 }  // namespace os
 }  // namespace android