Handle logd reconnect.

When statsd reconnects to logd, it reads all logs in the buffer again. To avoid reprocessing old
events, we do the following:

1. At any given moment, record the largest timestamp seen so far (T_max) and the timestamp of the
   last event processed (the checkpoint, CP).
2. After a reconnection, skip events while searching for the CP, until we see an event with a
   timestamp larger than T_max.
   -> If we find the CP, resume processing with the event right after it. Success.
   -> If a newer event shows up before the CP is found, logs have definitely been lost, so we
      reset all configs.

Note:
1. Logd has an API to read logs after a given timestamp, but that API is vulnerable to time
   changes made from Settings, so we cannot rely on it.

2. If logd inserts a new log (with an older timestamp) before the CP, we cannot detect it. Doing so
   would require recording every timestamp we have seen.

Test: statsd_test
Bug: 77813113

Change-Id: Ic3fdb47230807606ab11dc994cb162194adb8448
diff --git a/cmds/statsd/src/StatsLogProcessor.cpp b/cmds/statsd/src/StatsLogProcessor.cpp
index efcb1fe..f2443e8 100644
--- a/cmds/statsd/src/StatsLogProcessor.cpp
+++ b/cmds/statsd/src/StatsLogProcessor.cpp
@@ -79,7 +79,8 @@
       mPeriodicAlarmMonitor(periodicAlarmMonitor),
       mSendBroadcast(sendBroadcast),
       mTimeBaseNs(timeBaseNs),
