Handle logd reconnect.
When statsd reconnects to logd, it reads all logs from the logd buffer again. To avoid
reprocessing old events, we do the following:
1. At any given moment, record the largest timestamp (T_max) and the last timestamp (the
checkpoint, CP) that we have seen.
2. When a reconnection happens, we look for the checkpoint (CP) until we see a new log with a
timestamp larger than T_max.
-> If we find the CP, we resume after the CP. Success.
-> If we can't find the CP, there is definitely log loss. We reset all configs.
Note:
1. Logd has an API to read logs after a certain timestamp, but this API is vulnerable to
time changes from Settings, so we cannot rely on it.
2. If logd inserts a new log (with an older timestamp) before the CP, we cannot detect it. It's
not possible to detect it without recording all the timestamps we have seen.
Test: statsd_test
Bug: 77813113
Change-Id: Ic3fdb47230807606ab11dc994cb162194adb8448
diff --git a/cmds/statsd/src/StatsLogProcessor.cpp b/cmds/statsd/src/StatsLogProcessor.cpp
index efcb1fe..f2443e8 100644
--- a/cmds/statsd/src/StatsLogProcessor.cpp
+++ b/cmds/statsd/src/StatsLogProcessor.cpp
@@ -79,7 +79,8 @@
mPeriodicAlarmMonitor(periodicAlarmMonitor),
mSendBroadcast(sendBroadcast),
mTimeBaseNs(timeBaseNs),
- mLastLogTimestamp(0) {
+ mLargestTimestampSeen(0),
+ mLastTimestampSeen(0) {
}
StatsLogProcessor::~StatsLogProcessor() {
@@ -156,18 +157,54 @@
}
void StatsLogProcessor::OnLogEvent(LogEvent* event) {
+ OnLogEvent(event, false);
+}
+
+void StatsLogProcessor::OnLogEvent(LogEvent* event, bool reconnected) {
std::lock_guard<std::mutex> lock(mMetricsMutex);
const int64_t currentTimestampNs = event->GetElapsedTimestampNs();
- if (currentTimestampNs < mLastLogTimestamp) {
- StatsdStats::getInstance().noteLogEventSkipped(
- event->GetTagId(), event->GetElapsedTimestampNs());
- return;
+ if (reconnected && mLastTimestampSeen != 0) {
+ // LogReader tells us the connection has just been reset. Now we need
+ // to enter reconnection state to find the last CP.
+ mInReconnection = true;
+ }
+
+ if (mInReconnection) {
+ // We see the checkpoint
+ if (currentTimestampNs == mLastTimestampSeen) {
+ mInReconnection = false;
+ // Found the CP. ignore this event, and we will start to read from next event.
+ return;
+ }
+ if (currentTimestampNs > mLargestTimestampSeen) {
+ // We see a new log but CP has not been found yet. Give up now.
+ mLogLossCount++;
+ mInReconnection = false;
+ StatsdStats::getInstance().noteLogLost(currentTimestampNs);
+ // Persist the data before we reset. Do we want this?
+ WriteDataToDiskLocked();
+ // We see fresher event before we see the checkpoint. We might have lost data.
+ // The best we can do is to reset.
+ std::vector<ConfigKey> configKeys;
+ for (auto it = mMetricsManagers.begin(); it != mMetricsManagers.end(); it++) {
+ configKeys.push_back(it->first);
+ }
+ resetConfigsLocked(currentTimestampNs, configKeys);
+ } else {
+ // Still in search of the CP. Keep going.
+ return;
+ }
+ }
+
+ mLogCount++;
+ mLastTimestampSeen = currentTimestampNs;
+ if (mLargestTimestampSeen < currentTimestampNs) {
+ mLargestTimestampSeen = currentTimestampNs;
}
resetIfConfigTtlExpiredLocked(currentTimestampNs);
- mLastLogTimestamp = currentTimestampNs;
StatsdStats::getInstance().noteAtomLogged(
event->GetTagId(), event->GetElapsedTimestampNs() / NS_PER_SEC);
@@ -339,15 +376,9 @@
(long long)getWallClockNs());
}
-void StatsLogProcessor::resetIfConfigTtlExpiredLocked(const int64_t timestampNs) {
- std::vector<ConfigKey> configKeysTtlExpired;
- for (auto it = mMetricsManagers.begin(); it != mMetricsManagers.end(); it++) {
- if (it->second != nullptr && !it->second->isInTtl(timestampNs)) {
- configKeysTtlExpired.push_back(it->first);
- }
- }
-
- for (const auto& key : configKeysTtlExpired) {
+void StatsLogProcessor::resetConfigsLocked(const int64_t timestampNs,
+ const std::vector<ConfigKey>& configs) {
+ for (const auto& key : configs) {
StatsdConfig config;
if (StorageManager::readConfigFromDisk(key, &config)) {
OnConfigUpdatedLocked(timestampNs, key, config);
@@ -362,6 +393,18 @@
}
}
+void StatsLogProcessor::resetIfConfigTtlExpiredLocked(const int64_t timestampNs) {
+ std::vector<ConfigKey> configKeysTtlExpired;
+ for (auto it = mMetricsManagers.begin(); it != mMetricsManagers.end(); it++) {
+ if (it->second != nullptr && !it->second->isInTtl(timestampNs)) {
+ configKeysTtlExpired.push_back(it->first);
+ }
+ }
+ if (configKeysTtlExpired.size() > 0) {
+ resetConfigsLocked(timestampNs, configKeysTtlExpired);
+ }
+}
+
void StatsLogProcessor::OnConfigRemoved(const ConfigKey& key) {
std::lock_guard<std::mutex> lock(mMetricsMutex);
auto it = mMetricsManagers.find(key);
diff --git a/cmds/statsd/src/StatsLogProcessor.h b/cmds/statsd/src/StatsLogProcessor.h
index 0e1d4ba..6efdf8c 100644
--- a/cmds/statsd/src/StatsLogProcessor.h
+++ b/cmds/statsd/src/StatsLogProcessor.h
@@ -40,6 +40,9 @@
const std::function<void(const ConfigKey&)>& sendBroadcast);
virtual ~StatsLogProcessor();
+ void OnLogEvent(LogEvent* event, bool reconnectionStarts);
+
+ // for testing only.
void OnLogEvent(LogEvent* event);
void OnConfigUpdated(const int64_t timestampNs, const ConfigKey& key,
@@ -122,16 +125,30 @@
// Handler over the isolated uid change event.
void onIsolatedUidChangedEventLocked(const LogEvent& event);
+ void resetConfigsLocked(const int64_t timestampNs, const std::vector<ConfigKey>& configs);
+
// Function used to send a broadcast so that receiver for the config key can call getData
// to retrieve the stored data.
std::function<void(const ConfigKey& key)> mSendBroadcast;
const int64_t mTimeBaseNs;
- int64_t mLastLogTimestamp;
+ // Largest timestamp of the events that we have processed.
+ int64_t mLargestTimestampSeen = 0;
+
+ int64_t mLastTimestampSeen = 0;
+
+ bool mInReconnection = false;
+
+ // Processed log count
+ uint64_t mLogCount = 0;
+
+ // Log loss detected count
+ int mLogLossCount = 0;
long mLastPullerCacheClearTimeSec = 0;
+ FRIEND_TEST(StatsLogProcessorTest, TestOutOfOrderLogs);
FRIEND_TEST(StatsLogProcessorTest, TestRateLimitByteSize);
FRIEND_TEST(StatsLogProcessorTest, TestRateLimitBroadcast);
FRIEND_TEST(StatsLogProcessorTest, TestDropWhenByteSizeTooLarge);
diff --git a/cmds/statsd/src/StatsService.cpp b/cmds/statsd/src/StatsService.cpp
index 86a3a78..4e693e3 100644
--- a/cmds/statsd/src/StatsService.cpp
+++ b/cmds/statsd/src/StatsService.cpp
@@ -818,8 +818,8 @@
mConfigManager->Startup();
}
-void StatsService::OnLogEvent(LogEvent* event) {
- mProcessor->OnLogEvent(event);
+void StatsService::OnLogEvent(LogEvent* event, bool reconnectionStarts) {
+ mProcessor->OnLogEvent(event, reconnectionStarts);
}
Status StatsService::getData(int64_t key, vector<uint8_t>* output) {
diff --git a/cmds/statsd/src/StatsService.h b/cmds/statsd/src/StatsService.h
index 648e9c5..5712620 100644
--- a/cmds/statsd/src/StatsService.h
+++ b/cmds/statsd/src/StatsService.h
@@ -76,7 +76,7 @@
/**
* Called by LogReader when there's a log event to process.
*/
- virtual void OnLogEvent(LogEvent* event);
+ virtual void OnLogEvent(LogEvent* event, bool reconnectionStarts);
/**
* Binder call for clients to request data for this configuration key.
diff --git a/cmds/statsd/src/guardrail/StatsdStats.cpp b/cmds/statsd/src/guardrail/StatsdStats.cpp
index c342aa5..ee3ed23 100644
--- a/cmds/statsd/src/guardrail/StatsdStats.cpp
+++ b/cmds/statsd/src/guardrail/StatsdStats.cpp
@@ -50,7 +50,7 @@
// const int FIELD_ID_PULLED_ATOM_STATS = 10; // The proto is written in stats_log_util.cpp
const int FIELD_ID_LOGGER_ERROR_STATS = 11;
const int FIELD_ID_PERIODIC_ALARM_STATS = 12;
-const int FIELD_ID_SKIPPED_LOG_EVENT_STATS = 13;
+const int FIELD_ID_LOG_LOSS_STATS = 14;
const int FIELD_ID_ATOM_STATS_TAG = 1;
const int FIELD_ID_ATOM_STATS_COUNT = 2;
@@ -61,9 +61,6 @@
const int FIELD_ID_LOGGER_STATS_TIME = 1;
const int FIELD_ID_LOGGER_STATS_ERROR_CODE = 2;
-const int FIELD_ID_SKIPPED_LOG_EVENT_STATS_TAG = 1;
-const int FIELD_ID_SKIPPED_LOG_EVENT_STATS_TIMESTAMP = 2;
-
const int FIELD_ID_CONFIG_STATS_UID = 1;
const int FIELD_ID_CONFIG_STATS_ID = 2;
const int FIELD_ID_CONFIG_STATS_CREATION = 3;
@@ -182,6 +179,14 @@
noteConfigResetInternalLocked(key);
}
+void StatsdStats::noteLogLost(int64_t timestampNs) {
+ lock_guard<std::mutex> lock(mLock);
+ if (mLogLossTimestampNs.size() == kMaxLoggerErrors) {
+ mLogLossTimestampNs.pop_front();
+ }
+ mLogLossTimestampNs.push_back(timestampNs);
+}
+
void StatsdStats::noteBroadcastSent(const ConfigKey& key) {
noteBroadcastSent(key, getWallClockSec());
}
@@ -350,15 +355,6 @@
mPushedAtomStats[atomId]++;
}
-void StatsdStats::noteLogEventSkipped(int tag, int64_t timestamp) {
- lock_guard<std::mutex> lock(mLock);
- // grows strictly one at a time. so it won't > kMaxSkippedLogEvents
- if (mSkippedLogEvents.size() == kMaxSkippedLogEvents) {
- mSkippedLogEvents.pop_front();
- }
- mSkippedLogEvents.push_back(std::make_pair(tag, timestamp));
-}
-
void StatsdStats::noteLoggerError(int error) {
lock_guard<std::mutex> lock(mLock);
// grows strictly one at a time. so it won't > kMaxLoggerErrors
@@ -381,7 +377,7 @@
mAnomalyAlarmRegisteredStats = 0;
mPeriodicAlarmRegisteredStats = 0;
mLoggerErrors.clear();
- mSkippedLogEvents.clear();
+ mLogLossTimestampNs.clear();
for (auto& config : mConfigStats) {
config.second->broadcast_sent_time_sec.clear();
config.second->data_drop_time_sec.clear();
@@ -515,8 +511,8 @@
strftime(buffer, sizeof(buffer), "%Y-%m-%d %I:%M%p\n", error_tm);
fprintf(out, "Logger error %d at %s\n", error.second, buffer);
}
- for (const auto& skipped : mSkippedLogEvents) {
- fprintf(out, "Log event (%d) skipped at %lld\n", skipped.first, (long long)skipped.second);
+ for (const auto& loss : mLogLossTimestampNs) {
+ fprintf(out, "Log loss detected at %lld (elapsedRealtimeNs)\n", (long long)loss);
}
}
@@ -672,13 +668,9 @@
proto.end(token);
}
- for (const auto& skipped : mSkippedLogEvents) {
- uint64_t token = proto.start(FIELD_TYPE_MESSAGE | FIELD_ID_SKIPPED_LOG_EVENT_STATS |
- FIELD_COUNT_REPEATED);
- proto.write(FIELD_TYPE_INT32 | FIELD_ID_SKIPPED_LOG_EVENT_STATS_TAG, skipped.first);
- proto.write(FIELD_TYPE_INT64 | FIELD_ID_SKIPPED_LOG_EVENT_STATS_TIMESTAMP,
- (long long)skipped.second);
- proto.end(token);
+ for (const auto& loss : mLogLossTimestampNs) {
+ proto.write(FIELD_TYPE_INT64 | FIELD_ID_LOG_LOSS_STATS | FIELD_COUNT_REPEATED,
+ (long long)loss);
}
output->clear();
diff --git a/cmds/statsd/src/guardrail/StatsdStats.h b/cmds/statsd/src/guardrail/StatsdStats.h
index 123a703..2cbcca3 100644
--- a/cmds/statsd/src/guardrail/StatsdStats.h
+++ b/cmds/statsd/src/guardrail/StatsdStats.h
@@ -102,9 +102,7 @@
// The max number of old config stats we keep.
const static int kMaxIceBoxSize = 20;
- const static int kMaxLoggerErrors = 10;
-
- const static int kMaxSkippedLogEvents = 200;
+ const static int kMaxLoggerErrors = 20;
const static int kMaxTimestampCount = 20;
@@ -280,7 +278,7 @@
/**
* Records statsd skipped an event.
*/
- void noteLogEventSkipped(int tag, int64_t timestamp);
+ void noteLogLost(int64_t timestamp);
/**
* Reset the historical stats. Including all stats in icebox, and the tracked stats about
@@ -337,8 +335,8 @@
// Logd errors. Size capped by kMaxLoggerErrors.
std::list<const std::pair<int, int>> mLoggerErrors;
- // Skipped log events.
- std::list<const std::pair<int, int64_t>> mSkippedLogEvents;
+ // Timestamps when we detect log loss after logd reconnect.
+ std::list<int64_t> mLogLossTimestampNs;
// Stores the number of times statsd modified the anomaly alarm registered with
// StatsCompanionService.
diff --git a/cmds/statsd/src/logd/LogListener.h b/cmds/statsd/src/logd/LogListener.h
index 69ca571..f924040 100644
--- a/cmds/statsd/src/logd/LogListener.h
+++ b/cmds/statsd/src/logd/LogListener.h
@@ -33,7 +33,7 @@
LogListener();
virtual ~LogListener();
- virtual void OnLogEvent(LogEvent* msg) = 0;
+ virtual void OnLogEvent(LogEvent* msg, bool reconnectionStarts) = 0;
};
} // namespace statsd
diff --git a/cmds/statsd/src/logd/LogReader.cpp b/cmds/statsd/src/logd/LogReader.cpp
index 0fe896b..26ae6a3 100644
--- a/cmds/statsd/src/logd/LogReader.cpp
+++ b/cmds/statsd/src/logd/LogReader.cpp
@@ -113,7 +113,8 @@
LogEvent event(msg);
// Call the listener
- mListener->OnLogEvent(&event);
+ mListener->OnLogEvent(&event,
+ lineCount == 1 /* indicate whether it's a new connection */);
}
}
diff --git a/cmds/statsd/src/stats_log.proto b/cmds/statsd/src/stats_log.proto
index 4f7581d..eaa7bf1 100644
--- a/cmds/statsd/src/stats_log.proto
+++ b/cmds/statsd/src/stats_log.proto
@@ -305,4 +305,6 @@
optional int64 elapsed_timestamp_nanos = 2;
}
repeated SkippedLogEventStats skipped_log_event_stats = 13;
+
+ repeated int64 log_loss_stats = 14;
}
diff --git a/cmds/statsd/tests/StatsLogProcessor_test.cpp b/cmds/statsd/tests/StatsLogProcessor_test.cpp
index fb8877a..91a40e3 100644
--- a/cmds/statsd/tests/StatsLogProcessor_test.cpp
+++ b/cmds/statsd/tests/StatsLogProcessor_test.cpp
@@ -178,6 +178,128 @@
EXPECT_EQ(2, report.annotation(0).field_int32());
}
+TEST(StatsLogProcessorTest, TestOutOfOrderLogs) {
+ // Setup simple config key corresponding to empty config.
+ sp<UidMap> m = new UidMap();
+ sp<AlarmMonitor> anomalyAlarmMonitor;
+ sp<AlarmMonitor> subscriberAlarmMonitor;
+ int broadcastCount = 0;
+ StatsLogProcessor p(m, anomalyAlarmMonitor, subscriberAlarmMonitor, 0,
+ [&broadcastCount](const ConfigKey& key) { broadcastCount++; });
+
+ LogEvent event1(0, 1 /*logd timestamp*/, 1001 /*elapsedRealtime*/);
+ event1.init();
+
+ LogEvent event2(0, 2, 1002);
+ event2.init();
+
+ LogEvent event3(0, 3, 1005);
+ event3.init();
+
+ LogEvent event4(0, 4, 1004);
+ event4.init();
+
+ // <----- Reconnection happens
+
+ LogEvent event5(0, 5, 999);
+ event5.init();
+
+ LogEvent event6(0, 6, 2000);
+ event6.init();
+
+ // <----- Reconnection happens
+
+ LogEvent event7(0, 7, 3000);
+ event7.init();
+
+ // first event ever
+ p.OnLogEvent(&event1, true);
+ EXPECT_EQ(1UL, p.mLogCount);
+ EXPECT_EQ(1001LL, p.mLargestTimestampSeen);
+ EXPECT_EQ(1001LL, p.mLastTimestampSeen);
+
+ p.OnLogEvent(&event2, false);
+ EXPECT_EQ(2UL, p.mLogCount);
+ EXPECT_EQ(1002LL, p.mLargestTimestampSeen);
+ EXPECT_EQ(1002LL, p.mLastTimestampSeen);
+
+ p.OnLogEvent(&event3, false);
+ EXPECT_EQ(3UL, p.mLogCount);
+ EXPECT_EQ(1005LL, p.mLargestTimestampSeen);
+ EXPECT_EQ(1005LL, p.mLastTimestampSeen);
+
+ p.OnLogEvent(&event4, false);
+ EXPECT_EQ(4UL, p.mLogCount);
+ EXPECT_EQ(1005LL, p.mLargestTimestampSeen);
+ EXPECT_EQ(1004LL, p.mLastTimestampSeen);
+ EXPECT_FALSE(p.mInReconnection);
+
+ // Reconnect happens, event1 out of buffer. Read event2
+ p.OnLogEvent(&event2, true);
+ EXPECT_EQ(4UL, p.mLogCount);
+ EXPECT_EQ(1005LL, p.mLargestTimestampSeen);
+ EXPECT_EQ(1004LL, p.mLastTimestampSeen);
+ EXPECT_TRUE(p.mInReconnection);
+
+ p.OnLogEvent(&event3, false);
+ EXPECT_EQ(4UL, p.mLogCount);
+ EXPECT_EQ(1005LL, p.mLargestTimestampSeen);
+ EXPECT_EQ(1004LL, p.mLastTimestampSeen);
+ EXPECT_TRUE(p.mInReconnection);
+
+ p.OnLogEvent(&event4, false);
+ EXPECT_EQ(4UL, p.mLogCount);
+ EXPECT_EQ(1005LL, p.mLargestTimestampSeen);
+ EXPECT_EQ(1004LL, p.mLastTimestampSeen);
+ EXPECT_FALSE(p.mInReconnection);
+
+ // Fresh event comes.
+ p.OnLogEvent(&event5, false);
+ EXPECT_EQ(5UL, p.mLogCount);
+ EXPECT_EQ(1005LL, p.mLargestTimestampSeen);
+ EXPECT_EQ(999LL, p.mLastTimestampSeen);
+
+ p.OnLogEvent(&event6, false);
+ EXPECT_EQ(6UL, p.mLogCount);
+ EXPECT_EQ(2000LL, p.mLargestTimestampSeen);
+ EXPECT_EQ(2000LL, p.mLastTimestampSeen);
+
+ // Reconnect happens, read from event4
+ p.OnLogEvent(&event4, true);
+ EXPECT_EQ(6UL, p.mLogCount);
+ EXPECT_EQ(2000LL, p.mLargestTimestampSeen);
+ EXPECT_EQ(2000LL, p.mLastTimestampSeen);
+ EXPECT_TRUE(p.mInReconnection);
+
+ p.OnLogEvent(&event5, false);
+ EXPECT_EQ(6UL, p.mLogCount);
+ EXPECT_EQ(2000LL, p.mLargestTimestampSeen);
+ EXPECT_EQ(2000LL, p.mLastTimestampSeen);
+ EXPECT_TRUE(p.mInReconnection);
+
+ // Before we get out of reconnection state, it reconnects again.
+ p.OnLogEvent(&event5, true);
+ EXPECT_EQ(6UL, p.mLogCount);
+ EXPECT_EQ(2000LL, p.mLargestTimestampSeen);
+ EXPECT_EQ(2000LL, p.mLastTimestampSeen);
+ EXPECT_TRUE(p.mInReconnection);
+
+ p.OnLogEvent(&event6, false);
+ EXPECT_EQ(6UL, p.mLogCount);
+ EXPECT_EQ(2000LL, p.mLargestTimestampSeen);
+ EXPECT_EQ(2000LL, p.mLastTimestampSeen);
+ EXPECT_FALSE(p.mInReconnection);
+ EXPECT_EQ(0, p.mLogLossCount);
+
+ // it reconnects again. All old events are gone. We lose CP.
+ p.OnLogEvent(&event7, true);
+ EXPECT_EQ(7UL, p.mLogCount);
+ EXPECT_EQ(3000LL, p.mLargestTimestampSeen);
+ EXPECT_EQ(3000LL, p.mLastTimestampSeen);
+ EXPECT_EQ(1, p.mLogLossCount);
+ EXPECT_FALSE(p.mInReconnection);
+}
+
#else
GTEST_LOG_(INFO) << "This test does nothing.\n";
#endif