Puller refactor
1) Refactor pullers and add tests.
2) Add timeout to a puller.
pullTimeoutNs is intrinsic to a puller. A pull taking longer than this is
deemed failed and its data is discarded.
A metric or StatsPullerManager requesting a pull should monitor the pull
and have a deadlineNs. A successful pull may still come later than desired
due to statsd processing delays.
3) Add unit tests to puller now that the base puller is more
complicated.
Bug: 118756964
Test: unit test
Change-Id: I0e5d47e2527391f7beef4b2d06bfd5c2f82f1179
diff --git a/cmds/statsd/src/StatsService.cpp b/cmds/statsd/src/StatsService.cpp
index 50b64b9..f2a4663 100644
--- a/cmds/statsd/src/StatsService.cpp
+++ b/cmds/statsd/src/StatsService.cpp
@@ -704,7 +704,7 @@
status_t StatsService::cmd_print_pulled_metrics(int out, const Vector<String8>& args) {
int s = atoi(args[1].c_str());
vector<shared_ptr<LogEvent> > stats;
- if (mPullerManager->Pull(s, getElapsedRealtimeNs(), &stats)) {
+ if (mPullerManager->Pull(s, &stats)) {
for (const auto& it : stats) {
dprintf(out, "Pull from %d: %s\n", s, it->ToString().c_str());
}
diff --git a/cmds/statsd/src/external/PowerStatsPuller.h b/cmds/statsd/src/external/PowerStatsPuller.h
index dd5ff8f..6f15bd6 100644
--- a/cmds/statsd/src/external/PowerStatsPuller.h
+++ b/cmds/statsd/src/external/PowerStatsPuller.h
@@ -28,6 +28,8 @@
class PowerStatsPuller : public StatsPuller {
public:
PowerStatsPuller();
+
+private:
bool PullInternal(vector<std::shared_ptr<LogEvent>>* data) override;
};
diff --git a/cmds/statsd/src/external/ResourceHealthManagerPuller.h b/cmds/statsd/src/external/ResourceHealthManagerPuller.h
index ba6e6c3..f650fcc 100644
--- a/cmds/statsd/src/external/ResourceHealthManagerPuller.h
+++ b/cmds/statsd/src/external/ResourceHealthManagerPuller.h
@@ -29,6 +29,8 @@
class ResourceHealthManagerPuller : public StatsPuller {
public:
explicit ResourceHealthManagerPuller(int tagId);
+
+private:
bool PullInternal(vector<std::shared_ptr<LogEvent>>* data) override;
};
diff --git a/cmds/statsd/src/external/ResourceThermalManagerPuller.h b/cmds/statsd/src/external/ResourceThermalManagerPuller.h
index 13c675a..5313792 100644
--- a/cmds/statsd/src/external/ResourceThermalManagerPuller.h
+++ b/cmds/statsd/src/external/ResourceThermalManagerPuller.h
@@ -29,6 +29,8 @@
class ResourceThermalManagerPuller : public StatsPuller {
public:
ResourceThermalManagerPuller();
+
+private:
bool PullInternal(vector<std::shared_ptr<LogEvent>>* data) override;
};
diff --git a/cmds/statsd/src/external/StatsCompanionServicePuller.h b/cmds/statsd/src/external/StatsCompanionServicePuller.h
index a16baf0..2e13320 100644
--- a/cmds/statsd/src/external/StatsCompanionServicePuller.h
+++ b/cmds/statsd/src/external/StatsCompanionServicePuller.h
@@ -26,13 +26,13 @@
class StatsCompanionServicePuller : public StatsPuller {
public:
explicit StatsCompanionServicePuller(int tagId);
- bool PullInternal(vector<std::shared_ptr<LogEvent> >* data) override;
void SetStatsCompanionService(sp<IStatsCompanionService> statsCompanionService) override;
private:
Mutex mStatsCompanionServiceLock;
sp<IStatsCompanionService> mStatsCompanionService = nullptr;
+ bool PullInternal(vector<std::shared_ptr<LogEvent> >* data) override;
};
} // namespace statsd
diff --git a/cmds/statsd/src/external/StatsPuller.cpp b/cmds/statsd/src/external/StatsPuller.cpp
index 7043d66..c7c22ee 100644
--- a/cmds/statsd/src/external/StatsPuller.cpp
+++ b/cmds/statsd/src/external/StatsPuller.cpp
@@ -34,48 +34,52 @@
StatsPuller::StatsPuller(const int tagId)
: mTagId(tagId) {
- // Pullers can cause significant impact to system health and battery.
- // So that we don't pull too frequently.
- mCoolDownNs = StatsPullerManager::kAllPullAtomInfo.find(tagId)->second.coolDownNs;
- VLOG("Puller for tag %d created. Cooldown set to %lld", mTagId, (long long)mCoolDownNs);
}
-bool StatsPuller::Pull(const int64_t elapsedTimeNs, std::vector<std::shared_ptr<LogEvent>>* data) {
+bool StatsPuller::Pull(std::vector<std::shared_ptr<LogEvent>>* data) {
lock_guard<std::mutex> lock(mLock);
- int64_t wallClockTimeNs = getWallClockNs();
+ int64_t elapsedTimeNs = getElapsedRealtimeNs();
StatsdStats::getInstance().notePull(mTagId);
- if (elapsedTimeNs - mLastPullTimeNs < mCoolDownNs) {
- (*data) = mCachedData;
- StatsdStats::getInstance().notePullFromCache(mTagId);
- StatsdStats::getInstance().notePullDelay(mTagId, getElapsedRealtimeNs() - elapsedTimeNs);
- return true;
+ const bool shouldUseCache = elapsedTimeNs - mLastPullTimeNs <
+ StatsPullerManager::kAllPullAtomInfo.at(mTagId).coolDownNs;
+ if (shouldUseCache) {
+ if (mHasGoodData) {
+ (*data) = mCachedData;
+ StatsdStats::getInstance().notePullFromCache(mTagId);
+ }
+ return mHasGoodData;
}
- if (mMinPullIntervalNs > elapsedTimeNs - mLastPullTimeNs) {
- mMinPullIntervalNs = elapsedTimeNs - mLastPullTimeNs;
- StatsdStats::getInstance().updateMinPullIntervalSec(mTagId,
- mMinPullIntervalNs / NS_PER_SEC);
+
+ if (mLastPullTimeNs > 0) {
+ StatsdStats::getInstance().updateMinPullIntervalSec(
+ mTagId, (elapsedTimeNs - mLastPullTimeNs) / NS_PER_SEC);
}
mCachedData.clear();
mLastPullTimeNs = elapsedTimeNs;
- int64_t pullStartTimeNs = getElapsedRealtimeNs();
- bool ret = PullInternal(&mCachedData);
- if (!ret) {
- mCachedData.clear();
- return false;
+ mHasGoodData = PullInternal(&mCachedData);
+ if (!mHasGoodData) {
+ return mHasGoodData;
}
- StatsdStats::getInstance().notePullTime(mTagId, getElapsedRealtimeNs() - pullStartTimeNs);
- for (const shared_ptr<LogEvent>& data : mCachedData) {
- data->setElapsedTimestampNs(elapsedTimeNs);
- data->setLogdWallClockTimestampNs(wallClockTimeNs);
+ const int64_t pullDurationNs = getElapsedRealtimeNs() - elapsedTimeNs;
+ StatsdStats::getInstance().notePullTime(mTagId, pullDurationNs);
+ const bool pullTimeOut =
+ pullDurationNs > StatsPullerManager::kAllPullAtomInfo.at(mTagId).pullTimeoutNs;
+ if (pullTimeOut) {
+ // Something went wrong. Discard the data.
+ clearCacheLocked();
+ mHasGoodData = false;
+ StatsdStats::getInstance().notePullTimeout(mTagId);
+ ALOGW("Pull for atom %d exceeds timeout %lld nano seconds.", mTagId,
+ (long long)pullDurationNs);
+ return mHasGoodData;
}
if (mCachedData.size() > 0) {
mapAndMergeIsolatedUidsToHostUid(mCachedData, mUidMap, mTagId);
- (*data) = mCachedData;
}
- StatsdStats::getInstance().notePullDelay(mTagId, getElapsedRealtimeNs() - elapsedTimeNs);
- return ret;
+ (*data) = mCachedData;
+ return mHasGoodData;
}
int StatsPuller::ForceClearCache() {
@@ -84,6 +88,10 @@
int StatsPuller::clearCache() {
lock_guard<std::mutex> lock(mLock);
+ return clearCacheLocked();
+}
+
+int StatsPuller::clearCacheLocked() {
int ret = mCachedData.size();
mCachedData.clear();
mLastPullTimeNs = 0;
@@ -91,7 +99,8 @@
}
int StatsPuller::ClearCacheIfNecessary(int64_t timestampNs) {
- if (timestampNs - mLastPullTimeNs > mCoolDownNs) {
+ if (timestampNs - mLastPullTimeNs >
+ StatsPullerManager::kAllPullAtomInfo.at(mTagId).coolDownNs) {
return clearCache();
} else {
return 0;
diff --git a/cmds/statsd/src/external/StatsPuller.h b/cmds/statsd/src/external/StatsPuller.h
index f8ecb87..c83c4f8 100644
--- a/cmds/statsd/src/external/StatsPuller.h
+++ b/cmds/statsd/src/external/StatsPuller.h
@@ -18,7 +18,6 @@
#include <android/os/IStatsCompanionService.h>
#include <utils/RefBase.h>
-#include <utils/String16.h>
#include <mutex>
#include <vector>
#include "packages/UidMap.h"
@@ -37,10 +36,16 @@
virtual ~StatsPuller() {}
- // Pulls the data. The returned data will have elapsedTimeNs set as timeNs
- // and will have wallClockTimeNs set as current wall clock time.
- // Return true if the pull is successful.
- bool Pull(const int64_t timeNs, std::vector<std::shared_ptr<LogEvent>>* data);
+ // Pulls the most recent data.
+ // The data may be served from cache if consecutive pulls come within
+ // the predefined cooldown time.
+ // Returns true if the pull was successful.
+ // Returns false when
+ // 1) the pull fails
+ // 2) the pull takes longer than the atom's pullTimeoutNs
+ // If a metric wants to make any change to the data, like timestamps, it
+ // should make a copy as this data may be shared with multiple metrics.
+ bool Pull(std::vector<std::shared_ptr<LogEvent>>* data);
// Clear cache immediately
int ForceClearCache();
@@ -53,29 +58,30 @@
virtual void SetStatsCompanionService(sp<IStatsCompanionService> statsCompanionService){};
protected:
- // The atom tag id this puller pulls
const int mTagId;
private:
mutable std::mutex mLock;
- // Minimum time before this puller does actual pull again.
- // If a pull request comes before cooldown, a cached version from purevious pull
- // will be returned.
- // The actual value should be determined by individual pullers.
- int64_t mCoolDownNs;
- // For puller stats
- int64_t mMinPullIntervalNs = LONG_MAX;
+ // Real puller impl.
virtual bool PullInternal(std::vector<std::shared_ptr<LogEvent>>* data) = 0;
- // Cache of data from last pull. If next request comes before cool down finishes,
- // cached data will be returned.
- std::vector<std::shared_ptr<LogEvent>> mCachedData;
+ bool mHasGoodData = false;
int64_t mLastPullTimeNs;
+ // Cache of data from last pull. If next request comes before cool down finishes,
+ // cached data will be returned.
+ // Cached data is cleared when
+ // 1) A pull fails
+ // 2) A new pull request comes after cooldown time.
+ // 3) clearCache is called.
+ std::vector<std::shared_ptr<LogEvent>> mCachedData;
+
int clearCache();
+ int clearCacheLocked();
+
static sp<UidMap> mUidMap;
};
diff --git a/cmds/statsd/src/external/StatsPullerManager.cpp b/cmds/statsd/src/external/StatsPullerManager.cpp
index c070ca3..4a716cf 100644
--- a/cmds/statsd/src/external/StatsPullerManager.cpp
+++ b/cmds/statsd/src/external/StatsPullerManager.cpp
@@ -53,195 +53,172 @@
const std::map<int, PullAtomInfo> StatsPullerManager::kAllPullAtomInfo = {
// wifi_bytes_transfer
{android::util::WIFI_BYTES_TRANSFER,
- {{2, 3, 4, 5},
- 1 * NS_PER_SEC,
- new StatsCompanionServicePuller(android::util::WIFI_BYTES_TRANSFER)}},
+ {.additiveFields = {2, 3, 4, 5},
+ .puller = new StatsCompanionServicePuller(android::util::WIFI_BYTES_TRANSFER)}},
// wifi_bytes_transfer_by_fg_bg
{android::util::WIFI_BYTES_TRANSFER_BY_FG_BG,
- {{3, 4, 5, 6},
- 1 * NS_PER_SEC,
- new StatsCompanionServicePuller(android::util::WIFI_BYTES_TRANSFER_BY_FG_BG)}},
+ {.additiveFields = {3, 4, 5, 6},
+ .puller = new StatsCompanionServicePuller(android::util::WIFI_BYTES_TRANSFER_BY_FG_BG)}},
// mobile_bytes_transfer
{android::util::MOBILE_BYTES_TRANSFER,
- {{2, 3, 4, 5},
- 1 * NS_PER_SEC,
- new StatsCompanionServicePuller(android::util::MOBILE_BYTES_TRANSFER)}},
+ {.additiveFields = {2, 3, 4, 5},
+ .puller = new StatsCompanionServicePuller(android::util::MOBILE_BYTES_TRANSFER)}},
// mobile_bytes_transfer_by_fg_bg
{android::util::MOBILE_BYTES_TRANSFER_BY_FG_BG,
- {{3, 4, 5, 6},
- 1 * NS_PER_SEC,
- new StatsCompanionServicePuller(android::util::MOBILE_BYTES_TRANSFER_BY_FG_BG)}},
+ {.additiveFields = {3, 4, 5, 6},
+ .puller =
+ new StatsCompanionServicePuller(android::util::MOBILE_BYTES_TRANSFER_BY_FG_BG)}},
// bluetooth_bytes_transfer
{android::util::BLUETOOTH_BYTES_TRANSFER,
- {{2, 3},
- 1 * NS_PER_SEC,
- new StatsCompanionServicePuller(android::util::BLUETOOTH_BYTES_TRANSFER)}},
+ {.additiveFields = {2, 3},
+ .puller = new StatsCompanionServicePuller(android::util::BLUETOOTH_BYTES_TRANSFER)}},
// kernel_wakelock
{android::util::KERNEL_WAKELOCK,
- {{}, 1 * NS_PER_SEC, new StatsCompanionServicePuller(android::util::KERNEL_WAKELOCK)}},
+ {.puller = new StatsCompanionServicePuller(android::util::KERNEL_WAKELOCK)}},
// subsystem_sleep_state
- {android::util::SUBSYSTEM_SLEEP_STATE,
- {{}, 1 * NS_PER_SEC, new SubsystemSleepStatePuller()}},
+ {android::util::SUBSYSTEM_SLEEP_STATE, {.puller = new SubsystemSleepStatePuller()}},
// on_device_power_measurement
- {android::util::ON_DEVICE_POWER_MEASUREMENT, {{}, 1 * NS_PER_SEC, new PowerStatsPuller()}},
+ {android::util::ON_DEVICE_POWER_MEASUREMENT, {.puller = new PowerStatsPuller()}},
// cpu_time_per_freq
{android::util::CPU_TIME_PER_FREQ,
- {{3}, 1 * NS_PER_SEC, new StatsCompanionServicePuller(android::util::CPU_TIME_PER_FREQ)}},
+ {.additiveFields = {3},
+ .puller = new StatsCompanionServicePuller(android::util::CPU_TIME_PER_FREQ)}},
// cpu_time_per_uid
{android::util::CPU_TIME_PER_UID,
- {{2, 3},
- 1 * NS_PER_SEC,
- new StatsCompanionServicePuller(android::util::CPU_TIME_PER_UID)}},
+ {.additiveFields = {2, 3},
+ .puller = new StatsCompanionServicePuller(android::util::CPU_TIME_PER_UID)}},
// cpu_time_per_uid_freq
// the throttling is 3sec, handled in
// frameworks/base/core/java/com/android/internal/os/KernelCpuProcReader
{android::util::CPU_TIME_PER_UID_FREQ,
- {{4},
- 1 * NS_PER_SEC,
- new StatsCompanionServicePuller(android::util::CPU_TIME_PER_UID_FREQ)}},
+ {.additiveFields = {4},
+ .puller = new StatsCompanionServicePuller(android::util::CPU_TIME_PER_UID_FREQ)}},
// cpu_active_time
// the throttling is 3sec, handled in
// frameworks/base/core/java/com/android/internal/os/KernelCpuProcReader
{android::util::CPU_ACTIVE_TIME,
- {{2}, 1 * NS_PER_SEC, new StatsCompanionServicePuller(android::util::CPU_ACTIVE_TIME)}},
+ {.additiveFields = {2},
+ .puller = new StatsCompanionServicePuller(android::util::CPU_ACTIVE_TIME)}},
// cpu_cluster_time
// the throttling is 3sec, handled in
// frameworks/base/core/java/com/android/internal/os/KernelCpuProcReader
{android::util::CPU_CLUSTER_TIME,
- {{3}, 1 * NS_PER_SEC, new StatsCompanionServicePuller(android::util::CPU_CLUSTER_TIME)}},
+ {.additiveFields = {3},
+ .puller = new StatsCompanionServicePuller(android::util::CPU_CLUSTER_TIME)}},
// wifi_activity_energy_info
{android::util::WIFI_ACTIVITY_INFO,
- {{}, 1 * NS_PER_SEC, new StatsCompanionServicePuller(android::util::WIFI_ACTIVITY_INFO)}},
+ {.puller = new StatsCompanionServicePuller(android::util::WIFI_ACTIVITY_INFO)}},
// modem_activity_info
{android::util::MODEM_ACTIVITY_INFO,
- {{}, 1 * NS_PER_SEC, new StatsCompanionServicePuller(android::util::MODEM_ACTIVITY_INFO)}},
+ {.puller = new StatsCompanionServicePuller(android::util::MODEM_ACTIVITY_INFO)}},
// bluetooth_activity_info
{android::util::BLUETOOTH_ACTIVITY_INFO,
- {{},
- 1 * NS_PER_SEC,
- new StatsCompanionServicePuller(android::util::BLUETOOTH_ACTIVITY_INFO)}},
+ {.puller = new StatsCompanionServicePuller(android::util::BLUETOOTH_ACTIVITY_INFO)}},
// system_elapsed_realtime
{android::util::SYSTEM_ELAPSED_REALTIME,
- {{},
- 1 * NS_PER_SEC,
- new StatsCompanionServicePuller(android::util::SYSTEM_ELAPSED_REALTIME)}},
+ {.pullTimeoutNs = NS_PER_SEC / 2,
+ .coolDownNs = NS_PER_SEC,
+ .puller = new StatsCompanionServicePuller(android::util::SYSTEM_ELAPSED_REALTIME)}},
// system_uptime
{android::util::SYSTEM_UPTIME,
- {{}, 1 * NS_PER_SEC, new StatsCompanionServicePuller(android::util::SYSTEM_UPTIME)}},
+ {.puller = new StatsCompanionServicePuller(android::util::SYSTEM_UPTIME)}},
// remaining_battery_capacity
{android::util::REMAINING_BATTERY_CAPACITY,
- {{},
- 1 * NS_PER_SEC,
- new ResourceHealthManagerPuller(android::util::REMAINING_BATTERY_CAPACITY)}},
+ {.puller = new ResourceHealthManagerPuller(android::util::REMAINING_BATTERY_CAPACITY)}},
// full_battery_capacity
{android::util::FULL_BATTERY_CAPACITY,
- {{},
- 1 * NS_PER_SEC,
- new ResourceHealthManagerPuller(android::util::FULL_BATTERY_CAPACITY)}},
+ {.puller = new ResourceHealthManagerPuller(android::util::FULL_BATTERY_CAPACITY)}},
// battery_voltage
{android::util::BATTERY_VOLTAGE,
- {{}, 1 * NS_PER_SEC, new ResourceHealthManagerPuller(android::util::BATTERY_VOLTAGE)}},
- // battery_level
+ {.puller = new ResourceHealthManagerPuller(android::util::BATTERY_VOLTAGE)}},
+ // battery_level
{android::util::BATTERY_LEVEL,
- {{}, 1 * NS_PER_SEC, new ResourceHealthManagerPuller(android::util::BATTERY_LEVEL)}},
+ {.puller = new ResourceHealthManagerPuller(android::util::BATTERY_LEVEL)}},
// process_memory_state
{android::util::PROCESS_MEMORY_STATE,
- {{4, 5, 6, 7, 8, 9},
- 1 * NS_PER_SEC,
- new StatsCompanionServicePuller(android::util::PROCESS_MEMORY_STATE)}},
+ {.additiveFields = {4, 5, 6, 7, 8, 9},
+ .puller = new StatsCompanionServicePuller(android::util::PROCESS_MEMORY_STATE)}},
// native_process_memory_state
{android::util::NATIVE_PROCESS_MEMORY_STATE,
- {{3, 4, 5, 6},
- 1 * NS_PER_SEC,
- new StatsCompanionServicePuller(android::util::NATIVE_PROCESS_MEMORY_STATE)}},
+ {.additiveFields = {3, 4, 5, 6},
+ .puller = new StatsCompanionServicePuller(android::util::NATIVE_PROCESS_MEMORY_STATE)}},
{android::util::PROCESS_MEMORY_HIGH_WATER_MARK,
- {{3},
- 1 * NS_PER_SEC,
- new StatsCompanionServicePuller(android::util::PROCESS_MEMORY_HIGH_WATER_MARK)}},
+ {.additiveFields = {3},
+ .puller =
+ new StatsCompanionServicePuller(android::util::PROCESS_MEMORY_HIGH_WATER_MARK)}},
// temperature
- {android::util::TEMPERATURE, {{}, 1 * NS_PER_SEC, new ResourceThermalManagerPuller()}},
+ {android::util::TEMPERATURE, {.puller = new ResourceThermalManagerPuller()}},
// binder_calls
{android::util::BINDER_CALLS,
- {{4, 5, 6, 8, 12},
- 1 * NS_PER_SEC,
- new StatsCompanionServicePuller(android::util::BINDER_CALLS)}},
+ {.additiveFields = {4, 5, 6, 8, 12},
+ .puller = new StatsCompanionServicePuller(android::util::BINDER_CALLS)}},
// binder_calls_exceptions
{android::util::BINDER_CALLS_EXCEPTIONS,
- {{},
- 1 * NS_PER_SEC,
- new StatsCompanionServicePuller(android::util::BINDER_CALLS_EXCEPTIONS)}},
+ {.puller = new StatsCompanionServicePuller(android::util::BINDER_CALLS_EXCEPTIONS)}},
// looper_stats
{android::util::LOOPER_STATS,
- {{5, 6, 7, 8, 9},
- 1 * NS_PER_SEC,
- new StatsCompanionServicePuller(android::util::LOOPER_STATS)}},
+ {.additiveFields = {5, 6, 7, 8, 9},
+ .puller = new StatsCompanionServicePuller(android::util::LOOPER_STATS)}},
// Disk Stats
{android::util::DISK_STATS,
- {{}, 1 * NS_PER_SEC, new StatsCompanionServicePuller(android::util::DISK_STATS)}},
+ {.puller = new StatsCompanionServicePuller(android::util::DISK_STATS)}},
// Directory usage
{android::util::DIRECTORY_USAGE,
- {{}, 1 * NS_PER_SEC, new StatsCompanionServicePuller(android::util::DIRECTORY_USAGE)}},
+ {.puller = new StatsCompanionServicePuller(android::util::DIRECTORY_USAGE)}},
// Size of app's code, data, and cache
{android::util::APP_SIZE,
- {{}, 1 * NS_PER_SEC, new StatsCompanionServicePuller(android::util::APP_SIZE)}},
+ {.puller = new StatsCompanionServicePuller(android::util::APP_SIZE)}},
// Size of specific categories of files. Eg. Music.
{android::util::CATEGORY_SIZE,
- {{}, 1 * NS_PER_SEC, new StatsCompanionServicePuller(android::util::CATEGORY_SIZE)}},
+ {.puller = new StatsCompanionServicePuller(android::util::CATEGORY_SIZE)}},
// Number of fingerprints registered to each user.
{android::util::NUM_FINGERPRINTS,
- {{}, 1 * NS_PER_SEC, new StatsCompanionServicePuller(android::util::NUM_FINGERPRINTS)}},
+ {.puller = new StatsCompanionServicePuller(android::util::NUM_FINGERPRINTS)}},
// ProcStats.
{android::util::PROC_STATS,
- {{}, 1 * NS_PER_SEC, new StatsCompanionServicePuller(android::util::PROC_STATS)}},
+ {.puller = new StatsCompanionServicePuller(android::util::PROC_STATS)}},
// ProcStatsPkgProc.
{android::util::PROC_STATS_PKG_PROC,
- {{}, 1 * NS_PER_SEC, new StatsCompanionServicePuller(android::util::PROC_STATS_PKG_PROC)}},
+ {.puller = new StatsCompanionServicePuller(android::util::PROC_STATS_PKG_PROC)}},
// Disk I/O stats per uid.
{android::util::DISK_IO,
- {{2, 3, 4, 5, 6, 7, 8, 9, 10, 11},
- 3 * NS_PER_SEC,
- new StatsCompanionServicePuller(android::util::DISK_IO)}},
+ {.additiveFields = {2, 3, 4, 5, 6, 7, 8, 9, 10, 11},
+ .coolDownNs = 3 * NS_PER_SEC,
+ .puller = new StatsCompanionServicePuller(android::util::DISK_IO)}},
// PowerProfile constants for power model calculations.
{android::util::POWER_PROFILE,
- {{}, 1 * NS_PER_SEC, new StatsCompanionServicePuller(android::util::POWER_PROFILE)}},
+ {.puller = new StatsCompanionServicePuller(android::util::POWER_PROFILE)}},
// Process cpu stats. Min cool-down is 5 sec, inline with what AcitivityManagerService uses.
{android::util::PROCESS_CPU_TIME,
- {{} /* additive fields */,
- 5 * NS_PER_SEC /* min cool-down in seconds*/,
- new StatsCompanionServicePuller(android::util::PROCESS_CPU_TIME)}},
+ {.coolDownNs = 5 * NS_PER_SEC /* min cool-down in seconds*/,
+ .puller = new StatsCompanionServicePuller(android::util::PROCESS_CPU_TIME)}},
{android::util::CPU_TIME_PER_THREAD_FREQ,
- {{7, 9, 11, 13, 15, 17, 19, 21},
- 1 * NS_PER_SEC,
- new StatsCompanionServicePuller(android::util::CPU_TIME_PER_THREAD_FREQ)}},
+ {.additiveFields = {7, 9, 11, 13, 15, 17, 19, 21},
+ .puller = new StatsCompanionServicePuller(android::util::CPU_TIME_PER_THREAD_FREQ)}},
// DeviceCalculatedPowerUse.
{android::util::DEVICE_CALCULATED_POWER_USE,
- {{},
- 1 * NS_PER_SEC,
- new StatsCompanionServicePuller(android::util::DEVICE_CALCULATED_POWER_USE)}},
+ {.puller = new StatsCompanionServicePuller(android::util::DEVICE_CALCULATED_POWER_USE)}},
// DeviceCalculatedPowerBlameUid.
{android::util::DEVICE_CALCULATED_POWER_BLAME_UID,
- {{}, // BatteryStats already merged isolated with host ids so it's unnecessary here.
- 1 * NS_PER_SEC,
- new StatsCompanionServicePuller(android::util::DEVICE_CALCULATED_POWER_BLAME_UID)}},
+ {.puller = new StatsCompanionServicePuller(
+ android::util::DEVICE_CALCULATED_POWER_BLAME_UID)}},
// DeviceCalculatedPowerBlameOther.
{android::util::DEVICE_CALCULATED_POWER_BLAME_OTHER,
- {{},
- 1 * NS_PER_SEC,
- new StatsCompanionServicePuller(android::util::DEVICE_CALCULATED_POWER_BLAME_OTHER)}},
+ {.puller = new StatsCompanionServicePuller(
+ android::util::DEVICE_CALCULATED_POWER_BLAME_OTHER)}},
// BuildInformation.
{android::util::BUILD_INFORMATION,
- {{}, 1 * NS_PER_SEC, new StatsCompanionServicePuller(android::util::BUILD_INFORMATION)}},
+ {.puller = new StatsCompanionServicePuller(android::util::BUILD_INFORMATION)}},
};
StatsPullerManager::StatsPullerManager() : mNextPullTimeNs(NO_ALARM_UPDATE) {
}
-bool StatsPullerManager::Pull(const int tagId, const int64_t timeNs,
- vector<shared_ptr<LogEvent>>* data) {
+bool StatsPullerManager::Pull(int tagId, vector<shared_ptr<LogEvent>>* data) {
VLOG("Initiating pulling %d", tagId);
if (kAllPullAtomInfo.find(tagId) != kAllPullAtomInfo.end()) {
- bool ret = kAllPullAtomInfo.find(tagId)->second.puller->Pull(timeNs, data);
+ bool ret = kAllPullAtomInfo.find(tagId)->second.puller->Pull(data);
VLOG("pulled %d items", (int)data->size());
return ret;
} else {
@@ -333,8 +310,9 @@
}
}
-void StatsPullerManager::OnAlarmFired(const int64_t currentTimeNs) {
+void StatsPullerManager::OnAlarmFired(int64_t elapsedTimeNs) {
AutoMutex _l(mLock);
+ int64_t wallClockNs = getWallClockNs();
int64_t minNextPullTimeNs = NO_ALARM_UPDATE;
@@ -344,7 +322,7 @@
vector<ReceiverInfo*> receivers = vector<ReceiverInfo*>();
if (pair.second.size() != 0) {
for (ReceiverInfo& receiverInfo : pair.second) {
- if (receiverInfo.nextPullTimeNs <= currentTimeNs) {
+ if (receiverInfo.nextPullTimeNs <= elapsedTimeNs) {
receivers.push_back(&receiverInfo);
} else {
if (receiverInfo.nextPullTimeNs < minNextPullTimeNs) {
@@ -360,22 +338,38 @@
for (const auto& pullInfo : needToPull) {
vector<shared_ptr<LogEvent>> data;
- if (Pull(pullInfo.first, currentTimeNs, &data)) {
- for (const auto& receiverInfo : pullInfo.second) {
- sp<PullDataReceiver> receiverPtr = receiverInfo->receiver.promote();
- if (receiverPtr != nullptr) {
- receiverPtr->onDataPulled(data);
- // we may have just come out of a coma, compute next pull time
- receiverInfo->nextPullTimeNs =
- (currentTimeNs - receiverInfo->nextPullTimeNs) /
- receiverInfo->intervalNs * receiverInfo->intervalNs +
- receiverInfo->intervalNs + receiverInfo->nextPullTimeNs;
- if (receiverInfo->nextPullTimeNs < minNextPullTimeNs) {
- minNextPullTimeNs = receiverInfo->nextPullTimeNs;
- }
- } else {
- VLOG("receiver already gone.");
+ if (!Pull(pullInfo.first, &data)) {
+ VLOG("pull failed at %lld, will try again later", (long long)elapsedTimeNs);
+ continue;
+ }
+ StatsdStats::getInstance().notePullDelay(pullInfo.first,
+ getElapsedRealtimeNs() - elapsedTimeNs);
+
+ // Convention is to mark pull atom timestamp at request time.
+ // If we pull at t0, the puller starts at t1, finishes at t2, and sends back
+ // at t3, we mark t0 as its timestamp, which should correspond to its
+ // triggering event, such as condition change at t0.
+ // Here the triggering event is the alarm fired from AlarmManager.
+ // In ValueMetricProducer and GaugeMetricProducer we do the same thing
+ // when pulling on condition change, etc.
+ for (auto& event : data) {
+ event->setElapsedTimestampNs(elapsedTimeNs);
+ event->setLogdWallClockTimestampNs(wallClockNs);
+ }
+
+ for (const auto& receiverInfo : pullInfo.second) {
+ sp<PullDataReceiver> receiverPtr = receiverInfo->receiver.promote();
+ if (receiverPtr != nullptr) {
+ receiverPtr->onDataPulled(data);
+ // we may have just come out of a coma, compute next pull time
+ int numBucketsAhead =
+ (elapsedTimeNs - receiverInfo->nextPullTimeNs) / receiverInfo->intervalNs;
+ receiverInfo->nextPullTimeNs += (numBucketsAhead + 1) * receiverInfo->intervalNs;
+ if (receiverInfo->nextPullTimeNs < minNextPullTimeNs) {
+ minNextPullTimeNs = receiverInfo->nextPullTimeNs;
}
+ } else {
+ VLOG("receiver already gone.");
}
}
}
diff --git a/cmds/statsd/src/external/StatsPullerManager.h b/cmds/statsd/src/external/StatsPullerManager.h
index 3350736..807e4af 100644
--- a/cmds/statsd/src/external/StatsPullerManager.h
+++ b/cmds/statsd/src/external/StatsPullerManager.h
@@ -26,6 +26,7 @@
#include <vector>
#include "PullDataReceiver.h"
#include "StatsPuller.h"
+#include "guardrail/StatsdStats.h"
#include "logd/LogEvent.h"
namespace android {
@@ -36,11 +37,19 @@
// The field numbers of the fields that need to be summed when merging
// isolated uid with host uid.
std::vector<int> additiveFields;
- // How long should the puller wait before doing an actual pull again. Default
- // 1 sec. Set this to 0 if this is handled elsewhere.
+ // Minimum time before this puller does actual pull again.
+ // Pullers can cause significant impact to system health and battery,
+ // so we don't pull too frequently.
+ // If a pull request comes before cooldown, a cached version from previous pull
+ // will be returned.
int64_t coolDownNs = 1 * NS_PER_SEC;
// The actual puller
sp<StatsPuller> puller;
+ // Max time allowed to pull this atom.
+ // We cannot reliably kill a pull thread, so we don't terminate the puller.
+ // If the pull takes longer than this, the data is discarded and the
+ // puller's mHasGoodData is marked as false.
+ int64_t pullTimeoutNs = StatsdStats::kPullMaxDelayNs;
} PullAtomInfo;
class StatsPullerManager : public virtual RefBase {
@@ -61,13 +70,18 @@
// Verify if we know how to pull for this matcher
bool PullerForMatcherExists(int tagId) const;
- void OnAlarmFired(const int64_t timeNs);
+ void OnAlarmFired(int64_t elapsedTimeNs);
- // Use respective puller to pull the data. The returned data will have
- // elapsedTimeNs set as timeNs and will have wallClockTimeNs set as current
- // wall clock time.
- virtual bool Pull(const int tagId, const int64_t timeNs,
- vector<std::shared_ptr<LogEvent>>* data);
+ // Pulls the most recent data.
+ // The data may be served from cache if consecutive pulls come within
+ // the atom's coolDownNs.
+ // Returns true if the pull was successful.
+ // Returns false when
+ // 1) the pull fails
+ // 2) the pull takes longer than the atom's pullTimeoutNs
+ // If a metric wants to make any change to the data, like timestamps, it
+ // should make a copy as this data may be shared with multiple metrics.
+ virtual bool Pull(int tagId, vector<std::shared_ptr<LogEvent>>* data);
// Clear pull data cache immediately.
int ForceClearPullerCache();
diff --git a/cmds/statsd/src/external/SubsystemSleepStatePuller.h b/cmds/statsd/src/external/SubsystemSleepStatePuller.h
index 17ce5b4cb..87f5f02 100644
--- a/cmds/statsd/src/external/SubsystemSleepStatePuller.h
+++ b/cmds/statsd/src/external/SubsystemSleepStatePuller.h
@@ -29,6 +29,8 @@
class SubsystemSleepStatePuller : public StatsPuller {
public:
SubsystemSleepStatePuller();
+
+private:
bool PullInternal(vector<std::shared_ptr<LogEvent>>* data) override;
};
diff --git a/cmds/statsd/src/guardrail/StatsdStats.cpp b/cmds/statsd/src/guardrail/StatsdStats.cpp
index 3e5e82f..f4d0144 100644
--- a/cmds/statsd/src/guardrail/StatsdStats.cpp
+++ b/cmds/statsd/src/guardrail/StatsdStats.cpp
@@ -373,6 +373,16 @@
mPulledAtomStats[pullAtomId].dataError++;
}
+void StatsdStats::notePullTimeout(int pullAtomId) {
+ lock_guard<std::mutex> lock(mLock);
+ mPulledAtomStats[pullAtomId].pullTimeout++;
+}
+
+void StatsdStats::notePullExceedMaxDelay(int pullAtomId) {
+ lock_guard<std::mutex> lock(mLock);
+ mPulledAtomStats[pullAtomId].pullExceedMaxDelay++;
+}
+
void StatsdStats::noteAtomLogged(int atomId, int32_t timeSec) {
lock_guard<std::mutex> lock(mLock);
@@ -429,6 +439,8 @@
pullStats.second.maxPullDelayNs = 0;
pullStats.second.numPullDelay = 0;
pullStats.second.dataError = 0;
+ pullStats.second.pullTimeout = 0;
+ pullStats.second.pullExceedMaxDelay = 0;
}
}
@@ -535,13 +547,16 @@
dprintf(out, "********Pulled Atom stats***********\n");
for (const auto& pair : mPulledAtomStats) {
dprintf(out,
- "Atom %d->(total pull)%ld, (pull from cache)%ld, (min pull interval)%ld, (average "
- "pull time nanos)%lld, (max pull time nanos)%lld, (average pull delay nanos)%lld, "
- "(max pull delay nanos)%lld, (data error)%ld\n",
+ "Atom %d->(total pull)%ld, (pull from cache)%ld, (min pull interval)%ld \n"
+ " (average pull time nanos)%lld, (max pull time nanos)%lld, (average pull delay "
+ "nanos)%lld, "
+ " (max pull delay nanos)%lld, (data error)%ld\n"
+ " (pull timeout)%ld, (pull exceed max delay)%ld\n",
(int)pair.first, (long)pair.second.totalPull, (long)pair.second.totalPullFromCache,
(long)pair.second.minPullIntervalSec, (long long)pair.second.avgPullTimeNs,
(long long)pair.second.maxPullTimeNs, (long long)pair.second.avgPullDelayNs,
- (long long)pair.second.maxPullDelayNs, pair.second.dataError);
+ (long long)pair.second.maxPullDelayNs, pair.second.dataError,
+ pair.second.pullTimeout, pair.second.pullExceedMaxDelay);
}
if (mAnomalyAlarmRegisteredStats > 0) {
diff --git a/cmds/statsd/src/guardrail/StatsdStats.h b/cmds/statsd/src/guardrail/StatsdStats.h
index 3157037..dc647f8 100644
--- a/cmds/statsd/src/guardrail/StatsdStats.h
+++ b/cmds/statsd/src/guardrail/StatsdStats.h
@@ -144,6 +144,8 @@
// How long to try to clear puller cache from last time
static const long kPullerCacheClearIntervalSec = 1;
+ // Max time to do a pull.
+ static const int64_t kPullMaxDelayNs = 10 * NS_PER_SEC;
/**
* Report a new config has been received and report the static stats about the config.
*
@@ -296,6 +298,16 @@
void notePullDelay(int pullAtomId, int64_t pullDelayNs);
/*
+ * Records that a pull exceeded the puller's timeout.
+ */
+ void notePullTimeout(int pullAtomId);
+
+ /*
+ * Records that a pull exceeded the max delay for a metric.
+ */
+ void notePullExceedMaxDelay(int pullAtomId);
+
+ /*
* Records when system server restarts.
*/
void noteSystemServerRestart(int32_t timeSec);
@@ -335,6 +347,8 @@
int64_t maxPullDelayNs = 0;
long numPullDelay = 0;
long dataError = 0;
+ long pullTimeout = 0;
+ long pullExceedMaxDelay = 0;
} PulledAtomStats;
private:
diff --git a/cmds/statsd/src/logd/LogEvent.cpp b/cmds/statsd/src/logd/LogEvent.cpp
index 8d61aba..2ff8aa1 100644
--- a/cmds/statsd/src/logd/LogEvent.cpp
+++ b/cmds/statsd/src/logd/LogEvent.cpp
@@ -41,6 +41,14 @@
}
}
+LogEvent::LogEvent(const LogEvent& event) {
+ mTagId = event.mTagId;
+ mLogUid = event.mLogUid;
+ mElapsedTimestampNs = event.mElapsedTimestampNs;
+ mLogdTimestampNs = event.mLogdTimestampNs;
+ mValues = event.mValues;
+}
+
LogEvent::LogEvent(const StatsLogEventWrapper& statsLogEventWrapper, int workChainIndex) {
mTagId = statsLogEventWrapper.getTagId();
mLogdTimestampNs = statsLogEventWrapper.getWallClockTimeNs();
diff --git a/cmds/statsd/src/logd/LogEvent.h b/cmds/statsd/src/logd/LogEvent.h
index 4e37e9b..43e6e4f 100644
--- a/cmds/statsd/src/logd/LogEvent.h
+++ b/cmds/statsd/src/logd/LogEvent.h
@@ -207,10 +207,13 @@
return &mValues;
}
+ // Returns a by-value copy of this event so a caller can adjust the copy (e.g. its elapsed
+ // timestamp) without modifying this instance. Const: copying only reads the event.
+ inline LogEvent makeCopy() const { return LogEvent(*this); }
+
private:
/**
- * Don't copy, it's slower. If we really need this we can add it but let's try to
- * avoid it.
+ * Only use this if copy is absolutely needed.
*/
LogEvent(const LogEvent&);
diff --git a/cmds/statsd/src/metrics/GaugeMetricProducer.cpp b/cmds/statsd/src/metrics/GaugeMetricProducer.cpp
index ec60244..67a1a47 100644
--- a/cmds/statsd/src/metrics/GaugeMetricProducer.cpp
+++ b/cmds/statsd/src/metrics/GaugeMetricProducer.cpp
@@ -68,15 +68,12 @@
const int FIELD_ID_START_BUCKET_ELAPSED_MILLIS = 7;
const int FIELD_ID_END_BUCKET_ELAPSED_MILLIS = 8;
-GaugeMetricProducer::GaugeMetricProducer(const ConfigKey& key, const GaugeMetric& metric,
- const int conditionIndex,
- const sp<ConditionWizard>& wizard,
- const int whatMatcherIndex,
- const sp<EventMatcherWizard>& matcherWizard,
- const int pullTagId,
- const int triggerAtomId, const int atomId,
- const int64_t timeBaseNs, const int64_t startTimeNs,
- const sp<StatsPullerManager>& pullerManager)
+GaugeMetricProducer::GaugeMetricProducer(
+ const ConfigKey& key, const GaugeMetric& metric, const int conditionIndex,
+ const sp<ConditionWizard>& wizard, const int whatMatcherIndex,
+ const sp<EventMatcherWizard>& matcherWizard, const int pullTagId, const int triggerAtomId,
+ const int atomId, const int64_t timeBaseNs, const int64_t startTimeNs,
+ const sp<StatsPullerManager>& pullerManager)
: MetricProducer(metric.id(), key, timeBaseNs, conditionIndex, wizard),
mWhatMatcherIndex(whatMatcherIndex),
mEventMatcherWizard(matcherWizard),
@@ -86,6 +83,8 @@
mAtomId(atomId),
mIsPulled(pullTagId != -1),
mMinBucketSizeNs(metric.min_bucket_size_nanos()),
+ mMaxPullDelayNs(metric.max_pull_delay_sec() > 0 ? metric.max_pull_delay_sec() * NS_PER_SEC
+ : StatsdStats::kPullMaxDelayNs),
mDimensionSoftLimit(StatsdStats::kAtomDimensionKeySizeLimitMap.find(pullTagId) !=
StatsdStats::kAtomDimensionKeySizeLimitMap.end()
? StatsdStats::kAtomDimensionKeySizeLimitMap.at(pullTagId).first
@@ -338,14 +337,24 @@
return;
}
vector<std::shared_ptr<LogEvent>> allData;
- if (!mPullerManager->Pull(mPullTagId, timestampNs, &allData)) {
+ if (!mPullerManager->Pull(mPullTagId, &allData)) {
ALOGE("Gauge Stats puller failed for tag: %d at %lld", mPullTagId, (long long)timestampNs);
return;
}
+ const int64_t pullDelayNs = getElapsedRealtimeNs() - timestampNs;
+ if (pullDelayNs > mMaxPullDelayNs) {
+ ALOGE("Pull finish too late for atom %d", mPullTagId);
+ StatsdStats::getInstance().notePullExceedMaxDelay(mPullTagId);
+ StatsdStats::getInstance().notePullDelay(mPullTagId, pullDelayNs);
+ return;
+ }
+ StatsdStats::getInstance().notePullDelay(mPullTagId, pullDelayNs);
for (const auto& data : allData) {
- if (mEventMatcherWizard->matchLogEvent(
- *data, mWhatMatcherIndex) == MatchingState::kMatched) {
- onMatchedLogEventLocked(mWhatMatcherIndex, *data);
+ LogEvent localCopy = data->makeCopy();
+ localCopy.setElapsedTimestampNs(timestampNs);
+ if (mEventMatcherWizard->matchLogEvent(localCopy, mWhatMatcherIndex) ==
+ MatchingState::kMatched) {
+ onMatchedLogEventLocked(mWhatMatcherIndex, localCopy);
}
}
}
diff --git a/cmds/statsd/src/metrics/GaugeMetricProducer.h b/cmds/statsd/src/metrics/GaugeMetricProducer.h
index 6e3530b..a1a5061 100644
--- a/cmds/statsd/src/metrics/GaugeMetricProducer.h
+++ b/cmds/statsd/src/metrics/GaugeMetricProducer.h
@@ -160,6 +160,8 @@
GaugeMetric::SamplingType mSamplingType;
+ const int64_t mMaxPullDelayNs;
+
// apply a whitelist on the original input
std::shared_ptr<vector<FieldValue>> getGaugeFields(const LogEvent& event);
diff --git a/cmds/statsd/src/metrics/ValueMetricProducer.cpp b/cmds/statsd/src/metrics/ValueMetricProducer.cpp
index cf56e2d..9a8e3bd 100644
--- a/cmds/statsd/src/metrics/ValueMetricProducer.cpp
+++ b/cmds/statsd/src/metrics/ValueMetricProducer.cpp
@@ -103,7 +103,9 @@
mValueDirection(metric.value_direction()),
mSkipZeroDiffOutput(metric.skip_zero_diff_output()),
mUseZeroDefaultBase(metric.use_zero_default_base()),
- mHasGlobalBase(false) {
+ mHasGlobalBase(false),
+ mMaxPullDelayNs(metric.max_pull_delay_sec() > 0 ? metric.max_pull_delay_sec() * NS_PER_SEC
+ : StatsdStats::kPullMaxDelayNs) {
int64_t bucketSizeMills = 0;
if (metric.has_bucket()) {
bucketSizeMills = TimeUnitToBucketSizeInMillisGuardrailed(key.GetUid(), metric.bucket());
@@ -340,19 +342,32 @@
void ValueMetricProducer::pullAndMatchEventsLocked(const int64_t timestampNs) {
vector<std::shared_ptr<LogEvent>> allData;
- if (mPullerManager->Pull(mPullTagId, timestampNs, &allData)) {
- for (const auto& data : allData) {
- if (mEventMatcherWizard->matchLogEvent(
- *data, mWhatMatcherIndex) == MatchingState::kMatched) {
- onMatchedLogEventLocked(mWhatMatcherIndex, *data);
- }
- }
- mHasGlobalBase = true;
- } else {
- // for pulled data, every pull is needed. So we reset the base if any
- // pull fails.
+ if (!mPullerManager->Pull(mPullTagId, &allData)) {
+ ALOGE("Value Stats puller failed for tag: %d at %lld", mPullTagId, (long long)timestampNs);
resetBase();
+ return;
}
+ const int64_t pullDelayNs = getElapsedRealtimeNs() - timestampNs;
+ // Record the delay of every completed pull, whether or not its data is accepted below.
+ StatsdStats::getInstance().notePullDelay(mPullTagId, pullDelayNs);
+ if (pullDelayNs > mMaxPullDelayNs) {
+ ALOGE("Pull finish too late for atom %d, longer than %lld", mPullTagId,
+ (long long)mMaxPullDelayNs);
+ StatsdStats::getInstance().notePullExceedMaxDelay(mPullTagId);
+ resetBase();
+ return;
+ }
+
+ for (const auto& data : allData) {
+ // Work on a copy so the pulled event, held via shared_ptr, keeps its own timestamp.
+ LogEvent localCopy = data->makeCopy();
+ localCopy.setElapsedTimestampNs(timestampNs);
+ if (mEventMatcherWizard->matchLogEvent(localCopy, mWhatMatcherIndex) ==
+ MatchingState::kMatched) {
+ onMatchedLogEventLocked(mWhatMatcherIndex, localCopy);
+ }
+ }
+ mHasGlobalBase = true;
}
int64_t ValueMetricProducer::calcPreviousBucketEndTime(const int64_t currentTimeNs) {
@@ -381,10 +396,11 @@
return;
}
for (const auto& data : allData) {
- if (mEventMatcherWizard->matchLogEvent(*data, mWhatMatcherIndex) ==
+ LogEvent localCopy = data->makeCopy();
+ if (mEventMatcherWizard->matchLogEvent(localCopy, mWhatMatcherIndex) ==
MatchingState::kMatched) {
- data->setElapsedTimestampNs(bucketEndTime);
- onMatchedLogEventLocked(mWhatMatcherIndex, *data);
+ localCopy.setElapsedTimestampNs(bucketEndTime);
+ onMatchedLogEventLocked(mWhatMatcherIndex, localCopy);
}
}
mHasGlobalBase = true;
diff --git a/cmds/statsd/src/metrics/ValueMetricProducer.h b/cmds/statsd/src/metrics/ValueMetricProducer.h
index 4991af4..4865aee 100644
--- a/cmds/statsd/src/metrics/ValueMetricProducer.h
+++ b/cmds/statsd/src/metrics/ValueMetricProducer.h
@@ -183,6 +183,8 @@
// diff against.
bool mHasGlobalBase;
+ const int64_t mMaxPullDelayNs;
+
FRIEND_TEST(ValueMetricProducerTest, TestPulledEventsNoCondition);
FRIEND_TEST(ValueMetricProducerTest, TestPulledEventsWithFiltering);
FRIEND_TEST(ValueMetricProducerTest, TestPulledEventsTakeAbsoluteValueOnReset);
@@ -207,6 +209,8 @@
FRIEND_TEST(ValueMetricProducerTest, TestUseZeroDefaultBase);
FRIEND_TEST(ValueMetricProducerTest, TestUseZeroDefaultBaseWithPullFailures);
FRIEND_TEST(ValueMetricProducerTest, TestTrimUnusedDimensionKey);
+ FRIEND_TEST(ValueMetricProducerTest, TestResetBaseOnPullFail);
+ FRIEND_TEST(ValueMetricProducerTest, TestResetBaseOnPullTooLate);
};
} // namespace statsd
diff --git a/cmds/statsd/src/shell/ShellSubscriber.cpp b/cmds/statsd/src/shell/ShellSubscriber.cpp
index dffff7a..22883f3 100644
--- a/cmds/statsd/src/shell/ShellSubscriber.cpp
+++ b/cmds/statsd/src/shell/ShellSubscriber.cpp
@@ -131,8 +131,7 @@
VLOG("pull atom %d now", pullInfo.mPullerMatcher.atom_id());
vector<std::shared_ptr<LogEvent>> data;
- mPullerMgr->Pull(pullInfo.mPullerMatcher.atom_id(), nowMillis * 1000000L,
- &data);
+ mPullerMgr->Pull(pullInfo.mPullerMatcher.atom_id(), &data);
VLOG("pulled %zu atoms", data.size());
if (data.size() > 0) {
writeToOutputLocked(data, pullInfo.mPullerMatcher);
diff --git a/cmds/statsd/src/stats_log.proto b/cmds/statsd/src/stats_log.proto
index 5a87e46..e8de875 100644
--- a/cmds/statsd/src/stats_log.proto
+++ b/cmds/statsd/src/stats_log.proto
@@ -401,6 +401,8 @@
optional int64 average_pull_delay_nanos = 7;
optional int64 max_pull_delay_nanos = 8;
optional int64 data_error = 9;
+ optional int64 pull_timeout = 10;
+ optional int64 pull_exceed_max_delay = 11;
}
repeated PulledAtomStats pulled_atom_stats = 10;
diff --git a/cmds/statsd/src/stats_log_util.cpp b/cmds/statsd/src/stats_log_util.cpp
index f1310db..7de0bb3 100644
--- a/cmds/statsd/src/stats_log_util.cpp
+++ b/cmds/statsd/src/stats_log_util.cpp
@@ -64,6 +64,8 @@
const int FIELD_ID_AVERAGE_PULL_DELAY_NANOS = 7;
const int FIELD_ID_MAX_PULL_DELAY_NANOS = 8;
const int FIELD_ID_DATA_ERROR = 9;
+const int FIELD_ID_PULL_TIMEOUT = 10;
+const int FIELD_ID_PULL_EXCEED_MAX_DELAY = 11;
namespace {
@@ -450,6 +452,10 @@
protoOutput->write(FIELD_TYPE_INT64 | FIELD_ID_MAX_PULL_DELAY_NANOS,
(long long)pair.second.maxPullDelayNs);
protoOutput->write(FIELD_TYPE_INT64 | FIELD_ID_DATA_ERROR, (long long)pair.second.dataError);
+ protoOutput->write(FIELD_TYPE_INT64 | FIELD_ID_PULL_TIMEOUT,
+ (long long)pair.second.pullTimeout);
+ protoOutput->write(FIELD_TYPE_INT64 | FIELD_ID_PULL_EXCEED_MAX_DELAY,
+ (long long)pair.second.pullExceedMaxDelay);
protoOutput->end(token);
}
diff --git a/cmds/statsd/src/statsd_config.proto b/cmds/statsd/src/statsd_config.proto
index f485185..381ac32 100644
--- a/cmds/statsd/src/statsd_config.proto
+++ b/cmds/statsd/src/statsd_config.proto
@@ -240,7 +240,10 @@
optional SamplingType sampling_type = 9 [default = RANDOM_ONE_SAMPLE] ;
optional int64 min_bucket_size_nanos = 10;
+
optional int64 max_num_gauge_atoms_per_bucket = 11 [default = 10];
+
+ optional int32 max_pull_delay_sec = 13 [default = 10];
}
message ValueMetric {
@@ -285,6 +288,8 @@
optional ValueDirection value_direction = 13 [default = INCREASING];
optional bool skip_zero_diff_output = 14 [default = true];
+
+ optional int32 max_pull_delay_sec = 16 [default = 10];
}
message Alert {