-      mLastLogTimestamp(0) {
+      mLargestTimestampSeen(0),
+      mLastTimestampSeen(0) {
 }
 
 StatsLogProcessor::~StatsLogProcessor() {
@@ -156,18 +157,54 @@
 }
 
 void StatsLogProcessor::OnLogEvent(LogEvent* event) {
+    OnLogEvent(event, false);
+}
+
+void StatsLogProcessor::OnLogEvent(LogEvent* event, bool reconnectionStarts) {
     std::lock_guard<std::mutex> lock(mMetricsMutex);
     const int64_t currentTimestampNs = event->GetElapsedTimestampNs();
 
-    if (currentTimestampNs < mLastLogTimestamp) {
-        StatsdStats::getInstance().noteLogEventSkipped(
-            event->GetTagId(), event->GetElapsedTimestampNs());
-        return;
+    if (reconnectionStarts && mLastTimestampSeen != 0) {
+        // LogReader reports that a new logd connection has just started, so logd replays its
+        // buffer. Enter the reconnection state and search for the last checkpoint (CP).
+        mInReconnection = true;
+    }
+
+    if (mInReconnection) {
+        // Is this the CP, i.e. the last event processed before the reconnect?
+        if (currentTimestampNs == mLastTimestampSeen) {
+            mInReconnection = false;
+            // Found the CP. Skip it; processing resumes with the next event.
+            return;
+        }
+        if (currentTimestampNs > mLargestTimestampSeen) {
+            // A newer event arrived before the CP was found. Stop searching.
+            mLogLossCount++;
+            mInReconnection = false;
+            StatsdStats::getInstance().noteLogLost(currentTimestampNs);
+            // Persist what has been collected so far before the configs are reset below.
+            WriteDataToDiskLocked();
+            // Since a fresher event showed up before the CP, data may have been lost.
+            // The best we can do is to reset every config.
+            std::vector<ConfigKey> configKeys;
+            for (const auto& managerEntry : mMetricsManagers) {
+                configKeys.push_back(managerEntry.first);
+            }
+            resetConfigsLocked(currentTimestampNs, configKeys);
+        } else {
+            // Presumably a replayed event (timestamp <= T_max); keep searching for the CP.
+            return;
+        }
+    }
+
+    mLogCount++;
+    mLastTimestampSeen = currentTimestampNs;
+    if (mLargestTimestampSeen < currentTimestampNs) {
+        mLargestTimestampSeen = currentTimestampNs;
     }
 
     resetIfConfigTtlExpiredLocked(currentTimestampNs);
 
-    mLastLogTimestamp = currentTimestampNs;
     StatsdStats::getInstance().noteAtomLogged(
         event->GetTagId(), event->GetElapsedTimestampNs() / NS_PER_SEC);
 
@@ -339,15 +376,9 @@
                 (long long)getWallClockNs());
 }
 
-void StatsLogProcessor::resetIfConfigTtlExpiredLocked(const int64_t timestampNs) {
-    std::vector<ConfigKey> configKeysTtlExpired;
-    for (auto it = mMetricsManagers.begin(); it != mMetricsManagers.end(); it++) {
-        if (it->second != nullptr && !it->second->isInTtl(timestampNs)) {
-            configKeysTtlExpired.push_back(it->first);
-        }
-    }
-
-    for (const auto& key : configKeysTtlExpired) {
+void StatsLogProcessor::resetConfigsLocked(const int64_t timestampNs,
+                                           const std::vector<ConfigKey>& configs) {
+    for (const auto& key : configs) {
         StatsdConfig config;
         if (StorageManager::readConfigFromDisk(key, &config)) {
             OnConfigUpdatedLocked(timestampNs, key, config);
@@ -362,6 +393,18 @@
     }
 }
 
+void StatsLogProcessor::resetIfConfigTtlExpiredLocked(const int64_t timestampNs) {
+    std::vector<ConfigKey> configKeysTtlExpired;
+    for (auto it = mMetricsManagers.begin(); it != mMetricsManagers.end(); it++) {
+        if (it->second != nullptr && !it->second->isInTtl(timestampNs)) {
+            configKeysTtlExpired.push_back(it->first);
+        }
+    }
+    if (configKeysTtlExpired.size() > 0) {
+        resetConfigsLocked(timestampNs, configKeysTtlExpired);
+    }
+}
+
 void StatsLogProcessor::OnConfigRemoved(const ConfigKey& key) {
     std::lock_guard<std::mutex> lock(mMetricsMutex);
     auto it = mMetricsManagers.find(key);
diff --git a/cmds/statsd/src/StatsLogProcessor.h b/cmds/statsd/src/StatsLogProcessor.h
index 0e1d4ba..6efdf8c 100644
--- a/cmds/statsd/src/StatsLogProcessor.h
+++ b/cmds/statsd/src/StatsLogProcessor.h
@@ -40,6 +40,9 @@
                       const std::function<void(const ConfigKey&)>& sendBroadcast);
     virtual ~StatsLogProcessor();
 
+    // reconnectionStarts: true when LogReader reports that a new logd connection just started.
+    void OnLogEvent(LogEvent* event, bool reconnectionStarts);
+    // Equivalent to OnLogEvent(event, false). For testing only.
     void OnLogEvent(LogEvent* event);
 
     void OnConfigUpdated(const int64_t timestampNs, const ConfigKey& key,
@@ -122,16 +125,30 @@
     // Handler over the isolated uid change event.
     void onIsolatedUidChangedEventLocked(const LogEvent& event);
 
+    void resetConfigsLocked(const int64_t timestampNs, const std::vector<ConfigKey>& configs);
+
     // Function used to send a broadcast so that receiver for the config key can call getData
     // to retrieve the stored data.
     std::function<void(const ConfigKey& key)> mSendBroadcast;
 
     const int64_t mTimeBaseNs;
 
-    int64_t mLastLogTimestamp;
+    // Largest timestamp of the events that we have processed.
+    int64_t mLargestTimestampSeen = 0;
+
+    int64_t mLastTimestampSeen = 0;
+
+    bool mInReconnection = false;
+
+    // Processed log count
+    uint64_t mLogCount = 0;
+
+    // Log loss detected count
+    int mLogLossCount = 0;
 
     long mLastPullerCacheClearTimeSec = 0;
 
+    FRIEND_TEST(StatsLogProcessorTest, TestOutOfOrderLogs);
     FRIEND_TEST(StatsLogProcessorTest, TestRateLimitByteSize);
     FRIEND_TEST(StatsLogProcessorTest, TestRateLimitBroadcast);
     FRIEND_TEST(StatsLogProcessorTest, TestDropWhenByteSizeTooLarge);
diff --git a/cmds/statsd/src/StatsService.cpp b/cmds/statsd/src/StatsService.cpp
index 86a3a78..4e693e3 100644
--- a/cmds/statsd/src/StatsService.cpp
+++ b/cmds/statsd/src/StatsService.cpp
@@ -818,8 +818,8 @@
     mConfigManager->Startup();
 }
 
-void StatsService::OnLogEvent(LogEvent* event) {
-    mProcessor->OnLogEvent(event);
+void StatsService::OnLogEvent(LogEvent* event, bool reconnectionStarts) {
+    mProcessor->OnLogEvent(event, reconnectionStarts);
 }
 
 Status StatsService::getData(int64_t key, vector<uint8_t>* output) {
diff --git a/cmds/statsd/src/StatsService.h b/cmds/statsd/src/StatsService.h
index 648e9c5..5712620 100644
--- a/cmds/statsd/src/StatsService.h
+++ b/cmds/statsd/src/StatsService.h
@@ -76,7 +76,7 @@
     /**
      * Called by LogReader when there's a log event to process.
      */
-    virtual void OnLogEvent(LogEvent* event);
+    virtual void OnLogEvent(LogEvent* event, bool reconnectionStarts);
 
     /**
      * Binder call for clients to request data for this configuration key.
diff --git a/cmds/statsd/src/guardrail/StatsdStats.cpp b/cmds/statsd/src/guardrail/StatsdStats.cpp
index c342aa5..ee3ed23 100644
--- a/cmds/statsd/src/guardrail/StatsdStats.cpp
+++ b/cmds/statsd/src/guardrail/StatsdStats.cpp
@@ -50,7 +50,7 @@
 // const int FIELD_ID_PULLED_ATOM_STATS = 10; // The proto is written in stats_log_util.cpp
 const int FIELD_ID_LOGGER_ERROR_STATS = 11;
 const int FIELD_ID_PERIODIC_ALARM_STATS = 12;
-const int FIELD_ID_SKIPPED_LOG_EVENT_STATS = 13;
+const int FIELD_ID_LOG_LOSS_STATS = 14;
 
 const int FIELD_ID_ATOM_STATS_TAG = 1;
 const int FIELD_ID_ATOM_STATS_COUNT = 2;
@@ -61,9 +61,6 @@
 const int FIELD_ID_LOGGER_STATS_TIME = 1;
 const int FIELD_ID_LOGGER_STATS_ERROR_CODE = 2;
 
-const int FIELD_ID_SKIPPED_LOG_EVENT_STATS_TAG = 1;
-const int FIELD_ID_SKIPPED_LOG_EVENT_STATS_TIMESTAMP = 2;
-
 const int FIELD_ID_CONFIG_STATS_UID = 1;
 const int FIELD_ID_CONFIG_STATS_ID = 2;
 const int FIELD_ID_CONFIG_STATS_CREATION = 3;
@@ -182,6 +179,14 @@
     noteConfigResetInternalLocked(key);
 }
 
+void StatsdStats::noteLogLost(int64_t timestampNs) {
+    lock_guard<std::mutex> lock(mLock);
+    if (mLogLossTimestampNs.size() == kMaxLoggerErrors) {
+        mLogLossTimestampNs.pop_front();
+    }
+    mLogLossTimestampNs.push_back(timestampNs);
+}
+
 void StatsdStats::noteBroadcastSent(const ConfigKey& key) {
     noteBroadcastSent(key, getWallClockSec());
 }
@@ -350,15 +355,6 @@
     mPushedAtomStats[atomId]++;
 }
 
-void StatsdStats::noteLogEventSkipped(int tag, int64_t timestamp) {
-    lock_guard<std::mutex> lock(mLock);
-    // grows strictly one at a time. so it won't > kMaxSkippedLogEvents
-    if (mSkippedLogEvents.size() == kMaxSkippedLogEvents) {
-        mSkippedLogEvents.pop_front();
-    }
-    mSkippedLogEvents.push_back(std::make_pair(tag, timestamp));
-}
-
 void StatsdStats::noteLoggerError(int error) {
     lock_guard<std::mutex> lock(mLock);
     // grows strictly one at a time. so it won't > kMaxLoggerErrors
@@ -381,7 +377,7 @@
     mAnomalyAlarmRegisteredStats = 0;
     mPeriodicAlarmRegisteredStats = 0;
     mLoggerErrors.clear();
-    mSkippedLogEvents.clear();
+    mLogLossTimestampNs.clear();
     for (auto& config : mConfigStats) {
         config.second->broadcast_sent_time_sec.clear();
         config.second->data_drop_time_sec.clear();
@@ -515,8 +511,8 @@
         strftime(buffer, sizeof(buffer), "%Y-%m-%d %I:%M%p\n", error_tm);
         fprintf(out, "Logger error %d at %s\n", error.second, buffer);
     }
-    for (const auto& skipped : mSkippedLogEvents) {
-        fprintf(out, "Log event (%d) skipped at %lld\n", skipped.first, (long long)skipped.second);
+    for (const auto& loss : mLogLossTimestampNs) {
+        fprintf(out, "Log loss detected at %lld (elapsedRealtimeNs)\n", (long long)loss);
     }
 }
 
@@ -672,13 +668,9 @@
         proto.end(token);
     }
 
-    for (const auto& skipped : mSkippedLogEvents) {
-        uint64_t token = proto.start(FIELD_TYPE_MESSAGE | FIELD_ID_SKIPPED_LOG_EVENT_STATS |
-                                      FIELD_COUNT_REPEATED);
-        proto.write(FIELD_TYPE_INT32 | FIELD_ID_SKIPPED_LOG_EVENT_STATS_TAG, skipped.first);
-        proto.write(FIELD_TYPE_INT64 | FIELD_ID_SKIPPED_LOG_EVENT_STATS_TIMESTAMP,
-                    (long long)skipped.second);
-        proto.end(token);
+    for (const auto& loss : mLogLossTimestampNs) {
+        proto.write(FIELD_TYPE_INT64 | FIELD_ID_LOG_LOSS_STATS | FIELD_COUNT_REPEATED,
+                    (long long)loss);
     }
 
     output->clear();
diff --git a/cmds/statsd/src/guardrail/StatsdStats.h b/cmds/statsd/src/guardrail/StatsdStats.h
index 123a703..2cbcca3 100644
--- a/cmds/statsd/src/guardrail/StatsdStats.h
+++ b/cmds/statsd/src/guardrail/StatsdStats.h
@@ -102,9 +102,7 @@
     // The max number of old config stats we keep.
     const static int kMaxIceBoxSize = 20;
 
-    const static int kMaxLoggerErrors = 10;
-
-    const static int kMaxSkippedLogEvents = 200;
+    const static int kMaxLoggerErrors = 20;
 
     const static int kMaxTimestampCount = 20;
 
@@ -280,7 +278,7 @@
     /**
-     * Records statsd skipped an event.
+     * Records a log loss detected after logd reconnect; timestampNs is elapsed realtime in ns.
      */
-    void noteLogEventSkipped(int tag, int64_t timestamp);
+    void noteLogLost(int64_t timestampNs);
 
     /**
      * Reset the historical stats. Including all stats in icebox, and the tracked stats about
@@ -337,8 +335,8 @@
     // Logd errors. Size capped by kMaxLoggerErrors.
     std::list<const std::pair<int, int>> mLoggerErrors;
 
-    // Skipped log events.
-    std::list<const std::pair<int, int64_t>> mSkippedLogEvents;
+    // Timestamps when we detect log loss after logd reconnect.
+    std::list<int64_t> mLogLossTimestampNs;
 
     // Stores the number of times statsd modified the anomaly alarm registered with
     // StatsCompanionService.
diff --git a/cmds/statsd/src/logd/LogListener.h b/cmds/statsd/src/logd/LogListener.h
index 69ca571..f924040 100644
--- a/cmds/statsd/src/logd/LogListener.h
+++ b/cmds/statsd/src/logd/LogListener.h
@@ -33,7 +33,7 @@
     LogListener();
     virtual ~LogListener();
 
-    virtual void OnLogEvent(LogEvent* msg) = 0;
+    virtual void OnLogEvent(LogEvent* msg, bool reconnectionStarts) = 0;
 };
 
 }  // namespace statsd
diff --git a/cmds/statsd/src/logd/LogReader.cpp b/cmds/statsd/src/logd/LogReader.cpp
index 0fe896b..26ae6a3 100644
--- a/cmds/statsd/src/logd/LogReader.cpp
+++ b/cmds/statsd/src/logd/LogReader.cpp
@@ -113,7 +113,8 @@
             LogEvent event(msg);
 
             // Call the listener
-            mListener->OnLogEvent(&event);
+            mListener->OnLogEvent(&event,
+                                  lineCount == 1 /* indicate whether it's a new connection */);
         }
     }
 
diff --git a/cmds/statsd/src/stats_log.proto b/cmds/statsd/src/stats_log.proto
index 4f7581d..eaa7bf1 100644
--- a/cmds/statsd/src/stats_log.proto
+++ b/cmds/statsd/src/stats_log.proto
@@ -305,4 +305,6 @@
         optional int64 elapsed_timestamp_nanos = 2;
     }
     repeated SkippedLogEventStats skipped_log_event_stats = 13;
+
+    repeated int64 log_loss_stats = 14;
 }